You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/23 15:34:07 UTC
[arrow-datafusion] branch main updated: Add support for order-sensitive aggregation for multipartitions (#6734)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e60e9d2eda Add support for order-sensitive aggregation for multipartitions (#6734)
e60e9d2eda is described below
commit e60e9d2edafdc931720ec4e7d053ce946daf61a7
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Fri Jun 23 18:34:02 2023 +0300
Add support for order-sensitive aggregation for multipartitions (#6734)
* Naive test pass
i
* Add new tests and simplifications
* move tests to the .slt file
* update requirement
* update tests
* Add support for partiallyOrdered aggregation sensitive.
* Resolve linter errors
* update comments
* minor changes
* retract changes in generated
* update proto files
* Simplifications
* Make types consistent in schema, and data
* Update todos
* Convert API to vector
* Convert get_finest to handle Vector inputs
* simplifications, update comment
* initial commit, add test
* Add support for FIRST Aggregate function.
* Add support for last aggregate
* Update cargo.lock
* Remove distinct, and limit from First and last aggregate.
* Add reverse for First and Last Aggregator
* Update cargo lock
* Minor code simplifications
* Update comment
* Update documents
* Fix projection pushdown bug
* fix projection push down failure bug
* combine first_agg and last_agg parsers
* Update documentation
* Update subproject
* initial commit
* Add test code
* initial version
* simplify prints
* minor changes
* sqllogictests pass
* All tests pass
* update proto function names
* Minor changes
* do not consider ordering requirement in ordering insensitive aggregators
* Reject aggregate order by for window functions.
* initial commit
* Add new tests
* Add reverse for sort removal
* simplifications
* simplifications
* Bug fix, do not consider reverse requirement if it is not possible
* Fix cargo lock file
* Change reverse_order_by function place
* Move required input ordering calculation logic to its own function
* Add new tests
* simplifications
* Update comment
* Rename aggregator first and last
* minor change
* Comment improvements
* Remove count from First,Last accumulators
* Remove clone
* Simplifications
* Simplifications, update comment
* Improve comments
* Move LexOrdering requirement to common place
* Update comment, refactor implementation
* bug fix:
* Use naive requirement when reverse requirement is not helpful by convention.
* Update test
* Update comments
* Change function place
* Move get_finer to utls
* change name of last, first impl,
* Fix error message
* initial commit
* initial impl
* all slt tests pass
* Multipartition first, last aggregator
* Add new slt tests
* all tests pass
* resolve linter errors
* minor changes
* Add new tests
* array_agg multi partition implementation
* Resolve linter errors
* remove unnecessary prints
* minor changes
* minor changes
* remove unnecessary prints
* Simplifications
* Simplifications
* Add order by expressions in AggregateExpr
* Update size calculation
* Minor changes
* Linear array merge implementation
* Change merge algorithm to heap
* Handle distinct and order-sensitive case
* Update comments
* Address reviews
* Improve comments, use functional style when applicable, better attribute naming
* Address reviews
* Address reviews
* Fix tests
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
Co-authored-by: berkaysynnada <be...@synnada.ai>
---
.../core/src/physical_plan/aggregates/mod.rs | 155 +++--
datafusion/core/src/physical_plan/windows/mod.rs | 10 +-
datafusion/core/src/physical_planner.rs | 16 +-
.../tests/sqllogictests/test_files/groupby.slt | 294 +++++++++-
.../src/aggregate/array_agg_ordered.rs | 621 +++++++++++++++++++++
datafusion/physical-expr/src/aggregate/build_in.rs | 39 +-
.../physical-expr/src/aggregate/first_last.rs | 203 +++++--
datafusion/physical-expr/src/aggregate/mod.rs | 14 +-
datafusion/physical-expr/src/aggregate/utils.rs | 24 +-
datafusion/physical-expr/src/expressions/mod.rs | 1 +
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 18 +
datafusion/proto/src/generated/prost.rs | 2 +
datafusion/proto/src/physical_plan/mod.rs | 4 +-
datafusion/proto/src/physical_plan/to_proto.rs | 13 +
15 files changed, 1274 insertions(+), 141 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 8e68450078..343f7628b7 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -81,6 +81,8 @@ pub enum AggregateMode {
/// Group By expression modes
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupByOrderMode {
+ /// None of the expressions in the GROUP BY clause have an ordering.
+ None,
/// Some of the expressions in the GROUP BY clause have an ordering.
// For example, if the input is ordered by a, b, c and we group by b, a, d;
// the mode will be `PartiallyOrdered` meaning a subset of group b, a, d
@@ -429,10 +431,12 @@ fn get_finest_requirement<
/// ordering requirements of order-sensitive aggregation functions.
fn calc_required_input_ordering(
input: &Arc<dyn ExecutionPlan>,
- aggr_expr: &mut [Arc<dyn AggregateExpr>],
+ aggr_exprs: &mut [Arc<dyn AggregateExpr>],
+ order_by_exprs: &mut [Option<LexOrdering>],
aggregator_reqs: LexOrderingReq,
aggregator_reverse_reqs: Option<LexOrderingReq>,
- aggregation_ordering: &Option<AggregationOrdering>,
+ aggregation_ordering: &mut Option<AggregationOrdering>,
+ mode: &AggregateMode,
) -> Result<Option<LexOrderingReq>> {
let mut required_input_ordering = vec![];
// Boolean shows that whether `required_input_ordering` stored comes from
@@ -459,14 +463,14 @@ fn calc_required_input_ordering(
// then we append the aggregator ordering requirement to the existing
// ordering. This way, we can still run with bounded memory.
mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
- ..
+ order_indices,
}) = aggregation_ordering
{
// Get the section of the input ordering that enables us to run in
// FullyOrdered or PartiallyOrdered modes:
let requirement_prefix =
if let Some(existing_ordering) = input.output_ordering() {
- &existing_ordering[0..ordering.len()]
+ &existing_ordering[0..order_indices.len()]
} else {
&[]
};
@@ -474,21 +478,44 @@ fn calc_required_input_ordering(
PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter());
for req in aggregator_requirement {
if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
- requirement.push(req);
+ requirement.push(req.clone());
+ }
+ // In partial mode, append required ordering of the aggregator to the output ordering.
+ // In case of multiple partitions, this enables us to reduce partitions correctly.
+ if matches!(mode, AggregateMode::Partial)
+ && ordering.iter().all(|item| req.expr.ne(&item.expr))
+ {
+ ordering.push(req.into());
}
}
required_input_ordering = requirement;
} else {
+ // If there was no pre-existing output ordering, the output ordering is simply the required
+ // ordering of the aggregator in partial mode.
+ if matches!(mode, AggregateMode::Partial)
+ && !aggregator_requirement.is_empty()
+ {
+ *aggregation_ordering = Some(AggregationOrdering {
+ mode: GroupByOrderMode::None,
+ order_indices: vec![],
+ ordering: PhysicalSortRequirement::to_sort_exprs(
+ aggregator_requirement.clone(),
+ ),
+ });
+ }
required_input_ordering = aggregator_requirement;
}
- // keep track of from which direction required_input_ordering is constructed
+ // Keep track of the direction from which required_input_ordering is constructed:
reverse_req = is_reverse;
- // If all of the order-sensitive aggregate functions are reversible (such as all of the order-sensitive aggregators are
- // either FIRST_VALUE or LAST_VALUE). We can run aggregate expressions both in the direction of naive required ordering
- // (e.g finest requirement that satisfy each aggregate function requirement) and in its reversed (opposite) direction.
- // We analyze these two possibilities, and use the version that satisfies existing ordering (This saves us adding
- // unnecessary SortExec to the final plan). If none of the versions satisfy existing ordering, we use naive required ordering.
- // In short, if running aggregators in reverse order, helps us to remove a `SortExec`, we do so. Otherwise, we use aggregators as is.
+ // If all the order-sensitive aggregate functions are reversible (e.g. all the
+ // order-sensitive aggregators are either FIRST_VALUE or LAST_VALUE), then we can
+ // run aggregate expressions either in the given required ordering, (i.e. finest
+ // requirement that satisfies every aggregate function requirement) or its reverse
+ // (opposite) direction. We analyze these two possibilities, and use the version that
+ // satisfies existing ordering. This enables us to avoid an extra sort step in the final
+ // plan. If neither version satisfies the existing ordering, we use the given ordering
+ // requirement. In short, if running aggregators in reverse order helps us to avoid a
+ // sorting step, we do so. Otherwise, we use the aggregators as is.
let existing_ordering = input.output_ordering().unwrap_or(&[]);
if ordering_satisfy_requirement_concrete(
existing_ordering,
@@ -499,17 +526,20 @@ fn calc_required_input_ordering(
break;
}
}
- // If `required_input_ordering` is constructed using reverse requirement, we should reverse
- // each `aggr_expr` to be able to correctly calculate their result in reverse order.
+ // If `required_input_ordering` is constructed using the reverse requirement, we
+ // should reverse each `aggr_expr` in order to correctly calculate their results
+ // in reverse order.
if reverse_req {
- aggr_expr
+ aggr_exprs
.iter_mut()
- .map(|elem| {
- if is_order_sensitive(elem) {
- if let Some(reverse) = elem.reverse_expr() {
- *elem = reverse;
+ .zip(order_by_exprs.iter_mut())
+ .map(|(aggr_expr, ob_expr)| {
+ if is_order_sensitive(aggr_expr) {
+ if let Some(reverse) = aggr_expr.reverse_expr() {
+ *aggr_expr = reverse;
+ *ob_expr = ob_expr.as_ref().map(|obs| reverse_order_bys(obs));
} else {
- return Err(DataFusionError::Execution(
+ return Err(DataFusionError::Plan(
"Aggregate expression should have a reverse expression"
.to_string(),
));
@@ -529,6 +559,7 @@ impl AggregateExec {
group_by: PhysicalGroupBy,
mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
+ // Ordering requirement of each aggregate expression
mut order_by_expr: Vec<Option<LexOrdering>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
@@ -555,8 +586,6 @@ impl AggregateExec {
}
})
.collect::<Vec<_>>();
-
- let mut aggregator_reqs = vec![];
let mut aggregator_reverse_reqs = None;
// Currently we support order-sensitive aggregation only in `Single` mode.
// For `Final` and `FinalPartitioned` modes, we cannot guarantee they will receive
@@ -564,31 +593,27 @@ impl AggregateExec {
// in `Final` mode, it is not important to produce correct result in `Partial` mode.
// We only support `Single` mode, where we are sure that output produced is final, and it
// is produced in a single step.
- if mode == AggregateMode::Single {
- let requirement = get_finest_requirement(
- &mut aggr_expr,
- &mut order_by_expr,
- || input.equivalence_properties(),
- || input.ordering_equivalence_properties(),
- )?;
- let aggregator_requirement = requirement
- .as_ref()
- .map(|exprs| PhysicalSortRequirement::from_sort_exprs(exprs.iter()));
- aggregator_reqs = aggregator_requirement.unwrap_or(vec![]);
- // If all aggregate expressions are reversible, also consider reverse
- // requirement(s). The reason is that existing ordering may satisfy the
- // given requirement or its reverse. By considering both, we can generate better plans.
- if aggr_expr
- .iter()
- .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some())
- {
- let reverse_agg_requirement = requirement.map(|reqs| {
- PhysicalSortRequirement::from_sort_exprs(
- reverse_order_bys(&reqs).iter(),
- )
- });
- aggregator_reverse_reqs = reverse_agg_requirement;
- }
+
+ let requirement = get_finest_requirement(
+ &mut aggr_expr,
+ &mut order_by_expr,
+ || input.equivalence_properties(),
+ || input.ordering_equivalence_properties(),
+ )?;
+ let aggregator_requirement = requirement
+ .as_ref()
+ .map(|exprs| PhysicalSortRequirement::from_sort_exprs(exprs.iter()));
+ let aggregator_reqs = aggregator_requirement.unwrap_or(vec![]);
+ // If all aggregate expressions are reversible, also consider reverse
+ // requirement(s). The reason is that existing ordering may satisfy the
+ // given requirement or its reverse. By considering both, we can generate better plans.
+ if aggr_expr
+ .iter()
+ .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some())
+ {
+ aggregator_reverse_reqs = requirement.map(|reqs| {
+ PhysicalSortRequirement::from_sort_exprs(reverse_order_bys(&reqs).iter())
+ });
}
// construct a map from the input columns to the output columns of the Aggregation
@@ -601,26 +626,18 @@ impl AggregateExec {
};
}
- let aggregation_ordering = calc_aggregation_ordering(&input, &group_by);
+ let mut aggregation_ordering = calc_aggregation_ordering(&input, &group_by);
let required_input_ordering = calc_required_input_ordering(
&input,
&mut aggr_expr,
+ &mut order_by_expr,
aggregator_reqs,
aggregator_reverse_reqs,
- &aggregation_ordering,
+ &mut aggregation_ordering,
+ &mode,
)?;
- // If aggregator is working on multiple partitions and there is an order-sensitive aggregator with a requirement return error.
- if input.output_partitioning().partition_count() > 1
- && order_by_expr.iter().any(|req| req.is_some())
- {
- return Err(DataFusionError::NotImplemented(
- "Order-sensitive aggregators is not supported on multiple partitions"
- .to_string(),
- ));
- }
-
Ok(AggregateExec {
mode,
group_by,
@@ -1001,14 +1018,28 @@ fn aggregate_expressions(
} else {
None
};
- agg.expressions()
+ let mut result = agg
+ .expressions()
.into_iter()
.map(|expr| {
pre_cast_type.clone().map_or(expr.clone(), |cast_type| {
Arc::new(CastExpr::new(expr, cast_type, None))
})
})
- .collect::<Vec<_>>()
+ .collect::<Vec<_>>();
+ // In partial mode, append ordering requirements to expressions' results.
+ // Ordering requirements are used by subsequent executors to satisfy the required
+ // ordering for `AggregateMode::FinalPartitioned`/`AggregateMode::Final` modes.
+ if matches!(mode, AggregateMode::Partial) {
+ if let Some(ordering_req) = agg.order_bys() {
+ let ordering_exprs = ordering_req
+ .iter()
+ .map(|item| item.expr.clone())
+ .collect::<Vec<_>>();
+ result.extend(ordering_exprs);
+ }
+ }
+ result
})
.collect()),
// in this mode, we build the merge expressions of the aggregation
@@ -1903,6 +1934,8 @@ mod tests {
Arc::new(col_a.clone()),
"first1",
DataType::Int32,
+ vec![],
+ vec![],
)) as _;
let mut aggr_exprs = vec![aggr_expr; order_by_exprs.len()];
let res = get_finest_requirement(
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index 261cfa04bf..88fafe99b4 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -67,8 +67,14 @@ pub fn create_window_expr(
) -> Result<Arc<dyn WindowExpr>> {
Ok(match fun {
WindowFunction::AggregateFunction(fun) => {
- let aggregate =
- aggregates::create_aggregate_expr(fun, false, args, input_schema, name)?;
+ let aggregate = aggregates::create_aggregate_expr(
+ fun,
+ false,
+ args,
+ &[],
+ input_schema,
+ name,
+ )?;
window_expr_from_aggregate_expr(
partition_by,
order_by,
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index aa0cc52daf..75566208e3 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1652,13 +1652,6 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
)?),
None => None,
};
- let agg_expr = aggregates::create_aggregate_expr(
- fun,
- *distinct,
- &args,
- physical_input_schema,
- name,
- )?;
let order_by = match order_by {
Some(e) => Some(
e.iter()
@@ -1674,6 +1667,15 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
),
None => None,
};
+ let ordering_reqs = order_by.clone().unwrap_or(vec![]);
+ let agg_expr = aggregates::create_aggregate_expr(
+ fun,
+ *distinct,
+ &args,
+ &ordering_reqs,
+ physical_input_schema,
+ name,
+ )?;
Ok((agg_expr, filter, order_by))
}
Expr::AggregateUDF(AggregateUDF {
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index ca400e0ef4..597f2c1611 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -2076,7 +2076,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(
----TableScan: annotated_data_infinite2 projection=[a, b, c]
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
---AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
query III
@@ -2084,10 +2084,10 @@ SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
FROM annotated_data_infinite2
GROUP BY a, b
----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24
+0 1 49
+1 2 74
+1 3 99
# test_source_sorted_groupby4
@@ -2102,7 +2102,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(a
----TableScan: annotated_data_infinite2 projection=[a, b, c]
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as last_c]
---AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
query III
@@ -2110,10 +2110,10 @@ SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
FROM annotated_data_infinite2
GROUP BY a, b
----
-0 0 24
-0 1 49
-1 2 74
-1 3 99
+0 0 0
+0 1 25
+1 2 50
+1 3 75
# when LAST_VALUE, or FIRST_VALUE value do not contain ordering requirement
# queries should still work, However, result depends on the scanning order and
@@ -2580,12 +2580,280 @@ FRA 200 50 250
# Run order-sensitive aggregators in multiple partitions
statement ok
-set datafusion.execution.target_partitions = 2;
+set datafusion.execution.target_partitions = 8;
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
+# multi-partitions with group by.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)], ordering_mode=None
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+FRA 50 200
+GRC 30 80
+TUR 75 100
+
+# Conversion in between FIRST_VALUE and LAST_VALUE to resolve
+# contradictory requirements should work in multi partitions.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)], ordering_mode=None
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+FRA 50 50
+GRC 30 30
+TUR 75 75
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
+# multi-partitions without group by also.
+query TT
+EXPLAIN SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+ FROM sales_global
+----
+logical_plan
+Projection: FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv2
+--Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]]
+----TableScan: sales_global projection=[ts, amount]
+physical_plan
+ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv2]
+--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+----SortPreservingMergeExec: [ts@0 ASC NULLS LAST]
+------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)], ordering_mode=None
+--------SortExec: expr=[ts@0 ASC NULLS LAST]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query RR
+SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+ FROM sales_global
+----
+30 80
+
+# Conversion in between FIRST_VALUE and LAST_VALUE to resolve
+# contradictory requirements should work in multi partitions.
+query TT
+EXPLAIN SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+ FROM sales_global
+----
+logical_plan
+Projection: FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv2
+--Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[ts, amount]
+physical_plan
+ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv2]
+--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+----SortPreservingMergeExec: [ts@0 ASC NULLS LAST]
+------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)], ordering_mode=None
+--------SortExec: expr=[ts@0 ASC NULLS LAST]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query RR
+SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+ FROM sales_global
+----
+30 30
-# Currently, we do not support running order-sensitive aggregators in multiple partitions.
-statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
+# ARRAY_AGG should work in multiple partitions
+query TT
+EXPLAIN SELECT ARRAY_AGG(amount ORDER BY ts ASC) AS array_agg1
+ FROM sales_global
+----
+logical_plan
+Projection: ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS array_agg1
+--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]]
+----TableScan: sales_global projection=[ts, amount]
+physical_plan
+ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as array_agg1]
+--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+----SortPreservingMergeExec: [ts@0 ASC NULLS LAST]
+------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)], ordering_mode=None
+--------SortExec: expr=[ts@0 ASC NULLS LAST]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ?
+SELECT ARRAY_AGG(amount ORDER BY ts ASC) AS array_agg1
+ FROM sales_global
+----
+[30.0, 50.0, 75.0, 200.0, 100.0, 80.0]
+
+# ARRAY_AGG should work in multiple partitions
+query TT
+EXPLAIN SELECT ARRAY_AGG(amount ORDER BY ts DESC) AS array_agg1
+ FROM sales_global
+----
+logical_plan
+Projection: ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS array_agg1
+--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[ts, amount]
+physical_plan
+ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@0 as array_agg1]
+--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+----SortPreservingMergeExec: [ts@0 DESC]
+------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)], ordering_mode=None
+--------SortExec: expr=[ts@0 DESC]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ?
+SELECT ARRAY_AGG(amount ORDER BY ts DESC) AS array_agg1
+ FROM sales_global
+----
+[100.0, 80.0, 200.0, 75.0, 50.0, 30.0]
+
+# ARRAY_AGG should work in multiple partitions
+query TT
+EXPLAIN SELECT ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1
+ FROM sales_global
+----
+logical_plan
+Projection: ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS array_agg1
+--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+----TableScan: sales_global projection=[amount]
+physical_plan
+ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@0 as array_agg1]
+--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+----SortPreservingMergeExec: [amount@0 ASC NULLS LAST]
+------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)], ordering_mode=None
+--------SortExec: expr=[amount@0 ASC NULLS LAST]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ?
+SELECT ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1
+ FROM sales_global
+----
+[30.0, 50.0, 75.0, 80.0, 100.0, 200.0]
+
+# ARRAY_AGG should work in multiple partitions
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS array_agg1
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+------TableScan: sales_global projection=[country, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as array_agg1]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]
+--------SortExec: expr=[amount@1 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)], ordering_mode=None
+----------------SortExec: expr=[amount@1 ASC NULLS LAST]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?
+SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+FRA [50.0, 200.0]
+GRC [30.0, 80.0]
+TUR [75.0, 100.0]
+
+# ARRAY_AGG, FIRST_VALUE, LAST_VALUE should work in multiple partitions
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+ FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]]
+------TableScan: sales_global projection=[country, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------SortExec: expr=[amount@1 DESC]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)], ordering_mode=None
+----------------SortExec: expr=[amount@1 DESC]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?RR
SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
LAST_VALUE(amount ORDER BY amount DESC) AS fv2
FROM sales_global
GROUP BY country
+ ORDER BY country
+----
+FRA [200.0, 50.0] 50 50
+GRC [80.0, 30.0] 30 30
+TUR [100.0, 75.0] 75 75
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
new file mode 100644
index 0000000000..f0eb442af0
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -0,0 +1,621 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions which specify an ordering requirement
+//! and that can be evaluated at runtime during query execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for an `ARRAY_AGG(... ORDER BY ...)` aggregation.
+/// When an aggregation runs on multiple partitions, each partition
+/// computes an intermediate result and these results are then merged.
+/// This aggregator is a version of ARRAY_AGG that can produce such
+/// intermediate results (together with the ordering values needed as
+/// side information) and that can merge the intermediate results
+/// coming from multiple partitions.
+#[derive(Debug)]
+pub struct OrderSensitiveArrayAgg {
+ name: String,
+ input_data_type: DataType,
+ order_by_data_types: Vec<DataType>,
+ expr: Arc<dyn PhysicalExpr>,
+ ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAgg {
+ /// Create a new `OrderSensitiveArrayAgg` aggregate function
+ pub fn new(
+ expr: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ input_data_type: DataType,
+ order_by_data_types: Vec<DataType>,
+ ordering_req: LexOrdering,
+ ) -> Self {
+ Self {
+ name: name.into(),
+ expr,
+ input_data_type,
+ order_by_data_types,
+ ordering_req,
+ }
+ }
+}
+
+impl AggregateExpr for OrderSensitiveArrayAgg {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn field(&self) -> Result<Field> {
+ Ok(Field::new_list(
+ &self.name,
+ Field::new("item", self.input_data_type.clone(), true),
+ false,
+ ))
+ }
+
+ fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+ Ok(Box::new(OrderSensitiveArrayAggAccumulator::try_new(
+ &self.input_data_type,
+ &self.order_by_data_types,
+ self.ordering_req.clone(),
+ )?))
+ }
+
+ fn state_fields(&self) -> Result<Vec<Field>> {
+ let mut fields = vec![Field::new_list(
+ format_state_name(&self.name, "array_agg"),
+ Field::new("item", self.input_data_type.clone(), true),
+ false,
+ )];
+ let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types);
+ fields.push(Field::new_list(
+ format_state_name(&self.name, "array_agg_orderings"),
+ Field::new(
+ "item",
+ DataType::Struct(Fields::from(orderings.clone())),
+ true,
+ ),
+ false,
+ ));
+ fields.extend(orderings);
+ Ok(fields)
+ }
+
+ fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ vec![self.expr.clone()]
+ }
+
+ fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+ if self.ordering_req.is_empty() {
+ None
+ } else {
+ Some(&self.ordering_req)
+ }
+ }
+
+ fn name(&self) -> &str {
+ &self.name
+ }
+}
+
+impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
+ fn eq(&self, other: &dyn Any) -> bool {
+ down_cast_any_ref(other)
+ .downcast_ref::<Self>()
+ .map(|x| {
+ self.name == x.name
+ && self.input_data_type == x.input_data_type
+ && self.order_by_data_types == x.order_by_data_types
+ && self.expr.eq(&x.expr)
+ })
+ .unwrap_or(false)
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+ // `values` stores entries in the ARRAY_AGG result.
+ values: Vec<ScalarValue>,
+ // `ordering_values` stores the values of the ordering requirement expressions
+ // corresponding to each value in the ARRAY_AGG.
+ // For each `ScalarValue` inside `values`, there will be a corresponding
+ // `Vec<ScalarValue>` inside `ordering_values` which stores its ordering.
+ // This information is used when merging the results of different partitions.
+ // For detailed information on how merging is done, see [`merge_ordered_arrays`].
+ ordering_values: Vec<Vec<ScalarValue>>,
+ // `datatypes` stores the data type of the expression inside ARRAY_AGG, followed by the data types of the ordering requirement expressions.
+ datatypes: Vec<DataType>,
+ // Stores the ordering requirement of the accumulator.
+ ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+ /// Create a new, empty order-sensitive ARRAY_AGG accumulator from the item data type,
+ /// the data types of the ordering expressions and the ordering requirement.
+ pub fn try_new(
+ datatype: &DataType,
+ ordering_dtypes: &[DataType],
+ ordering_req: LexOrdering,
+ ) -> Result<Self> {
+ let mut datatypes = vec![datatype.clone()];
+ datatypes.extend(ordering_dtypes.iter().cloned());
+ Ok(Self {
+ values: vec![],
+ ordering_values: vec![],
+ datatypes,
+ ordering_req,
+ })
+ }
+}
+
+impl Accumulator for OrderSensitiveArrayAggAccumulator {
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ if values.is_empty() {
+ return Ok(());
+ }
+ let n_row = values[0].len();
+ for index in 0..n_row {
+ let row = get_row_at_idx(values, index)?;
+ self.values.push(row[0].clone());
+ self.ordering_values.push(row[1..].to_vec());
+ }
+ Ok(())
+ }
+
+ fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+ if states.is_empty() {
+ return Ok(());
+ }
+ // First entry in the state is the aggregation result.
+ let array_agg_values = &states[0];
+ // 2nd entry stores values received for ordering requirement columns, for each aggregation value inside ARRAY_AGG list.
+ // For each `ScalarValue` inside ARRAY_AGG list, we will receive a `Vec<ScalarValue>` that stores
+ // values received from its ordering requirement expression. (This information is necessary during merging.)
+ let agg_orderings = &states[1];
+ if agg_orderings.as_any().is::<ListArray>() {
+ // Stores ARRAY_AGG results coming from each partition
+ let mut partition_values = vec![];
+ // Stores ordering requirement expression results coming from each partition
+ let mut partition_ordering_values = vec![];
+ for index in 0..agg_orderings.len() {
+ let ordering = ScalarValue::try_from_array(agg_orderings, index)?;
+ // Ordering requirement expression values for each entry in the ARRAY_AGG list
+ let other_ordering_values =
+ self.convert_array_agg_to_orderings(ordering)?;
+ // ARRAY_AGG result. (It is a `ScalarValue::List` under the hood, it stores `Vec<ScalarValue>`)
+ let array_agg_res = ScalarValue::try_from_array(array_agg_values, index)?;
+ if let ScalarValue::List(Some(other_values), _) = array_agg_res {
+ partition_values.push(other_values);
+ partition_ordering_values.push(other_ordering_values);
+ } else {
+ return Err(DataFusionError::Internal(
+ "ARRAY_AGG state must be list!".into(),
+ ));
+ }
+ }
+ let sort_options = self
+ .ordering_req
+ .iter()
+ .map(|sort_expr| sort_expr.options)
+ .collect::<Vec<_>>();
+ self.values = merge_ordered_arrays( // NOTE(review): overwrites `self.values` with the merge of the incoming states only and leaves `self.ordering_values` untouched; confirm rows already accumulated never need to be merged here
+ &partition_values,
+ &partition_ordering_values,
+ &sort_options,
+ )?;
+ } else {
+ return Err(DataFusionError::Execution(
+ "Expects to receive a list array".to_string(),
+ ));
+ }
+ Ok(())
+ }
+
+ fn state(&self) -> Result<Vec<ScalarValue>> {
+ let mut result = vec![self.evaluate()?];
+ result.push(self.evaluate_orderings()?);
+ let last_ordering = if let Some(ordering) = self.ordering_values.last() {
+ ordering.clone()
+ } else {
+ // In case ordering is empty, construct ordering as NULL:
+ self.datatypes
+ .iter()
+ .skip(1)
+ .map(ScalarValue::try_from)
+ .collect::<Result<Vec<_>>>()?
+ };
+ result.extend(last_ordering);
+ Ok(result)
+ }
+
+ fn evaluate(&self) -> Result<ScalarValue> {
+ Ok(ScalarValue::new_list(
+ Some(self.values.clone()),
+ self.datatypes[0].clone(),
+ ))
+ }
+
+ fn size(&self) -> usize {
+ let mut total = std::mem::size_of_val(self)
+ + ScalarValue::size_of_vec(&self.values)
+ - std::mem::size_of_val(&self.values);
+
+ // Add size of the `self.ordering_values`
+ total +=
+ std::mem::size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
+ for row in &self.ordering_values {
+ total += ScalarValue::size_of_vec(row) - std::mem::size_of_val(row);
+ }
+
+ // Add size of the `self.datatypes`
+ total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
+ for dtype in &self.datatypes {
+ total += dtype.size() - std::mem::size_of_val(dtype);
+ }
+
+ // Add size of the `self.ordering_req`
+ total += std::mem::size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
+ // TODO: Calculate size of each `PhysicalSortExpr` more accurately.
+ total
+ }
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+ fn convert_array_agg_to_orderings(
+ &self,
+ in_data: ScalarValue,
+ ) -> Result<Vec<Vec<ScalarValue>>> {
+ if let ScalarValue::List(Some(list_vals), _field_ref) = in_data {
+ list_vals.into_iter().map(|struct_vals| {
+ if let ScalarValue::Struct(Some(orderings), _fields) = struct_vals {
+ Ok(orderings)
+ } else {
+ Err(DataFusionError::Execution(format!(
+ "Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
+ struct_vals.get_datatype()
+ )))
+ }
+ }).collect::<Result<Vec<_>>>()
+ } else {
+ Err(DataFusionError::Execution(format!(
+ "Expects to receive ScalarValue::List(Some(..), _) but got:{:?}",
+ in_data.get_datatype()
+ )))
+ }
+ }
+
+ fn evaluate_orderings(&self) -> Result<ScalarValue> {
+ let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
+ let struct_field = Fields::from(fields.clone());
+ let orderings = self
+ .ordering_values
+ .iter()
+ .map(|ordering| {
+ ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
+ })
+ .collect();
+ let struct_type = DataType::Struct(Fields::from(fields));
+ Ok(ScalarValue::new_list(Some(orderings), struct_type))
+ }
+}
+
+/// This is a wrapper struct to be able to correctly merge ARRAY_AGG
+/// data from multiple partitions using `BinaryHeap`.
+/// When used inside `BinaryHeap` this struct returns smallest `CustomElement`,
+/// where smallest is determined by `ordering` values (`Vec<ScalarValue>`)
+/// according to `sort_options`
+#[derive(Debug, PartialEq, Eq)]
+struct CustomElement<'a> {
+ // Stores from which partition entry is received
+ branch_idx: usize,
+ // values to be merged
+ value: ScalarValue,
+ // according to `ordering` values, comparisons will be done.
+ ordering: Vec<ScalarValue>,
+ // `sort_options` defines, desired ordering by the user
+ sort_options: &'a [SortOptions],
+}
+
+impl<'a> CustomElement<'a> {
+ fn new(
+ branch_idx: usize,
+ value: ScalarValue,
+ ordering: Vec<ScalarValue>,
+ sort_options: &'a [SortOptions],
+ ) -> Self {
+ Self {
+ branch_idx,
+ value,
+ ordering,
+ sort_options,
+ }
+ }
+
+ fn ordering(
+ &self,
+ current: &[ScalarValue],
+ target: &[ScalarValue],
+ ) -> Result<Ordering> {
+ // Calculate ordering according to `sort_options`
+ compare_rows(current, target, self.sort_options)
+ }
+}
+
+// Override the ordering implementation such that
+// - `self.ordering` values are used for comparison,
+// - when used inside `BinaryHeap` (a max-heap), it behaves as a min-heap.
+impl<'a> Ord for CustomElement<'a> {
+ fn cmp(&self, other: &Self) -> Ordering {
+ // Compare according to the custom ordering
+ self.ordering(&self.ordering, &other.ordering)
+ // Reverse the result to convert the max heap into a min heap
+ .map(|ordering| ordering.reverse())
+ // The comparison returns an error when `self.ordering` and `other.ordering`
+ // have different types (such as one is `ScalarValue::Int64`, other is `ScalarValue::Float32`).
+ // That is not expected here, because data from each partition has the same type; if it did happen, `unwrap` would panic.
+ .unwrap()
+ }
+}
+
+impl<'a> PartialOrd for CustomElement<'a> {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+/// This function merges the `values` array (`&[Vec<ScalarValue>]`) into a single array `Vec<ScalarValue>`.
+/// Merging is done according to the ordering values stored inside `ordering_values` (`&[Vec<Vec<ScalarValue>>]`).
+/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought of as the ordering information for
+/// each `ScalarValue` in the `values` array.
+/// The desired ordering is specified by the `sort_options` argument (should have the same size as the inner `Vec<ScalarValue>`
+/// of the `ordering_values` array).
+///
+/// As an example
+/// values can be \[
+/// \[1, 2, 3, 4, 5\],
+/// \[1, 2, 3, 4\],
+/// \[1, 2, 3, 4, 5, 6\],
+/// \]
+/// In this case we will be merging three arrays (they don't have to be the same size)
+/// and produce a merged array with size 15 (sum of 5+4+6)
+/// Merging will be done according to ordering at `ordering_values` vector.
+/// As an example `ordering_values` can be [
+/// \[(1, a), (2, b), (3, b), (4, a), (5, b) \],
+/// \[(1, a), (2, b), (3, b), (4, a) \],
+/// \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \],
+/// ]
+/// For each ScalarValue in the `values` we have a corresponding `Vec<ScalarValue>` (like timestamp of it)
+/// for the example above `sort_options` will have size two, that defines ordering requirement of the merge.
+/// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared according to `sort_options` (their sizes should match)
+fn merge_ordered_arrays(
+ // We will merge values into a single `Vec<ScalarValue>`.
+ values: &[Vec<ScalarValue>],
+ // `values` will be merged according to `ordering_values`.
+ // Inner `Vec<ScalarValue>` can be thought of as the ordering information for
+ // each `ScalarValue` in the `values`.
+ ordering_values: &[Vec<Vec<ScalarValue>>],
+ // Defines according to which ordering comparisons should be done.
+ sort_options: &[SortOptions],
+) -> Result<Vec<ScalarValue>> {
+ // Keep track of the most recent data of each branch, in a binary heap data structure.
+ let mut heap: BinaryHeap<CustomElement> = BinaryHeap::new();
+
+ if !(values.len() == ordering_values.len()
+ && values
+ .iter()
+ .zip(ordering_values.iter())
+ .all(|(vals, ordering_vals)| vals.len() == ordering_vals.len()))
+ {
+ return Err(DataFusionError::Execution(
+ "Expects values arguments and/or ordering_values arguments to have same size"
+ .to_string(),
+ ));
+ }
+ let n_branch = values.len();
+ // For each branch we keep track of the index of the next entry to be merged
+ let mut indices = vec![0_usize; n_branch];
+ // Keep track of sizes of each branch.
+ let end_indices = (0..n_branch)
+ .map(|idx| values[idx].len())
+ .collect::<Vec<_>>();
+ let mut merged_values = vec![];
+ // Continue iterating the loop until consuming data of all branches.
+ loop {
+ let min_elem = if let Some(min_elem) = heap.pop() {
+ min_elem
+ } else {
+ // Heap is empty, fill it with the next entries from each branch.
+ for (idx, end_idx, ordering, branch_index) in izip!(
+ indices.iter(),
+ end_indices.iter(),
+ ordering_values.iter(),
+ 0..n_branch
+ ) {
+ // We consumed this branch, skip it
+ if idx == end_idx {
+ continue;
+ }
+
+ // Push the next element to the heap.
+ let elem = CustomElement::new(
+ branch_index,
+ values[branch_index][*idx].clone(),
+ ordering[*idx].to_vec(),
+ sort_options,
+ );
+ heap.push(elem);
+ }
+ // Now we have filled the heap, pop its top entry: `CustomElement` reverses the comparison, so this is the smallest entry w.r.t. `sort_options` (this will be the next element in merge)
+ if let Some(min_elem) = heap.pop() {
+ min_elem
+ } else {
+ // Heap is empty, this means that all indices are equal to end_indices, i.e.
+ // we have consumed all of the branches. Merging is completed.
+ // Exit from the loop
+ break;
+ }
+ };
+ let branch_idx = min_elem.branch_idx;
+ // Increment the index of the merged branch.
+ indices[branch_idx] += 1;
+ let row_idx = indices[branch_idx];
+ merged_values.push(min_elem.value.clone());
+ if row_idx < end_indices[branch_idx] {
+ // Push next entry in the most recently consumed branch to the heap
+ // If there is an available entry
+ let value = values[branch_idx][row_idx].clone();
+ let ordering_row = ordering_values[branch_idx][row_idx].to_vec();
+ let elem = CustomElement::new(branch_idx, value, ordering_row, sort_options);
+ heap.push(elem);
+ }
+ }
+
+ Ok(merged_values)
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::aggregate::array_agg_ordered::merge_ordered_arrays;
+ use arrow_array::{Array, ArrayRef, Int64Array};
+ use arrow_schema::SortOptions;
+ use datafusion_common::utils::get_row_at_idx;
+ use datafusion_common::{Result, ScalarValue};
+ use std::sync::Arc;
+
+ #[test]
+ fn test_merge_asc() -> Result<()> {
+ let lhs_arrays: Vec<ArrayRef> = vec![
+ Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
+ Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
+ ];
+ let n_row = lhs_arrays[0].len();
+ let lhs_orderings = (0..n_row)
+ .map(|idx| get_row_at_idx(&lhs_arrays, idx))
+ .collect::<Result<Vec<_>>>()?;
+
+ let rhs_arrays: Vec<ArrayRef> = vec![
+ Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
+ Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
+ ];
+ let n_row = rhs_arrays[0].len();
+ let rhs_orderings = (0..n_row)
+ .map(|idx| get_row_at_idx(&rhs_arrays, idx))
+ .collect::<Result<Vec<_>>>()?;
+ let sort_options = vec![
+ SortOptions {
+ descending: false,
+ nulls_first: false,
+ },
+ SortOptions {
+ descending: false,
+ nulls_first: false,
+ },
+ ];
+
+ let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
+ let lhs_vals = (0..lhs_vals_arr.len())
+ .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
+ .collect::<Result<Vec<_>>>()?;
+
+ let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
+ let rhs_vals = (0..rhs_vals_arr.len())
+ .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
+ .collect::<Result<Vec<_>>>()?;
+ let expected =
+ Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef;
+
+ let merged_vals = merge_ordered_arrays(
+ &[lhs_vals, rhs_vals],
+ &[lhs_orderings, rhs_orderings],
+ &sort_options,
+ )?;
+ let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
+ assert_eq!(&merged_vals, &expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_merge_desc() -> Result<()> {
+ let lhs_arrays: Vec<ArrayRef> = vec![
+ Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
+ Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
+ ];
+ let n_row = lhs_arrays[0].len();
+ let lhs_orderings = (0..n_row)
+ .map(|idx| get_row_at_idx(&lhs_arrays, idx))
+ .collect::<Result<Vec<_>>>()?;
+
+ let rhs_arrays: Vec<ArrayRef> = vec![
+ Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
+ Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
+ ];
+ let n_row = rhs_arrays[0].len();
+ let rhs_orderings = (0..n_row)
+ .map(|idx| get_row_at_idx(&rhs_arrays, idx))
+ .collect::<Result<Vec<_>>>()?;
+ let sort_options = vec![
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ];
+
+ // Values (which will be merged) doesn't have to be ordered.
+ let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
+ let lhs_vals = (0..lhs_vals_arr.len())
+ .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
+ .collect::<Result<Vec<_>>>()?;
+
+ let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
+ let rhs_vals = (0..rhs_vals_arr.len())
+ .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
+ .collect::<Result<Vec<_>>>()?;
+ let expected =
+ Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef;
+
+ let merged_vals = merge_ordered_arrays(
+ &[lhs_vals, rhs_vals],
+ &[lhs_orderings, rhs_orderings],
+ &sort_options,
+ )?;
+ let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
+
+ assert_eq!(&merged_vals, &expected);
+ Ok(())
+ }
+}
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index 71ddf91315..3f778b6f17 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -26,7 +26,7 @@
//! * Signature: see `Signature`
//! * Return type: a function `(arg_types) -> return_type`. E.g. for min, ([f32]) -> f32, ([f64]) -> f64.
-use crate::{expressions, AggregateExpr, PhysicalExpr};
+use crate::{expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr};
use arrow::datatypes::Schema;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::aggregate_function::{return_type, sum_type_of_avg};
@@ -39,6 +39,7 @@ pub fn create_aggregate_expr(
fun: &AggregateFunction,
distinct: bool,
input_phy_exprs: &[Arc<dyn PhysicalExpr>],
+ ordering_req: &[PhysicalSortExpr],
input_schema: &Schema,
name: impl Into<String>,
) -> Result<Arc<dyn AggregateExpr>> {
@@ -49,8 +50,11 @@ pub fn create_aggregate_expr(
.map(|e| e.data_type(input_schema))
.collect::<Result<Vec<_>>>()?;
let rt_type = return_type(fun, &input_phy_types)?;
+ let ordering_types = ordering_req
+ .iter()
+ .map(|e| e.expr.data_type(input_schema))
+ .collect::<Result<Vec<_>>>()?;
let input_phy_exprs = input_phy_exprs.to_vec();
-
Ok(match (fun, distinct) {
(AggregateFunction::Count, false) => Arc::new(
expressions::Count::new_with_multiple_exprs(input_phy_exprs, name, rt_type),
@@ -116,12 +120,27 @@ pub fn create_aggregate_expr(
input_phy_types[0].clone(),
))
}
- (AggregateFunction::ArrayAgg, false) => Arc::new(expressions::ArrayAgg::new(
- input_phy_exprs[0].clone(),
- name,
- input_phy_types[0].clone(),
- )),
+ (AggregateFunction::ArrayAgg, false) => {
+ let expr = input_phy_exprs[0].clone();
+ let data_type = input_phy_types[0].clone();
+ if ordering_req.is_empty() {
+ Arc::new(expressions::ArrayAgg::new(expr, name, data_type))
+ } else {
+ Arc::new(expressions::OrderSensitiveArrayAgg::new(
+ expr,
+ name,
+ data_type,
+ ordering_types,
+ ordering_req.to_vec(),
+ ))
+ }
+ }
(AggregateFunction::ArrayAgg, true) => {
+ if !ordering_req.is_empty() {
+ return Err(DataFusionError::NotImplemented(
+ "ARRAY_AGG(DISTINCT ORDER BY a ASC) order-sensitive aggregations are not available".to_string(),
+ ));
+ }
Arc::new(expressions::DistinctArrayAgg::new(
input_phy_exprs[0].clone(),
name,
@@ -292,11 +311,15 @@ pub fn create_aggregate_expr(
input_phy_exprs[0].clone(),
name,
input_phy_types[0].clone(),
+ ordering_req.to_vec(),
+ ordering_types,
)),
(AggregateFunction::LastValue, _) => Arc::new(expressions::LastValue::new(
input_phy_exprs[0].clone(),
name,
input_phy_types[0].clone(),
+ ordering_req.to_vec(),
+ ordering_types,
)),
})
}
@@ -1200,7 +1223,7 @@ mod tests {
"Invalid or wrong number of arguments passed to aggregate: '{name}'",
)));
}
- create_aggregate_expr(fun, distinct, &coerced_phy_exprs, input_schema, name)
+ create_aggregate_expr(fun, distinct, &coerced_phy_exprs, &[], input_schema, name)
}
// Returns the coerced exprs for each `input_exprs`.
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index a350637c48..f322419a7b 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -17,9 +17,9 @@
//! Defines the FIRST_VALUE/LAST_VALUE aggregations.
-use crate::aggregate::utils::down_cast_any_ref;
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
use crate::expressions::format_state_name;
-use crate::{AggregateExpr, PhysicalExpr};
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field};
@@ -27,6 +27,9 @@ use arrow_array::Array;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;
+use arrow::compute;
+use arrow_array::cast::AsArray;
+use datafusion_common::utils::get_row_at_idx;
use std::any::Any;
use std::sync::Arc;
@@ -34,8 +37,10 @@ use std::sync::Arc;
#[derive(Debug)]
pub struct FirstValue {
name: String,
- pub data_type: DataType,
+ input_data_type: DataType,
+ order_by_data_types: Vec<DataType>,
expr: Arc<dyn PhysicalExpr>,
+ ordering_req: LexOrdering,
}
impl FirstValue {
@@ -43,12 +48,16 @@ impl FirstValue {
pub fn new(
expr: Arc<dyn PhysicalExpr>,
name: impl Into<String>,
- data_type: DataType,
+ input_data_type: DataType,
+ ordering_req: LexOrdering,
+ order_by_data_types: Vec<DataType>,
) -> Self {
Self {
name: name.into(),
- data_type,
+ input_data_type,
+ order_by_data_types,
expr,
+ ordering_req,
}
}
}
@@ -60,25 +69,46 @@ impl AggregateExpr for FirstValue {
}
fn field(&self) -> Result<Field> {
- Ok(Field::new(&self.name, self.data_type.clone(), true))
+ Ok(Field::new(&self.name, self.input_data_type.clone(), true))
}
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(FirstValueAccumulator::try_new(&self.data_type)?))
+ Ok(Box::new(FirstValueAccumulator::try_new(
+ &self.input_data_type,
+ &self.order_by_data_types,
+ )?))
}
fn state_fields(&self) -> Result<Vec<Field>> {
- Ok(vec![Field::new(
+ let mut fields = vec![Field::new(
format_state_name(&self.name, "first_value"),
- self.data_type.clone(),
+ self.input_data_type.clone(),
true,
- )])
+ )];
+ fields.extend(ordering_fields(
+ &self.ordering_req,
+ &self.order_by_data_types,
+ ));
+ fields.push(Field::new(
+ format_state_name(&self.name, "is_set"),
+ DataType::Boolean,
+ true,
+ ));
+ Ok(fields)
}
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.expr.clone()]
}
+ fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+ if self.ordering_req.is_empty() {
+ None
+ } else {
+ Some(&self.ordering_req)
+ }
+ }
+
fn name(&self) -> &str {
&self.name
}
@@ -92,12 +122,17 @@ impl AggregateExpr for FirstValue {
Some(Arc::new(LastValue::new(
self.expr.clone(),
name,
- self.data_type.clone(),
+ self.input_data_type.clone(),
+ self.ordering_req.clone(),
+ self.order_by_data_types.clone(),
)))
}
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(FirstValueAccumulator::try_new(&self.data_type)?))
+ Ok(Box::new(FirstValueAccumulator::try_new(
+ &self.input_data_type,
+ &self.order_by_data_types,
+ )?))
}
}
@@ -107,7 +142,8 @@ impl PartialEq<dyn Any> for FirstValue {
.downcast_ref::<Self>()
.map(|x| {
self.name == x.name
- && self.data_type == x.data_type
+ && self.input_data_type == x.input_data_type
+ && self.order_by_data_types == x.order_by_data_types
&& self.expr.eq(&x.expr)
})
.unwrap_or(false)
@@ -117,34 +153,44 @@ impl PartialEq<dyn Any> for FirstValue {
#[derive(Debug)]
struct FirstValueAccumulator {
first: ScalarValue,
- // At the beginning, `is_set` is `false`, this means `first` is not seen yet.
- // Once we see (`is_set=true`) first value, we do not update `first`.
+ // At the beginning, `is_set` is false, which means `first` is not seen yet.
+ // Once we see the first value, we set the `is_set` flag and do not update `first` anymore.
is_set: bool,
+ // Stores ordering values, of the aggregator requirement corresponding to first value
+ // of the aggregator. These values are used during merging of multiple partitions.
+ orderings: Vec<ScalarValue>,
}
impl FirstValueAccumulator {
/// Creates a new `FirstValueAccumulator` for the given `data_type`.
- pub fn try_new(data_type: &DataType) -> Result<Self> {
+ pub fn try_new(data_type: &DataType, ordering_dtypes: &[DataType]) -> Result<Self> {
+ let orderings = ordering_dtypes
+ .iter()
+ .map(ScalarValue::try_from)
+ .collect::<Result<Vec<_>>>()?;
ScalarValue::try_from(data_type).map(|value| Self {
first: value,
is_set: false,
+ orderings,
})
}
}
impl Accumulator for FirstValueAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
- Ok(vec![
- self.first.clone(),
- ScalarValue::Boolean(Some(self.is_set)),
- ])
+ let mut result = vec![self.first.clone()];
+ result.extend(self.orderings.iter().cloned());
+ result.push(ScalarValue::Boolean(Some(self.is_set)));
+ Ok(result)
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
// If we have seen first value, we shouldn't update it
- let values = &values[0];
- if !values.is_empty() && !self.is_set {
- self.first = ScalarValue::try_from_array(values, 0)?;
+ if !values[0].is_empty() && !self.is_set {
+ let row = get_row_at_idx(values, 0)?;
+ // Update with first value in the array.
+ self.first = row[0].clone();
+ self.orderings = row[1..].to_vec();
self.is_set = true;
}
Ok(())
@@ -152,7 +198,14 @@ impl Accumulator for FirstValueAccumulator {
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
// FIRST_VALUE(first1, first2, first3, ...)
- self.update_batch(states)
+ let last_idx = states.len() - 1; // the trailing state column holds the `is_set` flags
+ let is_set_flags = &states[last_idx];
+ let flags = is_set_flags.as_boolean();
+ let mut filtered_first_vals = vec![];
+ for state in states.iter().take(last_idx) { // the value column and every ordering column, i.e. all but the flag column
+ filtered_first_vals.push(compute::filter(state, flags)?) // keep only the rows whose `is_set` flag is true
+ }
+ self.update_batch(&filtered_first_vals)
}
fn evaluate(&self) -> Result<ScalarValue> {
@@ -162,6 +215,8 @@ impl Accumulator for FirstValueAccumulator {
fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.first)
+ self.first.size()
+ + ScalarValue::size_of_vec(&self.orderings)
+ - std::mem::size_of_val(&self.orderings)
}
}
@@ -169,8 +224,10 @@ impl Accumulator for FirstValueAccumulator {
#[derive(Debug)]
pub struct LastValue {
name: String,
- pub data_type: DataType,
+ input_data_type: DataType,
+ order_by_data_types: Vec<DataType>,
expr: Arc<dyn PhysicalExpr>,
+ ordering_req: LexOrdering,
}
impl LastValue {
@@ -178,12 +235,16 @@ impl LastValue {
pub fn new(
expr: Arc<dyn PhysicalExpr>,
name: impl Into<String>,
- data_type: DataType,
+ input_data_type: DataType,
+ ordering_req: LexOrdering,
+ order_by_data_types: Vec<DataType>,
) -> Self {
Self {
name: name.into(),
- data_type,
+ input_data_type,
+ order_by_data_types,
expr,
+ ordering_req,
}
}
}
@@ -195,25 +256,46 @@ impl AggregateExpr for LastValue {
}
fn field(&self) -> Result<Field> {
- Ok(Field::new(&self.name, self.data_type.clone(), true))
+ Ok(Field::new(&self.name, self.input_data_type.clone(), true))
}
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(LastValueAccumulator::try_new(&self.data_type)?))
+ Ok(Box::new(LastValueAccumulator::try_new(
+ &self.input_data_type,
+ &self.order_by_data_types,
+ )?))
}
fn state_fields(&self) -> Result<Vec<Field>> {
- Ok(vec![Field::new(
+ let mut fields = vec![Field::new(
format_state_name(&self.name, "last_value"),
- self.data_type.clone(),
+ self.input_data_type.clone(),
+ true,
+ )];
+ fields.extend(ordering_fields(
+ &self.ordering_req,
+ &self.order_by_data_types,
+ ));
+ fields.push(Field::new(
+ format_state_name(&self.name, "is_set"),
+ DataType::Boolean,
true,
- )])
+ ));
+ Ok(fields)
}
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.expr.clone()]
}
+ fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+ if self.ordering_req.is_empty() {
+ None
+ } else {
+ Some(&self.ordering_req)
+ }
+ }
+
fn name(&self) -> &str {
&self.name
}
@@ -227,12 +309,17 @@ impl AggregateExpr for LastValue {
Some(Arc::new(FirstValue::new(
self.expr.clone(),
name,
- self.data_type.clone(),
+ self.input_data_type.clone(),
+ self.ordering_req.clone(),
+ self.order_by_data_types.clone(),
)))
}
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(LastValueAccumulator::try_new(&self.data_type)?))
+ Ok(Box::new(LastValueAccumulator::try_new(
+ &self.input_data_type,
+ &self.order_by_data_types,
+ )?))
}
}
@@ -242,7 +329,8 @@ impl PartialEq<dyn Any> for LastValue {
.downcast_ref::<Self>()
.map(|x| {
self.name == x.name
- && self.data_type == x.data_type
+ && self.input_data_type == x.input_data_type
+ && self.order_by_data_types == x.order_by_data_types
&& self.expr.eq(&x.expr)
})
.unwrap_or(false)
@@ -252,34 +340,57 @@ impl PartialEq<dyn Any> for LastValue {
#[derive(Debug)]
struct LastValueAccumulator {
last: ScalarValue,
+ // The `is_set` flag keeps track of whether the last value is finalized.
+ // This information is used to discriminate genuine NULLs and NULLS that
+ // occur due to empty partitions.
+ is_set: bool,
+ orderings: Vec<ScalarValue>,
}
impl LastValueAccumulator {
/// Creates a new `LastValueAccumulator` for the given `data_type`.
- pub fn try_new(data_type: &DataType) -> Result<Self> {
+ pub fn try_new(data_type: &DataType, ordering_dtypes: &[DataType]) -> Result<Self> {
+ let orderings = ordering_dtypes
+ .iter()
+ .map(ScalarValue::try_from)
+ .collect::<Result<Vec<_>>>()?;
Ok(Self {
last: ScalarValue::try_from(data_type)?,
+ is_set: false,
+ orderings,
})
}
}
impl Accumulator for LastValueAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
- Ok(vec![self.last.clone()])
+ let mut result = vec![self.last.clone()];
+ result.extend(self.orderings.clone());
+ result.push(ScalarValue::Boolean(Some(self.is_set)));
+ Ok(result)
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- let values = &values[0];
- if !values.is_empty() {
+ if !values[0].is_empty() {
+ let row = get_row_at_idx(values, values[0].len() - 1)?;
// Update with last value in the array.
- self.last = ScalarValue::try_from_array(values, values.len() - 1)?;
+ self.last = row[0].clone();
+ self.orderings = row[1..].to_vec();
+ self.is_set = true;
}
Ok(())
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
// LAST_VALUE(last1, last2, last3, ...)
- self.update_batch(states)
+ let last_idx = states.len() - 1; // state layout: [last_value, orderings.., is_set]
+ let is_set_flags = &states[last_idx];
+ let flags = is_set_flags.as_boolean();
+ let mut filtered_first_vals = vec![];
+ for state in states.iter().take(last_idx) { // every column before the trailing is_set flag
+ filtered_first_vals.push(compute::filter(state, flags)?)
+ }
+ self.update_batch(&filtered_first_vals)
}
fn evaluate(&self) -> Result<ScalarValue> {
@@ -287,7 +398,10 @@ impl Accumulator for LastValueAccumulator {
}
fn size(&self) -> usize {
- std::mem::size_of_val(self) - std::mem::size_of_val(&self.last) + self.last.size()
+ std::mem::size_of_val(self) - std::mem::size_of_val(&self.last)
+ + self.last.size()
+ + ScalarValue::size_of_vec(&self.orderings)
+ - std::mem::size_of_val(&self.orderings)
}
}
@@ -302,8 +416,9 @@ mod tests {
#[test]
fn test_first_last_value_value() -> Result<()> {
- let mut first_accumulator = FirstValueAccumulator::try_new(&DataType::Int64)?;
- let mut last_accumulator = LastValueAccumulator::try_new(&DataType::Int64)?;
+ let mut first_accumulator =
+ FirstValueAccumulator::try_new(&DataType::Int64, &[])?;
+ let mut last_accumulator = LastValueAccumulator::try_new(&DataType::Int64, &[])?;
// first value in the tuple is start of the range (inclusive),
// second value in the tuple is end of the range (exclusive)
let ranges: Vec<(i64, i64)> = vec![(0, 10), (1, 11), (2, 13)];
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 7d2316c532..9be6d5e1ba 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -16,8 +16,8 @@
// under the License.
use crate::aggregate::row_accumulator::RowAccumulator;
-use crate::expressions::{ArrayAgg, FirstValue, LastValue};
-use crate::PhysicalExpr;
+use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg};
+use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::datatypes::Field;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;
@@ -31,6 +31,7 @@ pub(crate) mod approx_percentile_cont;
pub(crate) mod approx_percentile_cont_with_weight;
pub(crate) mod array_agg;
pub(crate) mod array_agg_distinct;
+pub(crate) mod array_agg_ordered;
pub(crate) mod average;
pub(crate) mod bit_and_or_xor;
pub(crate) mod bool_and_or;
@@ -85,6 +86,13 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+ /// Order by requirements for the aggregate function
+ /// By default it is `None` (there is no requirement)
+ /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this
+ fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
/// Human readable name such as `"MIN(c2)"`. The default
/// implementation returns placeholder text.
fn name(&self) -> &str {
@@ -133,5 +141,5 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
aggr_expr.as_any().is::<FirstValue>()
|| aggr_expr.as_any().is::<LastValue>()
- || aggr_expr.as_any().is::<ArrayAgg>()
+ || aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
}
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index aa0834a89c..f6f0086919 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -17,10 +17,10 @@
//! Utilities used in aggregates
-use crate::AggregateExpr;
+use crate::{AggregateExpr, PhysicalSortExpr};
use arrow::array::ArrayRef;
use arrow::datatypes::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION};
-use arrow_schema::DataType;
+use arrow_schema::{DataType, Field};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
use std::any::Any;
@@ -103,3 +103,23 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
any
}
}
+
+/// Construct corresponding fields for lexicographical ordering requirement expression
+pub(crate) fn ordering_fields(
+ ordering_req: &[PhysicalSortExpr],
+ // Data type of each expression in the ordering requirement
+ data_types: &[DataType],
+) -> Vec<Field> {
+ ordering_req
+ .iter()
+ .zip(data_types.iter())
+ .map(|(expr, dtype)| {
+ Field::new(
+ expr.to_string().as_str(),
+ dtype.clone(),
+ // Multi partitions may be empty hence field should be nullable.
+ true,
+ )
+ })
+ .collect()
+}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index 0ca132aefd..c660cfadcc 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -46,6 +46,7 @@ pub use crate::aggregate::approx_percentile_cont::ApproxPercentileCont;
pub use crate::aggregate::approx_percentile_cont_with_weight::ApproxPercentileContWithWeight;
pub use crate::aggregate::array_agg::ArrayAgg;
pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg;
+pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg;
pub use crate::aggregate::average::{Avg, AvgAccumulator};
pub use crate::aggregate::bit_and_or_xor::{BitAnd, BitOr, BitXor, DistinctBitXor};
pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr};
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 9b05dea712..7fddd31a6b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1107,6 +1107,7 @@ message PhysicalAggregateExprNode {
string user_defined_aggr_function = 4;
}
repeated PhysicalExprNode expr = 2;
+ repeated PhysicalSortExprNode ordering_req = 5;
bool distinct = 3;
}
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 890fe7221a..4c1bab5e39 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -13080,6 +13080,9 @@ impl serde::Serialize for PhysicalAggregateExprNode {
if !self.expr.is_empty() {
len += 1;
}
+ if !self.ordering_req.is_empty() {
+ len += 1;
+ }
if self.distinct {
len += 1;
}
@@ -13090,6 +13093,9 @@ impl serde::Serialize for PhysicalAggregateExprNode {
if !self.expr.is_empty() {
struct_ser.serialize_field("expr", &self.expr)?;
}
+ if !self.ordering_req.is_empty() {
+ struct_ser.serialize_field("orderingReq", &self.ordering_req)?;
+ }
if self.distinct {
struct_ser.serialize_field("distinct", &self.distinct)?;
}
@@ -13116,6 +13122,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
{
const FIELDS: &[&str] = &[
"expr",
+ "ordering_req",
+ "orderingReq",
"distinct",
"aggr_function",
"aggrFunction",
@@ -13126,6 +13134,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
Expr,
+ OrderingReq,
Distinct,
AggrFunction,
UserDefinedAggrFunction,
@@ -13151,6 +13160,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
{
match value {
"expr" => Ok(GeneratedField::Expr),
+ "orderingReq" | "ordering_req" => Ok(GeneratedField::OrderingReq),
"distinct" => Ok(GeneratedField::Distinct),
"aggrFunction" | "aggr_function" => Ok(GeneratedField::AggrFunction),
"userDefinedAggrFunction" | "user_defined_aggr_function" => Ok(GeneratedField::UserDefinedAggrFunction),
@@ -13174,6 +13184,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
V: serde::de::MapAccess<'de>,
{
let mut expr__ = None;
+ let mut ordering_req__ = None;
let mut distinct__ = None;
let mut aggregate_function__ = None;
while let Some(k) = map.next_key()? {
@@ -13184,6 +13195,12 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
}
expr__ = Some(map.next_value()?);
}
+ GeneratedField::OrderingReq => {
+ if ordering_req__.is_some() {
+ return Err(serde::de::Error::duplicate_field("orderingReq"));
+ }
+ ordering_req__ = Some(map.next_value()?);
+ }
GeneratedField::Distinct => {
if distinct__.is_some() {
return Err(serde::de::Error::duplicate_field("distinct"));
@@ -13206,6 +13223,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
}
Ok(PhysicalAggregateExprNode {
expr: expr__.unwrap_or_default(),
+ ordering_req: ordering_req__.unwrap_or_default(),
distinct: distinct__.unwrap_or_default(),
aggregate_function: aggregate_function__,
})
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index b1ae0058dc..8dfc209477 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1513,6 +1513,8 @@ pub struct PhysicalScalarUdfNode {
pub struct PhysicalAggregateExprNode {
#[prost(message, repeated, tag = "2")]
pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+ #[prost(message, repeated, tag = "5")]
+ pub ordering_req: ::prost::alloc::vec::Vec<PhysicalSortExprNode>,
#[prost(bool, tag = "3")]
pub distinct: bool,
#[prost(oneof = "physical_aggregate_expr_node::AggregateFunction", tags = "1, 4")]
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 904cf08f7b..1daa1c2e4b 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -441,7 +441,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
ExprType::AggregateExpr(agg_node) => {
let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node.expr.iter()
.map(|e| parse_physical_expr(e, registry, &physical_schema).unwrap()).collect();
-
+ let ordering_req: Vec<PhysicalSortExpr> = agg_node.ordering_req.iter()
+ .map(|e| parse_physical_sort_expr(e, registry, &physical_schema).unwrap()).collect();
agg_node.aggregate_function.as_ref().map(|func| {
match func {
AggregateFunction::AggrFunction(i) => {
@@ -458,6 +459,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
&aggr_function.into(),
agg_node.distinct,
input_phy_expr.as_slice(),
+ &ordering_req,
&physical_schema,
name.to_string(),
)
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 0910ddaad0..aaf3569d16 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -67,6 +67,13 @@ impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
.map(|e| e.clone().try_into())
.collect::<Result<Vec<_>>>()?;
+ let ordering_req: Vec<protobuf::PhysicalSortExprNode> = a
+ .order_bys()
+ .unwrap_or(&[])
+ .iter()
+ .map(|e| e.clone().try_into())
+ .collect::<Result<Vec<_>>>()?;
+
let mut distinct = false;
let aggr_function = if a.as_any().downcast_ref::<Avg>().is_some() {
Ok(AggregateFunction::Avg.into())
@@ -151,6 +158,10 @@ impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
.is_some()
{
Ok(AggregateFunction::ApproxMedian.into())
+ } else if a.as_any().is::<expressions::FirstValue>() {
+ Ok(AggregateFunction::FirstValueAgg.into())
+ } else if a.as_any().is::<expressions::LastValue>() {
+ Ok(AggregateFunction::LastValueAgg.into())
} else {
if let Some(a) = a.as_any().downcast_ref::<AggregateFunctionExpr>() {
return Ok(protobuf::PhysicalExprNode {
@@ -158,6 +169,7 @@ impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
protobuf::PhysicalAggregateExprNode {
aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(a.fun().name.clone())),
expr: expressions,
+ ordering_req,
distinct,
},
)),
@@ -178,6 +190,7 @@ impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
),
),
expr: expressions,
+ ordering_req,
distinct,
},
)),