Posted to commits@arrow.apache.org by ak...@apache.org on 2023/06/03 06:34:06 UTC
[arrow-datafusion] branch main updated: Resolve contradictory requirements by conversion of ordering sensitive aggregators (#6482)
This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5ddcbc42c1 Resolve contradictory requirements by conversion of ordering sensitive aggregators (#6482)
5ddcbc42c1 is described below
commit 5ddcbc42c1d601359a621590862605eac07470fc
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Sat Jun 3 09:34:00 2023 +0300
Resolve contradictory requirements by conversion of ordering sensitive aggregators (#6482)
* Naive test pass
* Add new tests and simplifications
* move tests to the .slt file
* update requirement
* update tests
* Add support for partiallyOrdered aggregation sensitive.
* Resolve linter errors
* update comments
* minor changes
* retract changes in generated
* update proto files
* Simplifications
* Make types consistent in schema, and data
* Update todos
* Convert API to vector
* Convert get_finest to handle Vector inputs
* simplifications, update comment
* initial commit, add test
* Add support for FIRST Aggregate function.
* Add support for last aggregate
* Update cargo.lock
* Remove distinct, and limit from First and last aggregate.
* Add reverse for First and Last Aggregator
* Update cargo lock
* Minor code simplifications
* Update comment
* Update documents
* Fix projection pushdown bug
* fix projection push down failure bug
* combine first_agg and last_agg parsers
* Update documentation
* Update subproject
* initial commit
* Add test code
* initial version
* simplify prints
* minor changes
* sqllogictests pass
* All tests pass
* update proto function names
* Minor changes
* do not consider ordering requirement in ordering insensitive aggregators
* Reject aggregate order by for window functions.
* initial commit
* Add new tests
* Add reverse for sort removal
* simplifications
* simplifications
* Bug fix, do not consider reverse requirement if it is not possible
* Fix cargo lock file
* Change reverse_order_by function place
* Move required input ordering calculation logic to its own function
* Add new tests
* simplifications
* Update comment
* Rename aggregator first and last
* minor change
* Comment improvements
* Remove count from First,Last accumulators
* Remove clone
* Simplifications
* Simplifications, update comment
* Improve comments
* Move LexOrdering requirement to common place
* Update comment, refactor implementation
* bug fix:
* Use naive requirement when reverse requirement is not helpful by convention.
* Update test
* Update comments
* Change function place
* Move get_finer to utils
* change name of last, first impl,
* Fix error message
* change display of first and last
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
Co-authored-by: berkaysynnada <be...@synnada.ai>
---
datafusion/core/src/execution/context.rs | 5 +-
.../core/src/physical_plan/aggregates/mod.rs | 338 +++++++++++++++------
.../tests/sqllogictests/test_files/aggregate.slt | 4 +-
.../tests/sqllogictests/test_files/explain.slt | 2 +-
.../tests/sqllogictests/test_files/groupby.slt | 205 +++++++++++++
.../physical-expr/src/aggregate/first_last.rs | 14 +-
datafusion/physical-expr/src/aggregate/mod.rs | 11 +
datafusion/physical-expr/src/lib.rs | 6 +-
datafusion/physical-expr/src/sort_expr.rs | 5 +-
datafusion/physical-expr/src/utils.rs | 38 +++
datafusion/physical-expr/src/window/aggregate.rs | 6 +-
datafusion/physical-expr/src/window/built_in.rs | 4 +-
.../physical-expr/src/window/sliding_aggregate.rs | 6 +-
datafusion/physical-expr/src/window/window_expr.rs | 13 -
14 files changed, 531 insertions(+), 126 deletions(-)
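As a minimal sketch of the idea behind this change, assuming simplified stand-in functions rather than DataFusion's accumulators: taking the first value under an ascending order yields the same result as taking the last value under the reversed (descending) order. This is why FIRST_VALUE and LAST_VALUE can be swapped for one another when the sort direction is flipped. All function names below are hypothetical, and the sketch aggregates the sort column itself, so ties cannot change the answer.

```rust
// Hypothetical stand-ins for order-sensitive accumulators (not DataFusion's API).
// Both assume the rows arrive already sorted as the aggregate requires.
fn first_value(rows: &[i64]) -> Option<i64> {
    rows.first().copied()
}

fn last_value(rows: &[i64]) -> Option<i64> {
    rows.last().copied()
}

/// Models FIRST_VALUE(x ORDER BY x ASC): sort ascending, take the first row.
fn first_value_asc(input: &[i64]) -> Option<i64> {
    let mut rows = input.to_vec();
    rows.sort();
    first_value(&rows)
}

/// Models LAST_VALUE(x ORDER BY x DESC): sort descending, take the last row.
fn last_value_desc(input: &[i64]) -> Option<i64> {
    let mut rows = input.to_vec();
    rows.sort_by(|a, b| b.cmp(a));
    last_value(&rows)
}
```

Calling `first_value_asc` and `last_value_desc` on the same input returns the same value, so a single descending sort can serve both an ARRAY_AGG that needs DESC and a FIRST_VALUE that asked for ASC, provided the latter is rewritten as LAST_VALUE.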
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index f43d7c87c4..69f621bb44 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1460,14 +1460,13 @@ impl SessionState {
// The EnforceDistribution rule is for adding essential repartition to satisfy the required
// distribution. Please make sure that the whole plan tree is determined before this rule.
Arc::new(EnforceDistribution::new()),
+ // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
+ Arc::new(CombinePartialFinalAggregate::new()),
// The EnforceSorting rule is for adding essential local sorting to satisfy the required
// ordering. Please make sure that the whole plan tree is determined before this rule.
// Note that one should always run this rule after running the EnforceDistribution rule
// as the latter may break local sorting requirements.
Arc::new(EnforceSorting::new()),
- // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution
- // and EnforceSorting rules
- Arc::new(CombinePartialFinalAggregate::new()),
// The CoalesceBatches rule will not influence the distribution and ordering of the
// whole plan tree. Therefore, to avoid influencing other rules, it should run last.
Arc::new(CoalesceBatches::new()),
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 9eba869290..455a86660e 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -37,10 +37,10 @@ use datafusion_physical_expr::{
aggregate::row_accumulator::RowAccumulator,
equivalence::project_equivalence_properties,
expressions::{Avg, CastExpr, Column, Sum},
- normalize_out_expr_with_columns_map,
- utils::{convert_to_expr, get_indices_of_matching_exprs, ordering_satisfy_concrete},
- AggregateExpr, OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
- PhysicalSortRequirement,
+ normalize_out_expr_with_columns_map, reverse_order_bys,
+ utils::{convert_to_expr, get_indices_of_matching_exprs},
+ AggregateExpr, LexOrdering, LexOrderingReq, OrderingEquivalenceProperties,
+ PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};
use std::any::Any;
use std::collections::HashMap;
@@ -52,8 +52,11 @@ mod row_hash;
mod utils;
pub use datafusion_expr::AggregateFunction;
+use datafusion_physical_expr::aggregate::is_order_sensitive;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
-use datafusion_physical_expr::expressions::{ArrayAgg, FirstValue, LastValue};
+use datafusion_physical_expr::utils::{
+ get_finer_ordering, ordering_satisfy_requirement_concrete,
+};
/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -213,7 +216,7 @@ pub(crate) struct AggregationOrdering {
/// expressions match input ordering.
order_indices: Vec<usize>,
/// Actual ordering information of the GROUP BY columns.
- ordering: Vec<PhysicalSortExpr>,
+ ordering: LexOrdering,
}
/// Hash aggregate execution plan
@@ -228,7 +231,7 @@ pub struct AggregateExec {
/// FILTER (WHERE clause) expression for each aggregate expression
pub(crate) filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
/// (ORDER BY clause) expression for each aggregate expression
- pub(crate) order_by_expr: Vec<Option<Vec<PhysicalSortExpr>>>,
+ pub(crate) order_by_expr: Vec<Option<LexOrdering>>,
/// Input plan, could be a partial aggregate or the input to the aggregate
pub(crate) input: Arc<dyn ExecutionPlan>,
/// Schema after the aggregate is applied
@@ -244,7 +247,7 @@ pub struct AggregateExec {
metrics: ExecutionPlanMetricsSet,
/// Stores mode and output ordering information for the `AggregateExec`.
aggregation_ordering: Option<AggregationOrdering>,
- required_input_ordering: Option<Vec<PhysicalSortRequirement>>,
+ required_input_ordering: Option<LexOrderingReq>,
}
/// Calculates the working mode for `GROUP BY` queries.
@@ -339,6 +342,29 @@ fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalE
.collect()
}
+/// This function returns the ordering requirement of the first non-reversible
+/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves
+/// as the initial requirement while calculating the finest requirement among all
+/// aggregate functions. If this function returns `None`, it means there is no
+/// hard ordering requirement for the aggregate functions (in terms of direction).
+/// Then, we can generate two alternative requirements with opposite directions.
+fn get_init_req(
+ aggr_expr: &[Arc<dyn AggregateExpr>],
+ order_by_expr: &[Option<LexOrdering>],
+) -> Option<LexOrdering> {
+ for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) {
+ // If the aggregation function is a non-reversible order-sensitive function
+ // and there is a hard requirement, choose first such requirement:
+ if is_order_sensitive(aggr_expr)
+ && aggr_expr.reverse_expr().is_none()
+ && fn_reqs.is_some()
+ {
+ return fn_reqs.clone();
+ }
+ }
+ None
+}
+
/// This function gets the finest ordering requirement among all the aggregation
/// functions. If requirements are conflicting, (i.e. we can not compute the
/// aggregations in a single [`AggregateExec`]), the function returns an error.
@@ -346,33 +372,45 @@ fn get_finest_requirement<
F: Fn() -> EquivalenceProperties,
F2: Fn() -> OrderingEquivalenceProperties,
>(
- order_by_expr: &[Option<Vec<PhysicalSortExpr>>],
+ aggr_expr: &mut [Arc<dyn AggregateExpr>],
+ order_by_expr: &mut [Option<LexOrdering>],
eq_properties: F,
ordering_eq_properties: F2,
-) -> Result<Option<Vec<PhysicalSortExpr>>> {
- let mut result: Option<Vec<PhysicalSortExpr>> = None;
- for fn_reqs in order_by_expr.iter().flatten() {
- if let Some(result) = &mut result {
- if ordering_satisfy_concrete(
- result,
- fn_reqs,
+) -> Result<Option<LexOrdering>> {
+ let mut finest_req = get_init_req(aggr_expr, order_by_expr);
+ for (aggr_expr, fn_req) in aggr_expr.iter_mut().zip(order_by_expr.iter_mut()) {
+ let fn_req = if let Some(fn_req) = fn_req {
+ fn_req
+ } else {
+ continue;
+ };
+ if let Some(finest_req) = &mut finest_req {
+ if let Some(finer) = get_finer_ordering(
+ finest_req,
+ fn_req,
&eq_properties,
&ordering_eq_properties,
) {
- // Do not update the result as it already satisfies current
- // function's requirement:
+ *finest_req = finer.to_vec();
continue;
}
- if ordering_satisfy_concrete(
- fn_reqs,
- result,
- &eq_properties,
- &ordering_eq_properties,
- ) {
- // Update result with current function's requirements, as it is
- // a finer requirement than what we currently have.
- *result = fn_reqs.clone();
- continue;
+ // If an aggregate function is reversible, analyze whether its reverse
+ // direction is compatible with existing requirements:
+ if let Some(reverse) = aggr_expr.reverse_expr() {
+ let fn_req_reverse = reverse_order_bys(fn_req);
+ if let Some(finer) = get_finer_ordering(
+ finest_req,
+ &fn_req_reverse,
+ &eq_properties,
+ &ordering_eq_properties,
+ ) {
+ // We need to update `aggr_expr` with its reverse, since only its
+ // reverse requirement is compatible with existing requirements:
+ *aggr_expr = reverse;
+ *finest_req = finer.to_vec();
+ *fn_req = fn_req_reverse;
+ continue;
+ }
}
// If neither of the requirements satisfy the other, this means
// requirements are conflicting. Currently, we do not support
@@ -381,20 +419,107 @@ fn get_finest_requirement<
"Conflicting ordering requirements in aggregate functions is not supported".to_string(),
));
} else {
- result = Some(fn_reqs.clone());
+ finest_req = Some(fn_req.clone());
}
}
- Ok(result)
+ Ok(finest_req)
}
-/// Checks whether the given aggregate expression is order-sensitive.
-/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
-/// However, a `FirstAgg` depends on the input ordering (if the order changes,
-/// the first value in the list would change).
-fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
- aggr_expr.as_any().is::<FirstValue>()
- || aggr_expr.as_any().is::<LastValue>()
- || aggr_expr.as_any().is::<ArrayAgg>()
+/// Calculate the required input ordering for the [`AggregateExec`] by considering
+/// ordering requirements of order-sensitive aggregation functions.
+fn calc_required_input_ordering(
+ input: &Arc<dyn ExecutionPlan>,
+ aggr_expr: &mut [Arc<dyn AggregateExpr>],
+ aggregator_reqs: LexOrderingReq,
+ aggregator_reverse_reqs: Option<LexOrderingReq>,
+ aggregation_ordering: &Option<AggregationOrdering>,
+) -> Result<Option<LexOrderingReq>> {
+ let mut required_input_ordering = vec![];
+ // Boolean showing whether the stored `required_input_ordering` comes from
+ // `aggregator_reqs` or `aggregator_reverse_reqs`.
+ let mut reverse_req = false;
+ // If reverse aggregator is None, there is no way to run aggregators in reverse mode. Hence ignore it during analysis
+ let aggregator_requirements =
+ if let Some(aggregator_reverse_reqs) = aggregator_reverse_reqs {
+ // If existing ordering doesn't satisfy requirement, we should do calculations
+ // on naive requirement (by convention, otherwise the final plan will be unintuitive),
+ // even if reverse ordering is possible.
+ // Hence, while iterating consider naive requirement last, by this way
+ // we prioritize naive requirement over reverse requirement, when
+ // reverse requirement is not helpful with removing SortExec from the plan.
+ vec![(true, aggregator_reverse_reqs), (false, aggregator_reqs)]
+ } else {
+ vec![(false, aggregator_reqs)]
+ };
+ for (is_reverse, aggregator_requirement) in aggregator_requirements.into_iter() {
+ if let Some(AggregationOrdering {
+ ordering,
+ // If the mode is FullyOrdered or PartiallyOrdered (i.e. we are
+ // running with bounded memory, without breaking the pipeline),
+ // then we append the aggregator ordering requirement to the existing
+ // ordering. This way, we can still run with bounded memory.
+ mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
+ ..
+ }) = aggregation_ordering
+ {
+ // Get the section of the input ordering that enables us to run in
+ // FullyOrdered or PartiallyOrdered modes:
+ let requirement_prefix =
+ if let Some(existing_ordering) = input.output_ordering() {
+ &existing_ordering[0..ordering.len()]
+ } else {
+ &[]
+ };
+ let mut requirement =
+ PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter());
+ for req in aggregator_requirement {
+ if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
+ requirement.push(req);
+ }
+ }
+ required_input_ordering = requirement;
+ } else {
+ required_input_ordering = aggregator_requirement;
+ }
+ // Keep track of the direction from which `required_input_ordering` is constructed:
+ reverse_req = is_reverse;
+ // If all of the order-sensitive aggregate functions are reversible (e.g. all of the order-sensitive aggregators are
+ // either FIRST_VALUE or LAST_VALUE), we can run the aggregate expressions both in the direction of the naive required ordering
+ // (i.e. the finest requirement that satisfies each aggregate function's requirement) and in its reversed (opposite) direction.
+ // We analyze these two possibilities and use the version that satisfies the existing ordering (this saves us from adding
+ // an unnecessary SortExec to the final plan). If neither version satisfies the existing ordering, we use the naive required ordering.
+ // In short, if running the aggregators in reverse order helps us remove a `SortExec`, we do so. Otherwise, we use the aggregators as is.
+ let existing_ordering = input.output_ordering().unwrap_or(&[]);
+ if ordering_satisfy_requirement_concrete(
+ existing_ordering,
+ &required_input_ordering,
+ || input.equivalence_properties(),
+ || input.ordering_equivalence_properties(),
+ ) {
+ break;
+ }
+ }
+ // If `required_input_ordering` is constructed using reverse requirement, we should reverse
+ // each `aggr_expr` to be able to correctly calculate their result in reverse order.
+ if reverse_req {
+ aggr_expr
+ .iter_mut()
+ .map(|elem| {
+ if is_order_sensitive(elem) {
+ if let Some(reverse) = elem.reverse_expr() {
+ *elem = reverse;
+ } else {
+ return Err(DataFusionError::Execution(
+ "Aggregate expression should have a reverse expression"
+ .to_string(),
+ ));
+ }
+ }
+ Ok(())
+ })
+ .collect::<Result<Vec<_>>>()?;
+ }
+ Ok((!required_input_ordering.is_empty()).then_some(required_input_ordering))
}
impl AggregateExec {
@@ -402,9 +527,9 @@ impl AggregateExec {
pub fn try_new(
mode: AggregateMode,
group_by: PhysicalGroupBy,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+ mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
- mut order_by_expr: Vec<Option<Vec<PhysicalSortExpr>>>,
+ mut order_by_expr: Vec<Option<LexOrdering>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
) -> Result<Self> {
@@ -417,30 +542,53 @@ impl AggregateExec {
)?;
let schema = Arc::new(schema);
- let mut aggregator_requirement = None;
- // Ordering requirement makes sense only in Partial and Single modes.
- // In other modes, all groups are collapsed, therefore their input schema
- // can not contain expressions in the requirement.
- if mode == AggregateMode::Partial || mode == AggregateMode::Single {
- order_by_expr = aggr_expr
- .iter()
- .zip(order_by_expr.into_iter())
- .map(|(aggr_expr, fn_reqs)| {
- // If aggregation function is ordering sensitive, keep ordering requirement as is; otherwise ignore requirement
- if is_order_sensitive(aggr_expr) {
- fn_reqs
- } else {
- None
- }
- })
- .collect::<Vec<_>>();
+ // Reset ordering requirement to `None` if aggregator is not order-sensitive
+ order_by_expr = aggr_expr
+ .iter()
+ .zip(order_by_expr.into_iter())
+ .map(|(aggr_expr, fn_reqs)| {
+ // If aggregation function is ordering sensitive, keep ordering requirement as is; otherwise ignore requirement
+ if is_order_sensitive(aggr_expr) {
+ fn_reqs
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
+
+ let mut aggregator_reqs = vec![];
+ let mut aggregator_reverse_reqs = None;
+ // Currently we support order-sensitive aggregation only in `Single` mode.
+ // For `Final` and `FinalPartitioned` modes, we cannot guarantee they will receive
+ // data according to ordering requirements. As long as we cannot produce correct result
+ // in `Final` mode, it is not important to produce correct result in `Partial` mode.
+ // We only support `Single` mode, where we are sure that output produced is final, and it
+ // is produced in a single step.
+ if mode == AggregateMode::Single {
let requirement = get_finest_requirement(
- &order_by_expr,
+ &mut aggr_expr,
+ &mut order_by_expr,
|| input.equivalence_properties(),
|| input.ordering_equivalence_properties(),
)?;
- aggregator_requirement = requirement
+ let aggregator_requirement = requirement
+ .as_ref()
.map(|exprs| PhysicalSortRequirement::from_sort_exprs(exprs.iter()));
+ aggregator_reqs = aggregator_requirement.unwrap_or(vec![]);
+ // If all aggregate expressions are reversible, also consider reverse
+ // requirement(s). The reason is that existing ordering may satisfy the
+ // given requirement or its reverse. By considering both, we can generate better plans.
+ if aggr_expr
+ .iter()
+ .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some())
+ {
+ let reverse_agg_requirement = requirement.map(|reqs| {
+ PhysicalSortRequirement::from_sort_exprs(
+ reverse_order_bys(&reqs).iter(),
+ )
+ });
+ aggregator_reverse_reqs = reverse_agg_requirement;
+ }
}
// construct a map from the input columns to the output columns of the Aggregation
@@ -455,37 +603,22 @@ impl AggregateExec {
let aggregation_ordering = calc_aggregation_ordering(&input, &group_by);
- let mut required_input_ordering = None;
- if let Some(AggregationOrdering {
- ordering,
- // If the mode is FullyOrdered or PartiallyOrdered (i.e. we are
- // running with bounded memory, without breaking pipeline), then
- // we append aggregator ordering requirement to the existing
- // ordering. This way, we can still run with bounded memory.
- mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
- ..
- }) = &aggregation_ordering
+ let required_input_ordering = calc_required_input_ordering(
+ &input,
+ &mut aggr_expr,
+ aggregator_reqs,
+ aggregator_reverse_reqs,
+ &aggregation_ordering,
+ )?;
+
+ // If aggregator is working on multiple partitions and there is an order-sensitive aggregator with a requirement return error.
+ if input.output_partitioning().partition_count() > 1
+ && order_by_expr.iter().any(|req| req.is_some())
{
- if let Some(aggregator_requirement) = aggregator_requirement {
- // Get the section of the input ordering that enables us to run in the
- // FullyOrdered or PartiallyOrdered mode:
- let requirement_prefix =
- if let Some(existing_ordering) = input.output_ordering() {
- existing_ordering[0..ordering.len()].to_vec()
- } else {
- vec![]
- };
- let mut requirement =
- PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter());
- for req in aggregator_requirement {
- if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
- requirement.push(req);
- }
- }
- required_input_ordering = Some(requirement);
- }
- } else {
- required_input_ordering = aggregator_requirement;
+ return Err(DataFusionError::NotImplemented(
+ "Order-sensitive aggregators is not supported on multiple partitions"
+ .to_string(),
+ ));
}
Ok(AggregateExec {
@@ -530,7 +663,7 @@ impl AggregateExec {
}
/// ORDER BY clause expression for each aggregate expression
- pub fn order_by_expr(&self) -> &[Option<Vec<PhysicalSortExpr>>] {
+ pub fn order_by_expr(&self) -> &[Option<LexOrdering>] {
&self.order_by_expr
}
@@ -642,7 +775,7 @@ impl ExecutionPlan for AggregateExec {
}
}
- fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+ fn required_input_ordering(&self) -> Vec<Option<LexOrderingReq>> {
vec![self.required_input_ordering.clone()]
}
@@ -1065,7 +1198,7 @@ mod tests {
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_physical_expr::expressions::{
- lit, ApproxDistinct, Column, Count, Median,
+ lit, ApproxDistinct, Column, Count, FirstValue, Median,
};
use datafusion_physical_expr::{
AggregateExpr, EquivalenceProperties, OrderedColumn,
@@ -1714,6 +1847,7 @@ mod tests {
descending: false,
nulls_first: false,
};
+ // This is the reverse requirement of options1
let options2 = SortOptions {
descending: true,
nulls_first: true,
@@ -1729,8 +1863,7 @@ mod tests {
&vec![OrderedColumn::new(col_a.clone(), options1)],
&vec![OrderedColumn::new(col_c.clone(), options2)],
));
-
- let order_by_exprs = vec![
+ let mut order_by_exprs = vec![
None,
Some(vec![PhysicalSortExpr {
expr: Arc::new(col_a.clone()),
@@ -1746,7 +1879,7 @@ mod tests {
}]),
Some(vec![
PhysicalSortExpr {
- expr: Arc::new(col_a),
+ expr: Arc::new(col_a.clone()),
options: options1,
},
PhysicalSortExpr {
@@ -1754,9 +1887,22 @@ mod tests {
options: options1,
},
]),
+ // Since aggregate expression is reversible (FirstValue), we should be able to resolve below
+ // contradictory requirement by reversing it.
+ Some(vec![PhysicalSortExpr {
+ expr: Arc::new(col_b.clone()),
+ options: options2,
+ }]),
];
+ let aggr_expr = Arc::new(FirstValue::new(
+ Arc::new(col_a.clone()),
+ "first1",
+ DataType::Int32,
+ )) as _;
+ let mut aggr_exprs = vec![aggr_expr; order_by_exprs.len()];
let res = get_finest_requirement(
- &order_by_exprs,
+ &mut aggr_exprs,
+ &mut order_by_exprs,
|| eq_properties.clone(),
|| ordering_eq_properties.clone(),
)?;
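The reversal step in `get_finest_requirement` above can be sketched as follows. This is a simplified illustration under stated assumptions, not the committed implementation: `SortKey`, `finer`, `reverse`, and `merge` are hypothetical names, `finer` uses a plain prefix check instead of consulting equivalence properties, and null ordering is ignored.

```rust
// Hypothetical, simplified sort key: a column name plus a direction.
#[derive(Clone, Debug, PartialEq)]
struct SortKey {
    column: &'static str,
    descending: bool,
}

/// Returns the finer of two orderings if one is a prefix of the other,
/// or `None` if they conflict.
fn finer<'a>(a: &'a [SortKey], b: &'a [SortKey]) -> Option<&'a [SortKey]> {
    let (short, long) = if a.len() <= b.len() { (a, b) } else { (b, a) };
    long.starts_with(short).then_some(long)
}

/// Flips the direction of every key in the ordering.
fn reverse(keys: &[SortKey]) -> Vec<SortKey> {
    keys.iter()
        .map(|k| SortKey { column: k.column, descending: !k.descending })
        .collect()
}

/// Tries to merge `req` into `current`. On success, returns the merged
/// requirement and a flag telling whether the aggregate must be reversed.
fn merge(
    current: &[SortKey],
    req: &[SortKey],
    reversible: bool,
) -> Option<(Vec<SortKey>, bool)> {
    // First try the requirement as written.
    if let Some(f) = finer(current, req) {
        return Some((f.to_vec(), false));
    }
    // Otherwise, a reversible aggregate may be satisfied in the opposite direction.
    if reversible {
        let rev = reverse(req);
        if let Some(f) = finer(current, &rev) {
            return Some((f.to_vec(), true));
        }
    }
    // Neither direction is compatible: the requirements conflict.
    None
}
```

With `current` set to `amount DESC` and `req` set to `amount ASC`, `merge` succeeds only when `reversible` is true, and it then reports that the aggregate has to be swapped for its reverse. This mirrors how the patch replaces `aggr_expr` with `reverse_expr()` in that case and otherwise returns the "Conflicting ordering requirements" error.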
diff --git a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
index ab3516e9e5..fea1b6cb8e 100644
--- a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
@@ -65,7 +65,7 @@ statement error Error during planning: The percentile sample points count for Ap
SELECT approx_percentile_cont(c3, 0.95, 111.1) FROM aggregate_test_100
# csv_query_array_agg_unsupported
-statement error This feature is not implemented: ORDER BY not supported in ARRAY_AGG: c1
+statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
SELECT array_agg(c13 ORDER BY c1) FROM aggregate_test_100
statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1
@@ -1169,7 +1169,7 @@ select c2, sum(c3) sum_c3, avg(c3) avg_c3, max(c3) max_c3, min(c3) min_c3, count
5 -194 -13.857142857143 118 -101 14
# csv_query_array_agg_unsupported
-statement error This feature is not implemented: ORDER BY not supported in ARRAY_AGG: c1
+statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
SELECT array_agg(c13 ORDER BY c1) FROM aggregate_test_100;
# csv_query_array_cube_agg_with_overflow
diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt
index 75002fecb1..6a9d07aba7 100644
--- a/datafusion/core/tests/sqllogictests/test_files/explain.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt
@@ -208,8 +208,8 @@ physical_plan after PipelineFixer SAME TEXT AS ABOVE
physical_plan after repartition SAME TEXT AS ABOVE
physical_plan after global_sort_selection SAME TEXT AS ABOVE
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
-physical_plan after EnforceSorting SAME TEXT AS ABOVE
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
+physical_plan after EnforceSorting SAME TEXT AS ABOVE
physical_plan after coalesce_batches SAME TEXT AS ABOVE
physical_plan after PipelineChecker SAME TEXT AS ABOVE
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index 08523648dc..9868e684ca 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -2384,3 +2384,208 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC) AS
FRA [200.0, 50.0] 250
GRC [80.0, 30.0] 110
TUR [100.0, 75.0] 175
+
+# test_reverse_aggregate_expr
+# Some aggregators can be reversed. This way, aggregators whose requirements seem
+# contradictory at first glance can still run without re-ordering.
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+ FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+----SortExec: expr=[amount@1 DESC]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?RR
+SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+ FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
+----
+FRA [200.0, 50.0] 50 50
+TUR [100.0, 75.0] 75 75
+GRC [80.0, 30.0] 30 30
+
+# test_reverse_aggregate_expr2
+# Some aggregators can be reversed. This way, aggregators whose requirements seem
+# contradictory at first glance can still run without re-ordering.
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,
+ FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+----SortExec: expr=[amount@1 ASC NULLS LAST]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?RR
+SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,
+ FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
+----
+GRC [30.0, 80.0] 30 30
+FRA [50.0, 200.0] 50 50
+TUR [75.0, 100.0] 75 75
+
+# test_reverse_aggregate_expr3
+# Some aggregators can be reversed. This way, aggregators whose requirements seem
+# contradictory at first glance can still run without re-ordering. This algorithm shouldn't depend
+# on the order of the aggregation expressions.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY amount DESC) AS fv2,
+ ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
+ FROM sales_global
+ GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+----TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)]
+----SortExec: expr=[amount@1 ASC NULLS LAST]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRR?
+SELECT country, FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY amount DESC) AS fv2,
+ ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
+ FROM sales_global
+ GROUP BY country
+----
+GRC 30 30 [30.0, 80.0]
+FRA 50 50 [50.0, 200.0]
+TUR 75 75 [75.0, 100.0]
+
+# test_reverse_aggregate_expr4
+# Ordering requirements of order-insensitive aggregators shouldn't affect the
+# final plan. Hence the seemingly conflicting requirements of SUM and ARRAY_AGG shouldn't raise an error.
+query TT
+EXPLAIN SELECT country, SUM(amount ORDER BY ts DESC) AS sum1,
+ ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
+ FROM sales_global
+ GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+----TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[SUM(sales_global.amount), ARRAY_AGG(sales_global.amount)]
+----SortExec: expr=[amount@2 ASC NULLS LAST]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TR?
+SELECT country, SUM(amount ORDER BY ts DESC) AS sum1,
+ ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
+ FROM sales_global
+ GROUP BY country
+----
+GRC 110 [30.0, 80.0]
+FRA 250 [50.0, 200.0]
+TUR 175 [75.0, 100.0]
+
+# test_reverse_aggregate_expr5
+# If all of the order-sensitive aggregation functions are reversible,
+# we should be able to reverse their requirements when this helps remove a SortExec.
+# Hence, in the query below, FIRST_VALUE and LAST_VALUE should be reversed to calculate their results according to the `ts ASC` ordering.
+# Note that the input is already ordered by `ts ASC` because of the inner query, so no additional SortExec is added for the aggregation.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
+ LAST_VALUE(amount ORDER BY ts DESC) as lv1,
+ SUM(amount ORDER BY ts DESC) as sum1
+ FROM (SELECT *
+ FROM sales_global
+ ORDER BY ts ASC)
+ GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+----Sort: sales_global.ts ASC NULLS LAST
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)]
+----SortExec: expr=[ts@1 ASC NULLS LAST]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
+ LAST_VALUE(amount ORDER BY ts DESC) as lv1,
+ SUM(amount ORDER BY ts DESC) as sum1
+ FROM (SELECT *
+ FROM sales_global
+ ORDER BY ts ASC)
+ GROUP BY country
+----
+GRC 80 30 110
+FRA 200 50 250
+TUR 100 75 175
+
+# If the existing ordering doesn't satisfy the requirement, we should do the calculations
+# on the naive requirement (by convention; otherwise the final plan would be unintuitive),
+# even if reverse ordering is possible.
+# Hence, the query below should add `SortExec(ts DESC)` to the final plan.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
+ LAST_VALUE(amount ORDER BY ts DESC) as lv1,
+ SUM(amount ORDER BY ts DESC) as sum1
+ FROM sales_global
+ GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS sum1
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)]
+----SortExec: expr=[ts@1 DESC]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts DESC) as fv1,
+ LAST_VALUE(amount ORDER BY ts DESC) as lv1,
+ SUM(amount ORDER BY ts DESC) as sum1
+ FROM sales_global
+ GROUP BY country
+----
+TUR 100 75 175
+GRC 80 30 110
+FRA 200 50 250
+
+# Run order-sensitive aggregators in multiple partitions
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# Currently, we do not support running order-sensitive aggregators in multiple partitions.
+statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
+SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+ FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
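Note on the tests above: they rely on the fact that FIRST_VALUE over one ordering equals LAST_VALUE over the reversed ordering. The sketch below illustrates that identity on plain vectors. It is a standalone illustration, not DataFusion code; `sorted_asc`, `reversed`, `first_value` and `last_value` are hypothetical helper names.

```rust
// Standalone illustration (hypothetical helpers, not part of DataFusion).

/// Returns the values sorted in ascending order.
fn sorted_asc(values: &[f64]) -> Vec<f64> {
    let mut out = values.to_vec();
    out.sort_by(|a, b| a.partial_cmp(b).expect("no NaN values in this sketch"));
    out
}

/// Returns the values in reverse order (ASC becomes DESC and vice versa).
fn reversed(values: &[f64]) -> Vec<f64> {
    values.iter().rev().copied().collect()
}

fn first_value(values: &[f64]) -> Option<f64> {
    values.first().copied()
}

fn last_value(values: &[f64]) -> Option<f64> {
    values.last().copied()
}

fn main() {
    // Amounts of one group, in arbitrary input order.
    let amounts = [80.0, 30.0];
    let asc = sorted_asc(&amounts);
    let desc = reversed(&asc);

    // FIRST_VALUE(amount ORDER BY amount ASC) == LAST_VALUE(amount ORDER BY amount DESC)
    assert_eq!(first_value(&asc), last_value(&desc));
    println!("fv1 = {:?}, fv2 = {:?}", first_value(&asc), last_value(&desc));
}
```

This is why a single `SortExec` on `amount ASC` can serve both aggregates once one of them is replaced by its reverse.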
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index 5dd9620ce0..a350637c48 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -84,9 +84,14 @@ impl AggregateExpr for FirstValue {
}
fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+ let name = if self.name.starts_with("FIRST") {
+ format!("LAST{}", &self.name[5..])
+ } else {
+ format!("LAST_VALUE({})", self.expr)
+ };
Some(Arc::new(LastValue::new(
self.expr.clone(),
- self.name.clone(),
+ name,
self.data_type.clone(),
)))
}
@@ -214,9 +219,14 @@ impl AggregateExpr for LastValue {
}
fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+ let name = if self.name.starts_with("LAST") {
+ format!("FIRST{}", &self.name[4..])
+ } else {
+ format!("FIRST_VALUE({})", self.expr)
+ };
Some(Arc::new(FirstValue::new(
self.expr.clone(),
- self.name.clone(),
+ name,
self.data_type.clone(),
)))
}
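Note on the renaming in the two hunks above: the reversed aggregate keeps the argument part of the original name and swaps only the `FIRST`/`LAST` prefix, falling back to a name built from the expression when the prefix is absent. A minimal standalone sketch of that rule follows, assuming plain strings in place of the real expression types; `reversed_name` is a hypothetical helper, not a function in the commit.

```rust
/// Hypothetical helper mirroring the renaming rule in the hunks above:
/// swap the `from` prefix for `to`, or rebuild the name from `expr`
/// when the name does not start with `from` (for example, an alias).
fn reversed_name(name: &str, from: &str, to: &str, expr: &str) -> String {
    match name.strip_prefix(from) {
        Some(rest) => format!("{}{}", to, rest),
        None => format!("{}_VALUE({})", to, expr),
    }
}

fn main() {
    let original = "FIRST_VALUE(sales_global.amount)";
    let reversed = reversed_name(original, "FIRST", "LAST", "sales_global.amount");
    println!("{} -> {}", original, reversed);

    // A name without the expected prefix falls back to the expression-based name.
    let aliased = reversed_name("fv1", "FIRST", "LAST", "sales_global.amount");
    println!("fv1 -> {}", aliased);
}
```

Using `strip_prefix` instead of indexing with `[5..]` or `[4..]` avoids hard-coding the prefix length; this is a stylistic choice in the sketch, not what the commit does.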
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 8da635cfb2..09fd9bcfc5 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -16,6 +16,7 @@
// under the License.
use crate::aggregate::row_accumulator::RowAccumulator;
+use crate::expressions::{ArrayAgg, FirstValue, LastValue};
use crate::PhysicalExpr;
use arrow::datatypes::Field;
use datafusion_common::{DataFusionError, Result};
@@ -130,3 +131,13 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
)))
}
}
+
+/// Checks whether the given aggregate expression is order-sensitive.
+/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
+/// However, a `FirstValue` depends on the input ordering (if the order changes,
+/// the first value in the list would change).
+pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
+ aggr_expr.as_any().is::<FirstValue>()
+ || aggr_expr.as_any().is::<LastValue>()
+ || aggr_expr.as_any().is::<ArrayAgg>()
+}
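Note on `is_order_sensitive` above: the check works by downcasting through `std::any::Any`. The sketch below reproduces the pattern with stand-in types so it can be run in isolation. The trait and the unit structs here are simplified assumptions that share names with the real ones; they are not the DataFusion definitions.

```rust
use std::any::Any;
use std::sync::Arc;

// Stand-in trait (assumption): only `as_any` matters for this sketch.
trait AggregateExpr {
    fn as_any(&self) -> &dyn Any;
}

// Stand-in aggregate types (assumptions, not the real structs).
struct Sum;
struct FirstValue;
struct LastValue;
struct ArrayAgg;

impl AggregateExpr for Sum {
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl AggregateExpr for FirstValue {
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl AggregateExpr for LastValue {
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl AggregateExpr for ArrayAgg {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Same shape as the function in the hunk above: an aggregate is
/// order-sensitive if its concrete type is one of the listed types.
fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
    aggr_expr.as_any().is::<FirstValue>()
        || aggr_expr.as_any().is::<LastValue>()
        || aggr_expr.as_any().is::<ArrayAgg>()
}

fn main() {
    let sum: Arc<dyn AggregateExpr> = Arc::new(Sum);
    let first: Arc<dyn AggregateExpr> = Arc::new(FirstValue);
    println!("SUM order-sensitive: {}", is_order_sensitive(&sum));
    println!("FIRST_VALUE order-sensitive: {}", is_order_sensitive(&first));
}
```

A type-based check like this needs updating whenever a new order-sensitive aggregate is added; that trade-off applies to the sketch and, as far as the hunk shows, to the original function as well.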
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index b54bcda601..494f35566d 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -56,9 +56,11 @@ pub use equivalence::{
pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
-pub use sort_expr::{LexOrdering, PhysicalSortExpr, PhysicalSortRequirement};
+pub use sort_expr::{
+ LexOrdering, LexOrderingReq, PhysicalSortExpr, PhysicalSortRequirement,
+};
pub use utils::{
expr_list_eq_any_order, expr_list_eq_strict_order,
normalize_expr_with_equivalence_properties, normalize_out_expr_with_columns_map,
- split_conjunction,
+ reverse_order_bys, split_conjunction,
};
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index 665a47e586..dc93b67fa6 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -214,5 +214,8 @@ fn to_str(options: &SortOptions) -> &str {
}
}
-/// `LexOrdering` is a type alias for lexicographical ordering definition `Vec<PhysicalSortExpr>`
+///`LexOrdering` is a type alias for lexicographical ordering definition`Vec<PhysicalSortExpr>`
pub type LexOrdering = Vec<PhysicalSortExpr>;
+
+///`LexOrderingReq` is a type alias for lexicographical ordering requirement definition`Vec<PhysicalSortRequirement>`
+pub type LexOrderingReq = Vec<PhysicalSortRequirement>;
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index d9009ca31e..9c28243bed 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -679,6 +679,44 @@ pub fn reassign_predicate_columns(
})
}
+/// Reverses the ORDER BY expression, which is useful during equivalent window
+/// expression construction. For instance, 'ORDER BY a ASC NULLS LAST' turns into
+/// 'ORDER BY a DESC NULLS FIRST'.
+pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
+ order_bys
+ .iter()
+ .map(|e| PhysicalSortExpr {
+ expr: e.expr.clone(),
+ options: !e.options,
+ })
+ .collect()
+}
+
+/// Finds the finer requirement among `req1` and `req2`.
+/// Returns `None` if `req1` and `req2` are not compatible,
+/// i.e. there is no requirement that satisfies both.
+pub fn get_finer_ordering<
+ 'a,
+ F: Fn() -> EquivalenceProperties,
+ F2: Fn() -> OrderingEquivalenceProperties,
+>(
+ req1: &'a [PhysicalSortExpr],
+ req2: &'a [PhysicalSortExpr],
+ eq_properties: F,
+ ordering_eq_properties: F2,
+) -> Option<&'a [PhysicalSortExpr]> {
+ if ordering_satisfy_concrete(req1, req2, &eq_properties, &ordering_eq_properties) {
+ // Finer requirement is `req1`, since it satisfies the other:
+ return Some(req1);
+ }
+ if ordering_satisfy_concrete(req2, req1, &eq_properties, &ordering_eq_properties) {
+ // Finer requirement is `req2`, since it satisfies the other:
+ return Some(req2);
+ }
+ // Neither `req1` nor `req2` satisfies the other; they are incompatible.
+ None
+}
+
#[cfg(test)]
mod tests {
use super::*;
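Note on `get_finer_ordering` above: the function returns whichever of two orderings satisfies the other, or `None` when neither does. The sketch below shows the same control flow under a strong simplification: an ordering satisfies a requirement when the requirement is a prefix of it. Equivalence and ordering-equivalence properties, which the real `ordering_satisfy_concrete` takes into account, are ignored here. `SortKey`, `satisfies` and `finer` are hypothetical names.

```rust
// Simplified stand-in for `PhysicalSortExpr` (assumption).
#[derive(Debug, Clone, PartialEq)]
struct SortKey {
    column: &'static str,
    descending: bool,
}

/// Simplified satisfaction check (assumption): `provided` satisfies
/// `required` when `required` is a prefix of `provided`.
fn satisfies(provided: &[SortKey], required: &[SortKey]) -> bool {
    provided.starts_with(required)
}

/// Same control flow as `get_finer_ordering` in the hunk above.
fn finer<'a>(req1: &'a [SortKey], req2: &'a [SortKey]) -> Option<&'a [SortKey]> {
    if satisfies(req1, req2) {
        // `req1` is finer, since it satisfies `req2`.
        return Some(req1);
    }
    if satisfies(req2, req1) {
        // `req2` is finer, since it satisfies `req1`.
        return Some(req2);
    }
    // Neither satisfies the other; the requirements are incompatible.
    None
}

fn main() {
    let amount_asc = [SortKey { column: "amount", descending: false }];
    let amount_asc_ts_desc = [
        SortKey { column: "amount", descending: false },
        SortKey { column: "ts", descending: true },
    ];
    let amount_desc = [SortKey { column: "amount", descending: true }];

    // The longer ordering is finer because the shorter one is its prefix.
    println!("{:?}", finer(&amount_asc, &amount_asc_ts_desc));
    // Opposite directions on the same column are incompatible.
    println!("{:?}", finer(&amount_asc, &amount_desc));
}
```

When the result is `None`, the planner in this commit can still try the reversed form of a reversible aggregate before giving up, which is what the `.slt` tests exercise.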
diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs
index 95fd86148a..d0173949dd 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -29,11 +29,13 @@ use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{Accumulator, WindowFrame};
-use crate::window::window_expr::{reverse_order_bys, AggregateWindowExpr};
+use crate::window::window_expr::AggregateWindowExpr;
use crate::window::{
PartitionBatches, PartitionWindowAggStates, SlidingAggregateWindowExpr, WindowExpr,
};
-use crate::{expressions::PhysicalSortExpr, AggregateExpr, PhysicalExpr};
+use crate::{
+ expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
+};
/// A window expr that takes the form of an aggregate function
/// Aggregate Window Expressions that have the form
diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs
index 1576e8e9c4..59674257ed 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -25,12 +25,12 @@ use super::window_frame_state::WindowFrameContext;
use super::BuiltInWindowFunctionExpr;
use super::WindowExpr;
use crate::window::window_expr::{
- reverse_order_bys, BuiltinWindowState, NthValueKind, NthValueState, WindowFn,
+ BuiltinWindowState, NthValueKind, NthValueState, WindowFn,
};
use crate::window::{
PartitionBatches, PartitionWindowAggStates, WindowAggState, WindowState,
};
-use crate::{expressions::PhysicalSortExpr, PhysicalExpr};
+use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
use arrow::array::{new_empty_array, Array, ArrayRef};
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 7fa33d71ca..8ce3f42bea 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -28,11 +28,13 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{Accumulator, WindowFrame};
-use crate::window::window_expr::{reverse_order_bys, AggregateWindowExpr};
+use crate::window::window_expr::AggregateWindowExpr;
use crate::window::{
PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr,
};
-use crate::{expressions::PhysicalSortExpr, AggregateExpr, PhysicalExpr};
+use crate::{
+ expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
+};
/// A window expr that takes the form of an aggregate function
/// Aggregate Window Expressions that have the form
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index 140ce67f4a..ec0d929b7d 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -253,19 +253,6 @@ pub trait AggregateWindowExpr: WindowExpr {
}
}
-/// Reverses the ORDER BY expression, which is useful during equivalent window
-/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into
-/// 'ORDER BY a DESC, NULLS FIRST'.
-pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr> {
- order_bys
- .iter()
- .map(|e| PhysicalSortExpr {
- expr: e.expr.clone(),
- options: !e.options,
- })
- .collect()
-}
-
#[derive(Debug)]
pub enum WindowFn {
Builtin(Box<dyn PartitionEvaluator>),