You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/06/03 06:34:06 UTC

[arrow-datafusion] branch main updated: Resolve contradictory requirements by conversion of ordering sensitive aggregators (#6482)

This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ddcbc42c1 Resolve contradictory requirements by conversion of ordering sensitive aggregators (#6482)
5ddcbc42c1 is described below

commit 5ddcbc42c1d601359a621590862605eac07470fc
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Sat Jun 3 09:34:00 2023 +0300

    Resolve contradictory requirements by conversion of ordering sensitive aggregators (#6482)
    
    * Naive test pass
    
    i
    
    * Add new tests and simplifications
    
    * move tests to the .slt file
    
    * update requirement
    
    * update tests
    
    * Add support for order-sensitive aggregation in PartiallyOrdered mode.
    
    * Resolve linter errors
    
    * update comments
    
    * minor changes
    
    * retract changes in generated
    
    * update proto files
    
    * Simplifications
    
    * Make types consistent in schema, and data
    
    * Update todos
    
    * Convert API to vector
    
    * Convert get_finest to handle Vector inputs
    
    * simplifications, update comment
    
    * initial commit, add test
    
    * Add support for FIRST Aggregate function.
    
    * Add support for last aggregate
    
    * Update cargo.lock
    
    * Remove distinct, and limit from First and last aggregate.
    
    * Add reverse for First and Last Aggregator
    
    * Update cargo lock
    
    * Minor code simplifications
    
    * Update comment
    
    * Update documents
    
    * Fix projection pushdown bug
    
    * fix projection push down failure bug
    
    * combine first_agg and last_agg parsers
    
    * Update documentation
    
    * Update subproject
    
    * initial commit
    
    * Add test code
    
    * initial version
    
    * simplify prints
    
    * minor changes
    
    * sqllogictests pass
    
    * All tests pass
    
    * update proto function names
    
    * Minor changes
    
    * do not consider ordering requirement in ordering insensitive aggregators
    
    * Reject aggregate order by for window functions.
    
    * initial commit
    
    * Add new tests
    
    * Add reverse for sort removal
    
    * simplifications
    
    * simplifications
    
    * Bug fix, do not consider reverse requirement if it is not possible
    
    * Fix cargo lock file
    
    * Change reverse_order_by function place
    
    * Move required input ordering calculation logic to its own function
    
    * Add new tests
    
    * simplifications
    
    * Update comment
    
    * Rename aggregator first and last
    
    * minor change
    
    * Comment improvements
    
    * Remove count from First,Last accumulators
    
    * Remove clone
    
    * Simplifications
    
    * Simplifications, update comment
    
    * Improve comments
    
    * Move LexOrdering requirement to common place
    
    * Update comment, refactor implementation
    
    * Bug fix
    
    * Use naive requirement when reverse requirement is not helpful by convention.
    
    * Update test
    
    * Update comments
    
    * Change function place
    
    * Move get_finer to utils
    
    * Change name of last, first impl
    
    * Fix error message
    
    * change display of first and last
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
    Co-authored-by: berkaysynnada <be...@synnada.ai>
---
 datafusion/core/src/execution/context.rs           |   5 +-
 .../core/src/physical_plan/aggregates/mod.rs       | 338 +++++++++++++++------
 .../tests/sqllogictests/test_files/aggregate.slt   |   4 +-
 .../tests/sqllogictests/test_files/explain.slt     |   2 +-
 .../tests/sqllogictests/test_files/groupby.slt     | 205 +++++++++++++
 .../physical-expr/src/aggregate/first_last.rs      |  14 +-
 datafusion/physical-expr/src/aggregate/mod.rs      |  11 +
 datafusion/physical-expr/src/lib.rs                |   6 +-
 datafusion/physical-expr/src/sort_expr.rs          |   5 +-
 datafusion/physical-expr/src/utils.rs              |  38 +++
 datafusion/physical-expr/src/window/aggregate.rs   |   6 +-
 datafusion/physical-expr/src/window/built_in.rs    |   4 +-
 .../physical-expr/src/window/sliding_aggregate.rs  |   6 +-
 datafusion/physical-expr/src/window/window_expr.rs |  13 -
 14 files changed, 531 insertions(+), 126 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index f43d7c87c4..69f621bb44 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1460,14 +1460,13 @@ impl SessionState {
             // The EnforceDistribution rule is for adding essential repartition to satisfy the required
             // distribution. Please make sure that the whole plan tree is determined before this rule.
             Arc::new(EnforceDistribution::new()),
+            // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
+            Arc::new(CombinePartialFinalAggregate::new()),
             // The EnforceSorting rule is for adding essential local sorting to satisfy the required
             // ordering. Please make sure that the whole plan tree is determined before this rule.
             // Note that one should always run this rule after running the EnforceDistribution rule
             // as the latter may break local sorting requirements.
             Arc::new(EnforceSorting::new()),
-            // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution
-            // and EnforceSorting rules
-            Arc::new(CombinePartialFinalAggregate::new()),
             // The CoalesceBatches rule will not influence the distribution and ordering of the
             // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
             Arc::new(CoalesceBatches::new()),
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 9eba869290..455a86660e 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -37,10 +37,10 @@ use datafusion_physical_expr::{
     aggregate::row_accumulator::RowAccumulator,
     equivalence::project_equivalence_properties,
     expressions::{Avg, CastExpr, Column, Sum},
-    normalize_out_expr_with_columns_map,
-    utils::{convert_to_expr, get_indices_of_matching_exprs, ordering_satisfy_concrete},
-    AggregateExpr, OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
-    PhysicalSortRequirement,
+    normalize_out_expr_with_columns_map, reverse_order_bys,
+    utils::{convert_to_expr, get_indices_of_matching_exprs},
+    AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties,
+    PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
 };
 use std::any::Any;
 use std::collections::HashMap;
@@ -52,8 +52,11 @@ mod row_hash;
 mod utils;
 
 pub use datafusion_expr::AggregateFunction;
+use datafusion_physical_expr::aggregate::is_order_sensitive;
 pub use datafusion_physical_expr::expressions::create_aggregate_expr;
-use datafusion_physical_expr::expressions::{ArrayAgg, FirstValue, LastValue};
+use datafusion_physical_expr::utils::{
+    get_finer_ordering, ordering_satisfy_requirement_concrete,
+};
 
 /// Hash aggregate modes
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -213,7 +216,7 @@ pub(crate) struct AggregationOrdering {
     /// expressions match input ordering.
     order_indices: Vec<usize>,
     /// Actual ordering information of the GROUP BY columns.
-    ordering: Vec<PhysicalSortExpr>,
+    ordering: LexOrdering,
 }
 
 /// Hash aggregate execution plan
@@ -228,7 +231,7 @@ pub struct AggregateExec {
     /// FILTER (WHERE clause) expression for each aggregate expression
     pub(crate) filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
     /// (ORDER BY clause) expression for each aggregate expression
-    pub(crate) order_by_expr: Vec<Option<Vec<PhysicalSortExpr>>>,
+    pub(crate) order_by_expr: Vec<Option<LexOrdering>>,
     /// Input plan, could be a partial aggregate or the input to the aggregate
     pub(crate) input: Arc<dyn ExecutionPlan>,
     /// Schema after the aggregate is applied
@@ -244,7 +247,7 @@ pub struct AggregateExec {
     metrics: ExecutionPlanMetricsSet,
     /// Stores mode and output ordering information for the `AggregateExec`.
     aggregation_ordering: Option<AggregationOrdering>,
-    required_input_ordering: Option<Vec<PhysicalSortRequirement>>,
+    required_input_ordering: Option<LexOrderingReq>,
 }
 
 /// Calculates the working mode for `GROUP BY` queries.
@@ -339,6 +342,29 @@ fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalE
         .collect()
 }
 
+/// This function returns the ordering requirement of the first non-reversible
+/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves
+/// as the initial requirement while calculating the finest requirement among all
+/// aggregate functions. If this function returns `None`, it means there is no
+/// hard ordering requirement for the aggregate functions (in terms of direction).
+/// Then, we can generate two alternative requirements with opposite directions.
+fn get_init_req(
+    aggr_expr: &[Arc<dyn AggregateExpr>],
+    order_by_expr: &[Option<LexOrdering>],
+) -> Option<LexOrdering> {
+    for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) {
+        // If the aggregation function is a non-reversible order-sensitive function
+        // and there is a hard requirement, choose first such requirement:
+        if is_order_sensitive(aggr_expr)
+            && aggr_expr.reverse_expr().is_none()
+            && fn_reqs.is_some()
+        {
+            return fn_reqs.clone();
+        }
+    }
+    None
+}
+
 /// This function gets the finest ordering requirement among all the aggregation
 /// functions. If requirements are conflicting, (i.e. we can not compute the
 /// aggregations in a single [`AggregateExec`]), the function returns an error.
@@ -346,33 +372,45 @@ fn get_finest_requirement<
     F: Fn() -> EquivalenceProperties,
     F2: Fn() -> OrderingEquivalenceProperties,
 >(
-    order_by_expr: &[Option<Vec<PhysicalSortExpr>>],
+    aggr_expr: &mut [Arc<dyn AggregateExpr>],
+    order_by_expr: &mut [Option<LexOrdering>],
     eq_properties: F,
     ordering_eq_properties: F2,
-) -> Result<Option<Vec<PhysicalSortExpr>>> {
-    let mut result: Option<Vec<PhysicalSortExpr>> = None;
-    for fn_reqs in order_by_expr.iter().flatten() {
-        if let Some(result) = &mut result {
-            if ordering_satisfy_concrete(
-                result,
-                fn_reqs,
+) -> Result<Option<LexOrdering>> {
+    let mut finest_req = get_init_req(aggr_expr, order_by_expr);
+    for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) {
+        let fn_req = if let Some(fn_req) = fn_req {
+            fn_req
+        } else {
+            continue;
+        };
+        if let Some(finest_req) = &mut finest_req {
+            if let Some(finer) = get_finer_ordering(
+                finest_req,
+                fn_req,
                 &eq_properties,
                 &ordering_eq_properties,
             ) {
-                // Do not update the result as it already satisfies current
-                // function's requirement:
+                *finest_req = finer.to_vec();
                 continue;
             }
-            if ordering_satisfy_concrete(
-                fn_reqs,
-                result,
-                &eq_properties,
-                &ordering_eq_properties,
-            ) {
-                // Update result with current function's requirements, as it is
-                // a finer requirement than what we currently have.
-                *result = fn_reqs.clone();
-                continue;
+            // If an aggregate function is reversible, analyze whether its reverse
+            // direction is compatible with existing requirements:
+            if let Some(reverse) = aggr_expr.reverse_expr() {
+                let fn_req_reverse = reverse_order_bys(fn_req);
+                if let Some(finer) = get_finer_ordering(
+                    finest_req,
+                    &fn_req_reverse,
+                    &eq_properties,
+                    &ordering_eq_properties,
+                ) {
+                    // We need to update `aggr_expr` with its reverse, since only its
+                    // reverse requirement is compatible with existing requirements:
+                    *aggr_expr = reverse;
+                    *finest_req = finer.to_vec();
+                    *fn_req = fn_req_reverse;
+                    continue;
+                }
             }
             // If neither of the requirements satisfy the other, this means
             // requirements are conflicting. Currently, we do not support
@@ -381,20 +419,107 @@ fn get_finest_requirement<
                 "Conflicting ordering requirements in aggregate functions is not supported".to_string(),
             ));
         } else {
-            result = Some(fn_reqs.clone());
+            finest_req = Some(fn_req.clone());
         }
     }
-    Ok(result)
+    Ok(finest_req)
 }
 
-/// Checks whether the given aggregate expression is order-sensitive.
-/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
-/// However, a `FirstAgg` depends on the input ordering (if the order changes,
-/// the first value in the list would change).
-fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
-    aggr_expr.as_any().is::<FirstValue>()
-        || aggr_expr.as_any().is::<LastValue>()
-        || aggr_expr.as_any().is::<ArrayAgg>()
+/// Calculate the required input ordering for the [`AggregateExec`] by considering
+/// ordering requirements of order-sensitive aggregation functions.
+fn calc_required_input_ordering(
+    input: &Arc<dyn ExecutionPlan>,
+    aggr_expr: &mut [Arc<dyn AggregateExpr>],
+    aggregator_reqs: LexOrderingReq,
+    aggregator_reverse_reqs: Option<LexOrderingReq>,
+    aggregation_ordering: &Option<AggregationOrdering>,
+) -> Result<Option<LexOrderingReq>> {
+    let mut required_input_ordering = vec![];
+    // Boolean shows that whether `required_input_ordering` stored comes from
+    // `aggregator_reqs` or `aggregator_reverse_reqs`
+    let mut reverse_req = false;
+    // If reverse aggregator is None, there is no way to run aggregators in reverse mode. Hence ignore it during analysis
+    let aggregator_requirements =
+        if let Some(aggregator_reverse_reqs) = aggregator_reverse_reqs {
+            // If existing ordering doesn't satisfy requirement, we should do calculations
+            // on naive requirement (by convention, otherwise the final plan will be unintuitive),
+            // even if reverse ordering is possible.
+            // Hence, while iterating consider naive requirement last, by this way
+            // we prioritize naive requirement over reverse requirement, when
+            // reverse requirement is not helpful with removing SortExec from the plan.
+            vec![(true, aggregator_reverse_reqs), (false, aggregator_reqs)]
+        } else {
+            vec![(false, aggregator_reqs)]
+        };
+    for (is_reverse, aggregator_requirement) in aggregator_requirements.into_iter() {
+        if let Some(AggregationOrdering {
+            ordering,
+            // If the mode is FullyOrdered or PartiallyOrdered (i.e. we are
+            // running with bounded memory, without breaking the pipeline),
+            // then we append the aggregator ordering requirement to the existing
+            // ordering. This way, we can still run with bounded memory.
+            mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
+            ..
+        }) = aggregation_ordering
+        {
+            // Get the section of the input ordering that enables us to run in
+            // FullyOrdered or PartiallyOrdered modes:
+            let requirement_prefix =
+                if let Some(existing_ordering) = input.output_ordering() {
+                    &existing_ordering[0..ordering.len()]
+                } else {
+                    &[]
+                };
+            let mut requirement =
+                PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter());
+            for req in aggregator_requirement {
+                if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
+                    requirement.push(req);
+                }
+            }
+            required_input_ordering = requirement;
+        } else {
+            required_input_ordering = aggregator_requirement;
+        }
+        // keep track of from which direction required_input_ordering is constructed
+        reverse_req = is_reverse;
+        // If all of the order-sensitive aggregate functions are reversible (such as all of the order-sensitive aggregators are
+        // either FIRST_VALUE or LAST_VALUE). We can run aggregate expressions both in the direction of naive required ordering
+        // (e.g finest requirement that satisfy each aggregate function requirement) and in its reversed (opposite) direction.
+        // We analyze these two possibilities, and use the version that satisfies existing ordering (This saves us adding
+        // unnecessary SortExec to the final plan). If none of the versions satisfy existing ordering, we use naive required ordering.
+        // In short, if running aggregators in reverse order, helps us to remove a `SortExec`, we do so. Otherwise, we use aggregators as is.
+        let existing_ordering = input.output_ordering().unwrap_or(&[]);
+        if ordering_satisfy_requirement_concrete(
+            existing_ordering,
+            &required_input_ordering,
+            || input.equivalence_properties(),
+            || input.ordering_equivalence_properties(),
+        ) {
+            break;
+        }
+    }
+    // If `required_input_ordering` is constructed using reverse requirement, we should reverse
+    // each `aggr_expr` to be able to correctly calculate their result in reverse order.
+    if reverse_req {
+        aggr_expr
+            .iter_mut()
+            .map(|elem| {
+                if is_order_sensitive(elem) {
+                    if let Some(reverse) = elem.reverse_expr() {
+                        *elem = reverse;
+                    } else {
+                        return Err(DataFusionError::Execution(
+                            "Aggregate expression should have a reverse expression"
+                                .to_string(),
+                        ));
+                    }
+                }
+                Ok(())
+            })
+            .collect::<Result<Vec<_>>>()?;
+    }
+    Ok((!required_input_ordering.is_empty()).then_some(required_input_ordering))
 }
 
 impl AggregateExec {
@@ -402,9 +527,9 @@ impl AggregateExec {
     pub fn try_new(
         mode: AggregateMode,
         group_by: PhysicalGroupBy,
-        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+        mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
-        mut order_by_expr: Vec<Option<Vec<PhysicalSortExpr>>>,
+        mut order_by_expr: Vec<Option<LexOrdering>>,
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
     ) -> Result<Self> {
@@ -417,30 +542,53 @@ impl AggregateExec {
         )?;
 
         let schema = Arc::new(schema);
-        let mut aggregator_requirement = None;
-        // Ordering requirement makes sense only in Partial and Single modes.
-        // In other modes, all groups are collapsed, therefore their input schema
-        // can not contain expressions in the requirement.
-        if mode == AggregateMode::Partial || mode == AggregateMode::Single {
-            order_by_expr = aggr_expr
-                .iter()
-                .zip(order_by_expr.into_iter())
-                .map(|(aggr_expr, fn_reqs)| {
-                    // If aggregation function is ordering sensitive, keep ordering requirement as is; otherwise ignore requirement
-                    if is_order_sensitive(aggr_expr) {
-                        fn_reqs
-                    } else {
-                        None
-                    }
-                })
-                .collect::<Vec<_>>();
+        // Reset ordering requirement to `None` if aggregator is not order-sensitive
+        order_by_expr = aggr_expr
+            .iter()
+            .zip(order_by_expr.into_iter())
+            .map(|(aggr_expr, fn_reqs)| {
+                // If aggregation function is ordering sensitive, keep ordering requirement as is; otherwise ignore requirement
+                if is_order_sensitive(aggr_expr) {
+                    fn_reqs
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let mut aggregator_reqs = vec![];
+        let mut aggregator_reverse_reqs = None;
+        // Currently we support order-sensitive aggregation only in `Single` mode.
+        // For `Final` and `FinalPartitioned` modes, we cannot guarantee they will receive
+        // data according to ordering requirements. As long as we cannot produce correct result
+        // in `Final` mode, it is not important to produce correct result in `Partial` mode.
+        // We only support `Single` mode, where we are sure that output produced is final, and it
+        // is produced in a single step.
+        if mode == AggregateMode::Single {
             let requirement = get_finest_requirement(
-                &order_by_expr,
+                &mut aggr_expr,
+                &mut order_by_expr,
                 || input.equivalence_properties(),
                 || input.ordering_equivalence_properties(),
             )?;
-            aggregator_requirement = requirement
+            let aggregator_requirement = requirement
+                .as_ref()
                 .map(|exprs| PhysicalSortRequirement::from_sort_exprs(exprs.iter()));
+            aggregator_reqs = aggregator_requirement.unwrap_or(vec![]);
+            // If all aggregate expressions are reversible, also consider reverse
+            // requirement(s). The reason is that existing ordering may satisfy the
+            // given requirement or its reverse. By considering both, we can generate better plans.
+            if aggr_expr
+                .iter()
+                .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some())
+            {
+                let reverse_agg_requirement = requirement.map(|reqs| {
+                    PhysicalSortRequirement::from_sort_exprs(
+                        reverse_order_bys(&reqs).iter(),
+                    )
+                });
+                aggregator_reverse_reqs = reverse_agg_requirement;
+            }
         }
 
         // construct a map from the input columns to the output columns of the Aggregation
@@ -455,37 +603,22 @@ impl AggregateExec {
 
         let aggregation_ordering = calc_aggregation_ordering(&input, &group_by);
 
-        let mut required_input_ordering = None;
-        if let Some(AggregationOrdering {
-            ordering,
-            // If the mode is FullyOrdered or PartiallyOrdered (i.e. we are
-            // running with bounded memory, without breaking pipeline), then
-            // we append aggregator ordering requirement to the existing
-            // ordering. This way, we can still run with bounded memory.
-            mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
-            ..
-        }) = &aggregation_ordering
+        let required_input_ordering = calc_required_input_ordering(
+            &input,
+            &mut aggr_expr,
+            aggregator_reqs,
+            aggregator_reverse_reqs,
+            &aggregation_ordering,
+        )?;
+
+        // If aggregator is working on multiple partitions and there is an order-sensitive aggregator with a requirement return error.
+        if input.output_partitioning().partition_count() > 1
+            && order_by_expr.iter().any(|req| req.is_some())
         {
-            if let Some(aggregator_requirement) = aggregator_requirement {
-                // Get the section of the input ordering that enables us to run in the
-                // FullyOrdered or PartiallyOrdered mode:
-                let requirement_prefix =
-                    if let Some(existing_ordering) = input.output_ordering() {
-                        existing_ordering[0..ordering.len()].to_vec()
-                    } else {
-                        vec![]
-                    };
-                let mut requirement =
-                    PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter());
-                for req in aggregator_requirement {
-                    if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
-                        requirement.push(req);
-                    }
-                }
-                required_input_ordering = Some(requirement);
-            }
-        } else {
-            required_input_ordering = aggregator_requirement;
+            return Err(DataFusionError::NotImplemented(
+                "Order-sensitive aggregators is not supported on multiple partitions"
+                    .to_string(),
+            ));
         }
 
         Ok(AggregateExec {
@@ -530,7 +663,7 @@ impl AggregateExec {
     }
 
     /// ORDER BY clause expression for each aggregate expression
-    pub fn order_by_expr(&self) -> &[Option<Vec<PhysicalSortExpr>>] {
+    pub fn order_by_expr(&self) -> &[Option<LexOrdering>] {
         &self.order_by_expr
     }
 
@@ -642,7 +775,7 @@ impl ExecutionPlan for AggregateExec {
         }
     }
 
-    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+    fn required_input_ordering(&self) -> Vec<Option<LexOrderingReq>> {
         vec![self.required_input_ordering.clone()]
     }
 
@@ -1065,7 +1198,7 @@ mod tests {
     use datafusion_common::{DataFusionError, Result, ScalarValue};
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_physical_expr::expressions::{
-        lit, ApproxDistinct, Column, Count, Median,
+        lit, ApproxDistinct, Column, Count, FirstValue, Median,
     };
     use datafusion_physical_expr::{
         AggregateExpr, EquivalenceProperties, OrderedColumn,
@@ -1714,6 +1847,7 @@ mod tests {
             descending: false,
             nulls_first: false,
         };
+        // This is the reverse requirement of options1
         let options2 = SortOptions {
             descending: true,
             nulls_first: true,
@@ -1729,8 +1863,7 @@ mod tests {
             &vec![OrderedColumn::new(col_a.clone(), options1)],
             &vec![OrderedColumn::new(col_c.clone(), options2)],
         ));
-
-        let order_by_exprs = vec![
+        let mut order_by_exprs = vec![
             None,
             Some(vec![PhysicalSortExpr {
                 expr: Arc::new(col_a.clone()),
@@ -1746,7 +1879,7 @@ mod tests {
             }]),
             Some(vec![
                 PhysicalSortExpr {
-                    expr: Arc::new(col_a),
+                    expr: Arc::new(col_a.clone()),
                     options: options1,
                 },
                 PhysicalSortExpr {
@@ -1754,9 +1887,22 @@ mod tests {
                     options: options1,
                 },
             ]),
+            // Since aggregate expression is reversible (FirstValue), we should be able to resolve below
+            // contradictory requirement by reversing it.
+            Some(vec![PhysicalSortExpr {
+                expr: Arc::new(col_b.clone()),
+                options: options2,
+            }]),
         ];
+        let aggr_expr = Arc::new(FirstValue::new(
+            Arc::new(col_a.clone()),
+            "first1",
+            DataType::Int32,
+        )) as _;
+        let mut aggr_exprs = vec![aggr_expr; order_by_exprs.len()];
         let res = get_finest_requirement(
-            &order_by_exprs,
+            &mut aggr_exprs,
+            &mut order_by_exprs,
             || eq_properties.clone(),
             || ordering_eq_properties.clone(),
         )?;
diff --git a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
index ab3516e9e5..fea1b6cb8e 100644
--- a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
@@ -65,7 +65,7 @@ statement error Error during planning: The percentile sample points count for Ap
 SELECT approx_percentile_cont(c3, 0.95, 111.1) FROM aggregate_test_100
 
 # csv_query_array_agg_unsupported
-statement error This feature is not implemented: ORDER BY not supported in ARRAY_AGG: c1
+statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
 SELECT array_agg(c13 ORDER BY c1) FROM aggregate_test_100
 
 statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1
@@ -1169,7 +1169,7 @@ select c2, sum(c3) sum_c3, avg(c3) avg_c3, max(c3) max_c3, min(c3) min_c3, count
 5 -194 -13.857142857143 118 -101 14
 
 # csv_query_array_agg_unsupported
-statement error This feature is not implemented: ORDER BY not supported in ARRAY_AGG: c1
+statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
 SELECT array_agg(c13 ORDER BY c1) FROM aggregate_test_100;
 
 # csv_query_array_cube_agg_with_overflow
diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt
index 75002fecb1..6a9d07aba7 100644
--- a/datafusion/core/tests/sqllogictests/test_files/explain.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt
@@ -208,8 +208,8 @@ physical_plan after PipelineFixer SAME TEXT AS ABOVE
 physical_plan after repartition SAME TEXT AS ABOVE
 physical_plan after global_sort_selection SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
-physical_plan after EnforceSorting SAME TEXT AS ABOVE
 physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
+physical_plan after EnforceSorting SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after PipelineChecker SAME TEXT AS ABOVE
 physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index 08523648dc..9868e684ca 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -2384,3 +2384,208 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC) AS
 FRA [200.0, 50.0] 250
 GRC [80.0, 30.0] 110
 TUR [100.0, 75.0] 175
+
+# test_reverse_aggregate_expr
+# Some aggregators can be reversed; this lets us run aggregators whose ordering requirements
+# look contradictory at first glance without re-ordering the input.
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+  FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+----SortExec: expr=[amount@1 DESC]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?RR
+SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+  FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+----
+FRA [200.0, 50.0] 50 50
+TUR [100.0, 75.0] 75 75
+GRC [80.0, 30.0] 30 30
+
+# test_reverse_aggregate_expr2
+# Some aggregators can be reversed; this lets us run aggregators whose ordering requirements
+# look contradictory at first glance without re-ordering the input.
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,
+  FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+----SortExec: expr=[amount@1 ASC NULLS LAST]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?RR
+SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,
+  FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+----
+GRC [30.0, 80.0] 30 30
+FRA [50.0, 200.0] 50 50
+TUR [75.0, 100.0] 75 75
+
+# test_reverse_aggregate_expr3
+# Some aggregators can be reversed; this lets us run aggregators whose ordering requirements
+# look contradictory at first glance without re-ordering the input. The result should not depend
+# on the order in which the aggregate expressions appear.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY amount DESC) AS fv2,
+  ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
+  FROM sales_global
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+----TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)]
+----SortExec: expr=[amount@1 ASC NULLS LAST]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRR?
+SELECT country, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY amount DESC) AS fv2,
+  ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
+  FROM sales_global
+  GROUP BY country
+----
+GRC 30 30 [30.0, 80.0]
+FRA 50 50 [50.0, 200.0]
+TUR 75 75 [75.0, 100.0]
+
+# test_reverse_aggregate_expr4
+# Ordering requirements of order-insensitive aggregators should not affect the
+# final plan. Hence, the seemingly conflicting requirements of SUM and ARRAY_AGG should not raise an error.
+query TT
+EXPLAIN SELECT country, SUM(amount ORDER BY ts DESC) AS sum1,
+  ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
+  FROM sales_global
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+----TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[SUM(sales_global.amount), ARRAY_AGG(sales_global.amount)]
+----SortExec: expr=[amount@2 ASC NULLS LAST]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TR?
+SELECT country, SUM(amount ORDER BY ts DESC) AS sum1,
+  ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
+  FROM sales_global
+  GROUP BY country
+----
+GRC 110 [30.0, 80.0]
+FRA 250 [50.0, 200.0]
+TUR 175 [75.0, 100.0]
+
+# test_reverse_aggregate_expr5
+# If all of the order-sensitive aggregate functions are reversible,
+# we should be able to reverse their requirements when doing so removes a SortExec.
+# Hence, in the query below, FIRST_VALUE and LAST_VALUE should be reversed so that their results are computed on the `ts ASC` ordering.
+# Note that since the inner query already produces the `ts ASC` ordering, the final plan contains no additional SortExec for `ts DESC`.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
+  LAST_VALUE(amount ORDER BY ts DESC) as lv1,
+  SUM(amount ORDER BY ts DESC) as sum1
+  FROM (SELECT *
+    FROM sales_global
+    ORDER BY ts ASC)
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+----Sort: sales_global.ts ASC NULLS LAST
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)]
+----SortExec: expr=[ts@1 ASC NULLS LAST]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
+  LAST_VALUE(amount ORDER BY ts DESC) as lv1,
+  SUM(amount ORDER BY ts DESC) as sum1
+  FROM (SELECT *
+    FROM sales_global
+    ORDER BY ts ASC)
+  GROUP BY country
+----
+GRC 80 30 110
+FRA 200 50 250
+TUR 100 75 175
+
+# If the existing ordering doesn't satisfy the requirement, we should compute
+# with the original (non-reversed) requirement by convention; otherwise the final plan would be unintuitive,
+# even though the reverse ordering is also possible.
+# Hence, the query below should add `SortExec(ts DESC)` to the final plan.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
+    LAST_VALUE(amount ORDER BY ts DESC) as lv1,
+    SUM(amount ORDER BY ts DESC) as sum1
+  FROM sales_global
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)]
+----SortExec: expr=[ts@1 DESC]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
+    LAST_VALUE(amount ORDER BY ts DESC) as lv1,
+    SUM(amount ORDER BY ts DESC) as sum1
+  FROM sales_global
+  GROUP BY country
+----
+TUR 100 75 175
+GRC 80 30 110
+FRA 200 50 250
+
+# Run order-sensitive aggregators in multiple partitions
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# Currently, we do not support running order-sensitive aggregators in multiple partitions.
+statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
+SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+  FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index 5dd9620ce0..a350637c48 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -84,9 +84,14 @@ impl AggregateExpr for FirstValue {
     }
 
     fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        let name = if self.name.starts_with("FIRST") {
+            format!("LAST{}", &self.name[5..])
+        } else {
+            format!("LAST_VALUE({})", self.expr)
+        };
         Some(Arc::new(LastValue::new(
             self.expr.clone(),
-            self.name.clone(),
+            name,
             self.data_type.clone(),
         )))
     }
@@ -214,9 +219,14 @@ impl AggregateExpr for LastValue {
     }
 
     fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        let name = if self.name.starts_with("LAST") {
+            format!("FIRST{}", &self.name[4..])
+        } else {
+            format!("FIRST_VALUE({})", self.expr)
+        };
         Some(Arc::new(FirstValue::new(
             self.expr.clone(),
-            self.name.clone(),
+            name,
             self.data_type.clone(),
         )))
     }
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 8da635cfb2..09fd9bcfc5 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::aggregate::row_accumulator::RowAccumulator;
+use crate::expressions::{ArrayAgg, FirstValue, LastValue};
 use crate::PhysicalExpr;
 use arrow::datatypes::Field;
 use datafusion_common::{DataFusionError, Result};
@@ -130,3 +131,13 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
         )))
     }
 }
+
+/// Checks whether the given aggregate expression is order-sensitive.
+/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
+/// However, `FirstValue`, `LastValue` and `ArrayAgg` depend on the input ordering
+/// (e.g. if the order changes, the first value in the list would change).
+pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
+    aggr_expr.as_any().is::<FirstValue>()
+        || aggr_expr.as_any().is::<LastValue>()
+        || aggr_expr.as_any().is::<ArrayAgg>()
+}
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index b54bcda601..494f35566d 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -56,9 +56,11 @@ pub use equivalence::{
 pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef};
 pub use planner::create_physical_expr;
 pub use scalar_function::ScalarFunctionExpr;
-pub use sort_expr::{LexOrdering, PhysicalSortExpr, PhysicalSortRequirement};
+pub use sort_expr::{
+    LexOrdering, LexOrderingReq, PhysicalSortExpr, PhysicalSortRequirement,
+};
 pub use utils::{
     expr_list_eq_any_order, expr_list_eq_strict_order,
     normalize_expr_with_equivalence_properties, normalize_out_expr_with_columns_map,
-    split_conjunction,
+    reverse_order_bys, split_conjunction,
 };
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index 665a47e586..dc93b67fa6 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -214,5 +214,8 @@ fn to_str(options: &SortOptions) -> &str {
     }
 }
 
-/// `LexOrdering` is a type alias for lexicographical ordering definition `Vec<PhysicalSortExpr>`
+/// `LexOrdering` is a type alias for lexicographical ordering definition `Vec<PhysicalSortExpr>`
 pub type LexOrdering = Vec<PhysicalSortExpr>;
+
+/// `LexOrderingReq` is a type alias for lexicographical ordering requirement definition `Vec<PhysicalSortRequirement>`
+pub type LexOrderingReq = Vec<PhysicalSortRequirement>;
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index d9009ca31e..9c28243bed 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -679,6 +679,44 @@ pub fn reassign_predicate_columns(
     })
 }
 
+/// Reverses each ORDER BY expression, which is useful when constructing equivalent
+/// expressions that operate on the reversed ordering. For instance, 'ORDER BY a ASC NULLS LAST'
+/// turns into 'ORDER BY a DESC NULLS FIRST'.
+pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
+    order_bys
+        .iter()
+        .map(|e| PhysicalSortExpr {
+            expr: e.expr.clone(),
+            options: !e.options,
+        })
+        .collect()
+}
+
+/// Finds the finer of the orderings `req1` and `req2`, i.e. the one that also satisfies the other.
+/// Returns `None` when `req1` and `req2` are incompatible,
+/// i.e. neither of them satisfies the other.
+pub fn get_finer_ordering<
+    'a,
+    F: Fn() -> EquivalenceProperties,
+    F2: Fn() -> OrderingEquivalenceProperties,
+>(
+    req1: &'a [PhysicalSortExpr],
+    req2: &'a [PhysicalSortExpr],
+    eq_properties: F,
+    ordering_eq_properties: F2,
+) -> Option<&'a [PhysicalSortExpr]> {
+    if ordering_satisfy_concrete(req1, req2, &eq_properties, &ordering_eq_properties) {
+        // Finer requirement is `req1`, since it satisfies the other:
+        return Some(req1);
+    }
+    if ordering_satisfy_concrete(req2, req1, &eq_properties, &ordering_eq_properties) {
+        // Finer requirement is `req2`, since it satisfies the other:
+        return Some(req2);
+    }
+    // Neither `req1` nor `req2` satisfies the other; they are incompatible.
+    None
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs
index 95fd86148a..d0173949dd 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -29,11 +29,13 @@ use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{Accumulator, WindowFrame};
 
-use crate::window::window_expr::{reverse_order_bys, AggregateWindowExpr};
+use crate::window::window_expr::AggregateWindowExpr;
 use crate::window::{
     PartitionBatches, PartitionWindowAggStates, SlidingAggregateWindowExpr, WindowExpr,
 };
-use crate::{expressions::PhysicalSortExpr, AggregateExpr, PhysicalExpr};
+use crate::{
+    expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
+};
 
 /// A window expr that takes the form of an aggregate function
 /// Aggregate Window Expressions that have the form
diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs
index 1576e8e9c4..59674257ed 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -25,12 +25,12 @@ use super::window_frame_state::WindowFrameContext;
 use super::BuiltInWindowFunctionExpr;
 use super::WindowExpr;
 use crate::window::window_expr::{
-    reverse_order_bys, BuiltinWindowState, NthValueKind, NthValueState, WindowFn,
+    BuiltinWindowState, NthValueKind, NthValueState, WindowFn,
 };
 use crate::window::{
     PartitionBatches, PartitionWindowAggStates, WindowAggState, WindowState,
 };
-use crate::{expressions::PhysicalSortExpr, PhysicalExpr};
+use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
 use arrow::array::{new_empty_array, Array, ArrayRef};
 use arrow::compute::SortOptions;
 use arrow::datatypes::Field;
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 7fa33d71ca..8ce3f42bea 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -28,11 +28,13 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::{Accumulator, WindowFrame};
 
-use crate::window::window_expr::{reverse_order_bys, AggregateWindowExpr};
+use crate::window::window_expr::AggregateWindowExpr;
 use crate::window::{
     PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr,
 };
-use crate::{expressions::PhysicalSortExpr, AggregateExpr, PhysicalExpr};
+use crate::{
+    expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
+};
 
 /// A window expr that takes the form of an aggregate function
 /// Aggregate Window Expressions that have the form
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index 140ce67f4a..ec0d929b7d 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -253,19 +253,6 @@ pub trait AggregateWindowExpr: WindowExpr {
     }
 }
 
-/// Reverses the ORDER BY expression, which is useful during equivalent window
-/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into
-/// 'ORDER BY a DESC, NULLS FIRST'.
-pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
-    order_bys
-        .iter()
-        .map(|e| PhysicalSortExpr {
-            expr: e.expr.clone(),
-            options: !e.options,
-        })
-        .collect()
-}
-
 #[derive(Debug)]
 pub enum WindowFn {
     Builtin(Box<dyn PartitionEvaluator>),