You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/15 16:43:54 UTC
[arrow-datafusion] branch main updated: Add support for ordering sensitive aggregation (#6332)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d8a92be182 Add support for ordering sensitive aggregation (#6332)
d8a92be182 is described below
commit d8a92be182327f98dffaa5db5f515e113eb3b2bf
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Mon May 15 19:43:48 2023 +0300
Add support for ordering sensitive aggregation (#6332)
* Naive test pass
i
* Add new tests and simplifications
* move tests to the .slt file
* update requirement
* update tests
* Add support for partiallyOrdered aggregation sensitive.
* Resolve linter errors
* update comments
* minor changes
* retract changes in generated
* update proto files
* Simplifications
* Make types consistent in schema, and data
* Update todos
* Convert API to vector
* Convert get_finest to handle Vector inputs
* simplifications, update comment
* Minor code simplifications
* Update comment
* Update documents
* fix projection push down failure bug
* Simplifications, Address reviews
* Update comment
* Resolve linter errors
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
.../src/physical_optimizer/aggregate_statistics.rs | 12 ++
.../combine_partial_final_agg.rs | 4 +
.../src/physical_optimizer/dist_enforcement.rs | 11 +-
.../core/src/physical_optimizer/repartition.rs | 2 +
.../src/physical_optimizer/sort_enforcement.rs | 1 +
.../core/src/physical_plan/aggregates/mod.rs | 193 ++++++++++++++++++-
datafusion/core/src/physical_plan/planner.rs | 149 ++++++++++-----
datafusion/core/tests/aggregate_fuzz.rs | 2 +
.../tests/sqllogictests/test_files/groupby.slt | 205 +++++++++++++++++++++
datafusion/expr/src/expr.rs | 60 ++++--
datafusion/expr/src/expr_fn.rs | 12 ++
datafusion/expr/src/tree_node/expr.rs | 37 +++-
datafusion/expr/src/udaf.rs | 1 +
.../optimizer/src/analyzer/count_wildcard_rule.rs | 2 +
datafusion/optimizer/src/analyzer/type_coercion.rs | 20 +-
.../optimizer/src/common_subexpr_eliminate.rs | 1 +
datafusion/optimizer/src/push_down_projection.rs | 1 +
.../optimizer/src/single_distinct_to_groupby.rs | 3 +
datafusion/physical-expr/src/utils.rs | 2 +-
datafusion/proto/proto/datafusion.proto | 7 +
datafusion/proto/src/generated/pbjson.rs | 146 +++++++++++++++
datafusion/proto/src/generated/prost.rs | 12 ++
datafusion/proto/src/logical_plan/from_proto.rs | 16 ++
datafusion/proto/src/logical_plan/mod.rs | 4 +
datafusion/proto/src/logical_plan/to_proto.rs | 22 ++-
datafusion/proto/src/physical_plan/from_proto.rs | 25 +++
datafusion/proto/src/physical_plan/mod.rs | 33 +++-
datafusion/proto/src/physical_plan/to_proto.rs | 30 ++-
datafusion/sql/src/expr/function.rs | 6 +-
datafusion/sql/src/expr/mod.rs | 22 ++-
datafusion/sql/src/utils.rs | 24 ++-
datafusion/substrait/src/logical_plan/consumer.rs | 4 +
datafusion/substrait/src/logical_plan/producer.rs | 3 +-
docs/source/user-guide/sql/aggregate_functions.md | 4 +-
docs/source/user-guide/sql/select.md | 9 +
35 files changed, 977 insertions(+), 108 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index 4abe3ce0ed..396e66972f 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -412,6 +412,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
source,
Arc::clone(&schema),
)?;
@@ -421,6 +422,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
@@ -442,6 +444,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
source,
Arc::clone(&schema),
)?;
@@ -451,6 +454,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
@@ -471,6 +475,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
source,
Arc::clone(&schema),
)?;
@@ -483,6 +488,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
)?;
@@ -503,6 +509,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
source,
Arc::clone(&schema),
)?;
@@ -515,6 +522,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
)?;
@@ -546,6 +554,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
filter,
Arc::clone(&schema),
)?;
@@ -555,6 +564,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
@@ -591,6 +601,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
filter,
Arc::clone(&schema),
)?;
@@ -600,6 +611,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
+ vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 59e5fc95ed..5657c62921 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -73,6 +73,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
group_by: input_group_by,
aggr_expr: input_aggr_expr,
filter_expr: input_filter_expr,
+ order_by_expr: input_order_by_expr,
input_schema,
..
}| {
@@ -95,6 +96,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
input_group_by.clone(),
input_aggr_expr.to_vec(),
input_filter_expr.to_vec(),
+ input_order_by_expr.to_vec(),
partial_input.clone(),
input_schema.clone(),
)
@@ -279,6 +281,7 @@ mod tests {
group_by,
aggr_expr,
vec![],
+ vec![],
input,
schema,
)
@@ -298,6 +301,7 @@ mod tests {
group_by,
aggr_expr,
vec![],
+ vec![],
input,
schema,
)
diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
index e6fd15b7ce..4c30170ace 100644
--- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs
@@ -42,7 +42,7 @@ use datafusion_physical_expr::expressions::NoOp;
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, AggregateExpr,
- PhysicalExpr,
+ PhysicalExpr, PhysicalSortExpr,
};
use std::sync::Arc;
@@ -254,6 +254,7 @@ fn adjust_input_keys_ordering(
group_by,
aggr_expr,
filter_expr,
+ order_by_expr,
input,
input_schema,
..
@@ -267,6 +268,7 @@ fn adjust_input_keys_ordering(
group_by,
aggr_expr,
filter_expr,
+ order_by_expr,
input.clone(),
input_schema,
)?),
@@ -367,12 +369,14 @@ where
}
}
+#[allow(clippy::too_many_arguments)]
fn reorder_aggregate_keys(
agg_plan: Arc<dyn ExecutionPlan>,
parent_required: &[Arc<dyn PhysicalExpr>],
group_by: &PhysicalGroupBy,
aggr_expr: &[Arc<dyn AggregateExpr>],
filter_expr: &[Option<Arc<dyn PhysicalExpr>>],
+ order_by_expr: &[Option<Vec<PhysicalSortExpr>>],
agg_input: Arc<dyn ExecutionPlan>,
input_schema: &SchemaRef,
) -> Result<PlanWithKeyRequirements> {
@@ -403,6 +407,7 @@ fn reorder_aggregate_keys(
group_by,
aggr_expr,
filter_expr,
+ order_by_expr,
input,
input_schema,
..
@@ -422,6 +427,7 @@ fn reorder_aggregate_keys(
new_partial_group_by,
aggr_expr.clone(),
filter_expr.clone(),
+ order_by_expr.clone(),
input.clone(),
input_schema.clone(),
)?))
@@ -453,6 +459,7 @@ fn reorder_aggregate_keys(
new_group_by,
aggr_expr.to_vec(),
filter_expr.to_vec(),
+ order_by_expr.to_vec(),
partial_agg,
input_schema.clone(),
)?);
@@ -1104,12 +1111,14 @@ mod tests {
final_grouping,
vec![],
vec![],
+ vec![],
Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
group_by,
vec![],
vec![],
+ vec![],
input,
schema.clone(),
)
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 1b52e4e274..8b407ed289 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -478,12 +478,14 @@ mod tests {
PhysicalGroupBy::default(),
vec![],
vec![],
+ vec![],
Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![],
vec![],
+ vec![],
input,
schema.clone(),
)
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 3d9363f34d..f71c79e9fc 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -2867,6 +2867,7 @@ mod tests {
PhysicalGroupBy::default(),
vec![],
vec![],
+ vec![],
input,
schema,
)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index ffd985513d..247dfc2778 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -41,8 +41,9 @@ use datafusion_physical_expr::{
equivalence::project_equivalence_properties,
expressions::{Avg, CastExpr, Column, Sum},
normalize_out_expr_with_columns_map,
- utils::{convert_to_expr, get_indices_of_matching_exprs},
- AggregateExpr, PhysicalExpr, PhysicalSortExpr,
+ utils::{convert_to_expr, get_indices_of_matching_exprs, ordering_satisfy_concrete},
+ AggregateExpr, OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+ PhysicalSortRequirement,
};
use std::any::Any;
use std::collections::HashMap;
@@ -228,6 +229,8 @@ pub struct AggregateExec {
pub(crate) aggr_expr: Vec<Arc<dyn AggregateExpr>>,
/// FILTER (WHERE clause) expression for each aggregate expression
pub(crate) filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
+ /// (ORDER BY clause) expression for each aggregate expression
+ pub(crate) order_by_expr: Vec<Option<Vec<PhysicalSortExpr>>>,
/// Input plan, could be a partial aggregate or the input to the aggregate
pub(crate) input: Arc<dyn ExecutionPlan>,
/// Schema after the aggregate is applied
@@ -243,6 +246,7 @@ pub struct AggregateExec {
metrics: ExecutionPlanMetricsSet,
/// Stores mode and output ordering information for the `AggregateExec`.
aggregation_ordering: Option<AggregationOrdering>,
+ required_input_ordering: Option<Vec<PhysicalSortRequirement>>,
}
/// Calculates the working mode for `GROUP BY` queries.
@@ -337,6 +341,54 @@ fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalE
.collect()
}
+/// This function gets the finest ordering requirement among all the aggregation
+/// functions. If requirements are conflicting, (i.e. we can not compute the
+/// aggregations in a single [`AggregateExec`]), the function returns an error.
+fn get_finest_requirement<
+ F: Fn() -> EquivalenceProperties,
+ F2: Fn() -> OrderingEquivalenceProperties,
+>(
+ order_by_expr: &[Option<Vec<PhysicalSortExpr>>],
+ eq_properties: F,
+ ordering_eq_properties: F2,
+) -> Result<Option<Vec<PhysicalSortExpr>>> {
+ let mut result: Option<Vec<PhysicalSortExpr>> = None;
+ for fn_reqs in order_by_expr.iter().flatten() {
+ if let Some(result) = &mut result {
+ if ordering_satisfy_concrete(
+ result,
+ fn_reqs,
+ &eq_properties,
+ &ordering_eq_properties,
+ ) {
+ // Do not update the result as it already satisfies current
+ // function's requirement:
+ continue;
+ }
+ if ordering_satisfy_concrete(
+ fn_reqs,
+ result,
+ &eq_properties,
+ &ordering_eq_properties,
+ ) {
+ // Update result with current function's requirements, as it is
+ // a finer requirement than what we currently have.
+ *result = fn_reqs.clone();
+ continue;
+ }
+ // If neither of the requirements satisfy the other, this means
+ // requirements are conflicting. Currently, we do not support
+ // conflicting requirements.
+ return Err(DataFusionError::NotImplemented(
+ "Conflicting ordering requirements in aggregate functions is not supported".to_string(),
+ ));
+ } else {
+ result = Some(fn_reqs.clone());
+ }
+ }
+ Ok(result)
+}
+
impl AggregateExec {
/// Create a new hash aggregate execution plan
pub fn try_new(
@@ -344,6 +396,7 @@ impl AggregateExec {
group_by: PhysicalGroupBy,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
+ order_by_expr: Vec<Option<Vec<PhysicalSortExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
) -> Result<Self> {
@@ -356,6 +409,19 @@ impl AggregateExec {
)?;
let schema = Arc::new(schema);
+ let mut aggregator_requirement = None;
+ // Ordering requirement makes sense only in Partial and Single modes.
+ // In other modes, all groups are collapsed, therefore their input schema
+ // can not contain expressions in the requirement.
+ if mode == AggregateMode::Partial || mode == AggregateMode::Single {
+ let requirement = get_finest_requirement(
+ &order_by_expr,
+ || input.equivalence_properties(),
+ || input.ordering_equivalence_properties(),
+ )?;
+ aggregator_requirement = requirement
+ .map(|exprs| PhysicalSortRequirement::from_sort_exprs(exprs.iter()));
+ }
// construct a map from the input columns to the output columns of the Aggregation
let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
@@ -369,17 +435,52 @@ impl AggregateExec {
let aggregation_ordering = calc_aggregation_ordering(&input, &group_by);
+ let mut required_input_ordering = None;
+ if let Some(AggregationOrdering {
+ ordering,
+ // If the mode is FullyOrdered or PartiallyOrdered (i.e. we are
+ // running with bounded memory, without breaking pipeline), then
+ // we append aggregator ordering requirement to the existing
+ // ordering. This way, we can still run with bounded memory.
+ mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
+ ..
+ }) = &aggregation_ordering
+ {
+ if let Some(aggregator_requirement) = aggregator_requirement {
+ // Get the section of the input ordering that enables us to run in the
+ // FullyOrdered or PartiallyOrdered mode:
+ let requirement_prefix =
+ if let Some(existing_ordering) = input.output_ordering() {
+ existing_ordering[0..ordering.len()].to_vec()
+ } else {
+ vec![]
+ };
+ let mut requirement =
+ PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter());
+ for req in aggregator_requirement {
+ if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
+ requirement.push(req);
+ }
+ }
+ required_input_ordering = Some(requirement);
+ }
+ } else {
+ required_input_ordering = aggregator_requirement;
+ }
+
Ok(AggregateExec {
mode,
group_by,
aggr_expr,
filter_expr,
+ order_by_expr,
input,
schema,
input_schema,
columns_map,
metrics: ExecutionPlanMetricsSet::new(),
aggregation_ordering,
+ required_input_ordering,
})
}
@@ -408,6 +509,11 @@ impl AggregateExec {
&self.filter_expr
}
+ /// ORDER BY clause expression for each aggregate expression
+ pub fn order_by_expr(&self) -> &[Option<Vec<PhysicalSortExpr>>] {
+ &self.order_by_expr
+ }
+
/// Input plan
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
@@ -547,6 +653,10 @@ impl ExecutionPlan for AggregateExec {
}
}
+ fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+ vec![self.required_input_ordering.clone()]
+ }
+
fn equivalence_properties(&self) -> EquivalenceProperties {
let mut new_properties = EquivalenceProperties::new(self.schema());
project_equivalence_properties(
@@ -570,6 +680,7 @@ impl ExecutionPlan for AggregateExec {
self.group_by.clone(),
self.aggr_expr.clone(),
self.filter_expr.clone(),
+ self.order_by_expr.clone(),
children[0].clone(),
self.input_schema.clone(),
)?))
@@ -951,7 +1062,8 @@ mod tests {
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::from_slice::FromSlice;
use crate::physical_plan::aggregates::{
- get_working_mode, AggregateExec, AggregateMode, PhysicalGroupBy,
+ get_finest_requirement, get_working_mode, AggregateExec, AggregateMode,
+ PhysicalGroupBy,
};
use crate::physical_plan::expressions::{col, Avg};
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
@@ -962,8 +1074,13 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, ScalarValue};
- use datafusion_physical_expr::expressions::{lit, ApproxDistinct, Count, Median};
- use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, PhysicalSortExpr};
+ use datafusion_physical_expr::expressions::{
+ lit, ApproxDistinct, Column, Count, Median,
+ };
+ use datafusion_physical_expr::{
+ AggregateExpr, EquivalenceProperties, OrderedColumn,
+ OrderingEquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+ };
use futures::{FutureExt, Stream};
use std::any::Any;
use std::sync::Arc;
@@ -1130,6 +1247,7 @@ mod tests {
grouping_set.clone(),
aggregates.clone(),
vec![None],
+ vec![None],
input,
input_schema.clone(),
)?);
@@ -1173,6 +1291,7 @@ mod tests {
final_grouping_set,
aggregates,
vec![None],
+ vec![None],
merge,
input_schema,
)?);
@@ -1235,6 +1354,7 @@ mod tests {
grouping_set.clone(),
aggregates.clone(),
vec![None],
+ vec![None],
input,
input_schema.clone(),
)?);
@@ -1268,6 +1388,7 @@ mod tests {
final_grouping_set,
aggregates,
vec![None],
+ vec![None],
merge,
input_schema,
)?);
@@ -1482,6 +1603,7 @@ mod tests {
groups,
aggregates,
vec![None; 3],
+ vec![None; 3],
input.clone(),
input_schema.clone(),
)?);
@@ -1538,6 +1660,7 @@ mod tests {
groups.clone(),
aggregates.clone(),
vec![None],
+ vec![None],
blocking_exec,
schema,
)?);
@@ -1577,6 +1700,7 @@ mod tests {
groups,
aggregates.clone(),
vec![None],
+ vec![None],
blocking_exec,
schema,
)?);
@@ -1590,4 +1714,63 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_get_finest_requirements() -> Result<()> {
+ let test_schema = create_test_schema()?;
+ // Assume column a and b are aliases
+ // Assume also that a ASC and c DESC describe the same global ordering for the table. (Since they are ordering equivalent).
+ let options1 = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let options2 = SortOptions {
+ descending: true,
+ nulls_first: true,
+ };
+ let mut eq_properties = EquivalenceProperties::new(test_schema.clone());
+ let col_a = Column::new("a", 0);
+ let col_b = Column::new("b", 1);
+ let col_c = Column::new("c", 2);
+ let col_d = Column::new("d", 3);
+ eq_properties.add_equal_conditions((&col_a, &col_b));
+ let mut ordering_eq_properties = OrderingEquivalenceProperties::new(test_schema);
+ ordering_eq_properties.add_equal_conditions((
+ &OrderedColumn::new(col_a.clone(), options1),
+ &OrderedColumn::new(col_c.clone(), options2),
+ ));
+
+ let order_by_exprs = vec![
+ None,
+ Some(vec![PhysicalSortExpr {
+ expr: Arc::new(col_a.clone()),
+ options: options1,
+ }]),
+ Some(vec![PhysicalSortExpr {
+ expr: Arc::new(col_b.clone()),
+ options: options1,
+ }]),
+ Some(vec![PhysicalSortExpr {
+ expr: Arc::new(col_c),
+ options: options2,
+ }]),
+ Some(vec![
+ PhysicalSortExpr {
+ expr: Arc::new(col_a),
+ options: options1,
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(col_d),
+ options: options1,
+ },
+ ]),
+ ];
+ let res = get_finest_requirement(
+ &order_by_exprs,
+ || eq_properties.clone(),
+ || ordering_eq_properties.clone(),
+ )?;
+ assert_eq!(res, order_by_exprs[4]);
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index cc52739f6a..57c0251d24 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -74,7 +74,7 @@ use datafusion_physical_expr::expressions::Literal;
use datafusion_sql::utils::window_expr_common_partition_keys;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
-use itertools::Itertools;
+use itertools::{multiunzip, Itertools};
use log::{debug, trace};
use std::collections::HashMap;
use std::fmt::Write;
@@ -199,12 +199,23 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
args,
..
}) => create_function_physical_name(&fun.to_string(), *distinct, args),
- Expr::AggregateUDF(AggregateUDF { fun, args, filter }) => {
+ Expr::AggregateUDF(AggregateUDF {
+ fun,
+ args,
+ filter,
+ order_by,
+ }) => {
+ // TODO: Add support for filter and order by in AggregateUDF
if filter.is_some() {
return Err(DataFusionError::Execution(
"aggregate expression with filter is not supported".to_string(),
));
}
+ if order_by.is_some() {
+ return Err(DataFusionError::Execution(
+ "aggregate expression with order_by is not supported".to_string(),
+ ));
+ }
let mut names = Vec::with_capacity(args.len());
for e in args {
names.push(create_physical_name(e, false)?);
@@ -703,13 +714,15 @@ impl DefaultPhysicalPlanner {
)
})
.collect::<Result<Vec<_>>>()?;
- let (aggregates, filters): (Vec<_>, Vec<_>) = agg_filter.into_iter().unzip();
+
+ let (aggregates, filters, order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter.into_iter());
let initial_aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
filters.clone(),
+ order_bys.clone(),
input_exec,
physical_input_schema.clone(),
)?);
@@ -746,6 +759,7 @@ impl DefaultPhysicalPlanner {
final_grouping_set,
aggregates,
filters,
+ order_bys,
initial_aggr,
physical_input_schema.clone(),
)?))
@@ -867,25 +881,12 @@ impl DefaultPhysicalPlanner {
let input_dfschema = input.as_ref().schema();
let sort_expr = expr
.iter()
- .map(|e| match e {
- Expr::Sort(expr::Sort {
- expr,
- asc,
- nulls_first,
- }) => create_physical_sort_expr(
- expr,
- input_dfschema,
- &input_schema,
- SortOptions {
- descending: !*asc,
- nulls_first: *nulls_first,
- },
- session_state.execution_props(),
- ),
- _ => Err(DataFusionError::Plan(
- "Sort only accepts sort expressions".to_string(),
- )),
- })
+ .map(|e| create_physical_sort_expr(
+ e,
+ input_dfschema,
+ &input_schema,
+ session_state.execution_props(),
+ ))
.collect::<Result<Vec<_>>>()?;
let new_sort = SortExec::new(sort_expr, physical_input)
.with_fetch(*fetch);
@@ -1554,24 +1555,13 @@ pub fn create_window_expr_with_name(
.collect::<Result<Vec<_>>>()?;
let order_by = order_by
.iter()
- .map(|e| match e {
- Expr::Sort(expr::Sort {
- expr,
- asc,
- nulls_first,
- }) => create_physical_sort_expr(
- expr,
+ .map(|e| {
+ create_physical_sort_expr(
+ e,
logical_input_schema,
physical_input_schema,
- SortOptions {
- descending: !*asc,
- nulls_first: *nulls_first,
- },
execution_props,
- ),
- _ => Err(DataFusionError::Plan(
- "Sort only accepts sort expressions".to_string(),
- )),
+ )
})
.collect::<Result<Vec<_>>>()?;
if !is_window_valid(window_frame) {
@@ -1619,8 +1609,13 @@ pub fn create_window_expr(
)
}
-type AggregateExprWithOptionalFilter =
- (Arc<dyn AggregateExpr>, Option<Arc<dyn PhysicalExpr>>);
+type AggregateExprWithOptionalArgs = (
+ Arc<dyn AggregateExpr>,
+ // The filter clause, if any
+ Option<Arc<dyn PhysicalExpr>>,
+ // Ordering requirements, if any
+ Option<Vec<PhysicalSortExpr>>,
+);
/// Create an aggregate expression with a name from a logical expression
pub fn create_aggregate_expr_with_name_and_maybe_filter(
@@ -1629,13 +1624,14 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
execution_props: &ExecutionProps,
-) -> Result<AggregateExprWithOptionalFilter> {
+) -> Result<AggregateExprWithOptionalArgs> {
match e {
Expr::AggregateFunction(AggregateFunction {
fun,
distinct,
args,
filter,
+ order_by,
}) => {
let args = args
.iter()
@@ -1663,10 +1659,30 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
&args,
physical_input_schema,
name,
- );
- Ok((agg_expr?, filter))
+ )?;
+ let order_by = match order_by {
+ Some(e) => Some(
+ e.iter()
+ .map(|expr| {
+ create_physical_sort_expr(
+ expr,
+ logical_input_schema,
+ physical_input_schema,
+ execution_props,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?,
+ ),
+ None => None,
+ };
+ Ok((agg_expr, filter, order_by))
}
- Expr::AggregateUDF(AggregateUDF { fun, args, filter }) => {
+ Expr::AggregateUDF(AggregateUDF {
+ fun,
+ args,
+ filter,
+ order_by,
+ }) => {
let args = args
.iter()
.map(|e| {
@@ -1688,10 +1704,25 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
)?),
None => None,
};
+ let order_by = match order_by {
+ Some(e) => Some(
+ e.iter()
+ .map(|expr| {
+ create_physical_sort_expr(
+ expr,
+ logical_input_schema,
+ physical_input_schema,
+ execution_props,
+ )
+ })
+ .collect::<Result<Vec<_>>>()?,
+ ),
+ None => None,
+ };
let agg_expr =
udaf::create_aggregate_expr(fun, &args, physical_input_schema, name);
- Ok((agg_expr?, filter))
+ Ok((agg_expr?, filter, order_by))
}
other => Err(DataFusionError::Internal(format!(
"Invalid aggregate expression '{other:?}'"
@@ -1705,7 +1736,7 @@ pub fn create_aggregate_expr_and_maybe_filter(
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
execution_props: &ExecutionProps,
-) -> Result<AggregateExprWithOptionalFilter> {
+) -> Result<AggregateExprWithOptionalArgs> {
// unpack (nested) aliased logical expressions, e.g. "sum(col) as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
@@ -1726,13 +1757,31 @@ pub fn create_physical_sort_expr(
e: &Expr,
input_dfschema: &DFSchema,
input_schema: &Schema,
- options: SortOptions,
execution_props: &ExecutionProps,
) -> Result<PhysicalSortExpr> {
- Ok(PhysicalSortExpr {
- expr: create_physical_expr(e, input_dfschema, input_schema, execution_props)?,
- options,
- })
+ if let Expr::Sort(expr::Sort {
+ expr,
+ asc,
+ nulls_first,
+ }) = e
+ {
+ Ok(PhysicalSortExpr {
+ expr: create_physical_expr(
+ expr,
+ input_dfschema,
+ input_schema,
+ execution_props,
+ )?,
+ options: SortOptions {
+ descending: !asc,
+ nulls_first: *nulls_first,
+ },
+ })
+ } else {
+ Err(DataFusionError::Internal(
+ "Expects a sort expression".to_string(),
+ ))
+ }
}
impl DefaultPhysicalPlanner {
diff --git a/datafusion/core/tests/aggregate_fuzz.rs b/datafusion/core/tests/aggregate_fuzz.rs
index 14cf469624..74370049e8 100644
--- a/datafusion/core/tests/aggregate_fuzz.rs
+++ b/datafusion/core/tests/aggregate_fuzz.rs
@@ -113,6 +113,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
group_by.clone(),
aggregate_expr.clone(),
vec![None],
+ vec![None],
running_source,
schema.clone(),
)
@@ -125,6 +126,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
group_by.clone(),
aggregate_expr.clone(),
vec![None],
+ vec![None],
usual_source,
schema.clone(),
)
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index b9d2543e11..e5a93e709f 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -2010,3 +2010,208 @@ SELECT a, d,
statement ok
drop table annotated_data_infinite2;
+
+# create a table for testing
+statement ok
+CREATE TABLE sales_global (zip_code INT,
+ country VARCHAR(3),
+ sn INT,
+ ts TIMESTAMP,
+ currency VARCHAR(3),
+ amount FLOAT
+ ) as VALUES
+ (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+ (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+ (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+ (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+ (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0),
+ (0, 'GRC', 4, '2022-01-03 10:00:00'::timestamp, 'EUR', 80.0)
+
+# test_ordering_sensitive_aggregation
+# An ordering sensitive requirement should add a SortExec to the final plan, in order to satisfy
+# amount ASC in the aggregation.
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT country, (ARRAY_AGG(amount ORDER BY amount ASC)) AS amounts
+ FROM sales_global
+ GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+----TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(sales_global.amount)]
+----SortExec: expr=[amount@1 ASC NULLS LAST]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+
+query T?
+SELECT country, (ARRAY_AGG(amount ORDER BY amount ASC)) AS amounts
+ FROM sales_global
+ GROUP BY country
+----
+GRC [30.0, 80.0]
+FRA [50.0, 200.0]
+TUR [75.0, 100.0]
+
+# test_ordering_sensitive_aggregation2
+# We should be able to satisfy the finest requirement among all aggregators, when we have multiple aggregators.
+# Hence final plan should have SortExec: expr=[amount@1 DESC] to satisfy array_agg requirement.
+query TT
+EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+ SUM(s.amount) AS sum1
+ FROM sales_global AS s
+ GROUP BY s.country
+----
+logical_plan
+Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+----SubqueryAlias: s
+------TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(s.amount), SUM(s.amount)]
+----SortExec: expr=[amount@1 DESC]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?R
+SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+ SUM(s.amount) AS sum1
+ FROM sales_global AS s
+ GROUP BY s.country
+----
+FRA [200.0, 50.0] 250
+TUR [100.0, 75.0] 175
+GRC [80.0, 30.0] 110
+
+# test_ordering_sensitive_multiple_req
+# Currently we do not support multiple ordering requirements for aggregation.
+# Once this support is added, this test should change.
+# See issue: https://github.com/sqlparser-rs/sqlparser-rs/issues/875
+statement error DataFusion error: SQL error: ParserError\("Expected \), found: ,"\)
+SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC, s.country DESC) AS amounts,
+ SUM(s.amount ORDER BY s.amount DESC) AS sum1
+ FROM sales_global AS s
+ GROUP BY s.country
+
+# test_ordering_sensitive_aggregation3
+# When different aggregators have conflicting requirements, we cannot satisfy all of them in the current implementation.
+# The test below should raise a "not implemented" error.
+statement error DataFusion error: This feature is not implemented: Conflicting ordering requirements in aggregate functions is not supported
+SELECT ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+ ARRAY_AGG(s.amount ORDER BY s.amount ASC) AS amounts2,
+ ARRAY_AGG(s.amount ORDER BY s.sn ASC) AS amounts3
+ FROM sales_global AS s
+ GROUP BY s.country
+
+# test_ordering_sensitive_aggregation4
+# If aggregators can work with bounded memory (FullyOrdered or PartiallyOrdered mode), we should append requirement to
+# the existing ordering. This enables us to still work with bounded memory, and also satisfy aggregation requirement.
+# This test checks for whether we can satisfy aggregation requirement in FullyOrdered mode.
+query TT
+EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+ SUM(s.amount) AS sum1
+ FROM (SELECT *
+ FROM sales_global
+ ORDER BY country) AS s
+ GROUP BY s.country
+----
+logical_plan
+Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+----SubqueryAlias: s
+------Sort: sales_global.country ASC NULLS LAST
+--------TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(s.amount), SUM(s.amount)], ordering_mode=FullyOrdered
+----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?R
+SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+ SUM(s.amount) AS sum1
+ FROM (SELECT *
+ FROM sales_global
+ ORDER BY country) AS s
+ GROUP BY s.country
+----
+FRA [200.0, 50.0] 250
+GRC [80.0, 30.0] 110
+TUR [100.0, 75.0] 175
+
+# test_ordering_sensitive_aggregation5
+# If aggregators can work with bounded memory (FullyOrdered or PartiallyOrdered mode), we should append the requirement to
+# the existing ordering. This enables us to still work with bounded memory, and also satisfy aggregation requirement.
+# This test checks for whether we can satisfy aggregation requirement in PartiallyOrdered mode.
+query TT
+EXPLAIN SELECT s.country, s.zip_code, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+ SUM(s.amount) AS sum1
+ FROM (SELECT *
+ FROM sales_global
+ ORDER BY country) AS s
+ GROUP BY s.country, s.zip_code
+----
+logical_plan
+Projection: s.country, s.zip_code, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+----SubqueryAlias: s
+------Sort: sales_global.country ASC NULLS LAST
+--------TableScan: sales_global projection=[zip_code, country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts, SUM(s.amount)@3 as sum1]
+--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as zip_code], aggr=[ARRAYAGG(s.amount), SUM(s.amount)], ordering_mode=PartiallyOrdered
+----SortExec: expr=[country@1 ASC NULLS LAST,amount@2 DESC]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TI?R
+SELECT s.country, s.zip_code, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+ SUM(s.amount) AS sum1
+ FROM (SELECT *
+ FROM sales_global
+ ORDER BY country) AS s
+ GROUP BY s.country, s.zip_code
+----
+FRA 1 [200.0, 50.0] 250
+GRC 0 [80.0, 30.0] 110
+TUR 1 [100.0, 75.0] 175
+
+# test_ordering_sensitive_aggregation6
+# If aggregators can work with bounded memory (FullyOrdered or PartiallyOrdered mode), we should append the requirement to
+# the existing ordering. When group by expressions contain aggregation requirement, we shouldn't append redundant expression.
+# Hence in the final plan SortExec should be SortExec: expr=[country@0 ASC NULLS LAST], not SortExec: expr=[country@0 ASC NULLS LAST,country@0 DESC]
+query TT
+EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC) AS amounts,
+ SUM(s.amount) AS sum1
+ FROM (SELECT *
+ FROM sales_global
+ ORDER BY country) AS s
+ GROUP BY s.country
+----
+logical_plan
+Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS FIRST], SUM(s.amount)]]
+----SubqueryAlias: s
+------Sort: sales_global.country ASC NULLS LAST
+--------TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(s.amount), SUM(s.amount)], ordering_mode=FullyOrdered
+----SortExec: expr=[country@0 ASC NULLS LAST]
+------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?R
+SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+ SUM(s.amount) AS sum1
+ FROM (SELECT *
+ FROM sales_global
+ ORDER BY country) AS s
+ GROUP BY s.country
+----
+FRA [200.0, 50.0] 250
+GRC [80.0, 30.0] 110
+TUR [100.0, 75.0] 175
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 55bc71f1ab..230e2fb916 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -27,9 +27,7 @@ use crate::window_frame;
use crate::window_function;
use crate::Operator;
use arrow::datatypes::DataType;
-use datafusion_common::Result;
-use datafusion_common::{plan_err, Column};
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::{plan_err, Column, DataFusionError, Result, ScalarValue};
use std::collections::HashSet;
use std::fmt;
use std::fmt::{Display, Formatter, Write};
@@ -424,6 +422,8 @@ pub struct AggregateFunction {
pub distinct: bool,
/// Optional filter
pub filter: Option<Box<Expr>>,
+ /// Optional ordering
+ pub order_by: Option<Vec<Expr>>,
}
impl AggregateFunction {
@@ -432,12 +432,14 @@ impl AggregateFunction {
args: Vec<Expr>,
distinct: bool,
filter: Option<Box<Expr>>,
+ order_by: Option<Vec<Expr>>,
) -> Self {
Self {
fun,
args,
distinct,
filter,
+ order_by,
}
}
}
@@ -500,6 +502,8 @@ pub struct AggregateUDF {
pub args: Vec<Expr>,
/// Optional filter
pub filter: Option<Box<Expr>>,
+ /// Optional ORDER BY applied prior to aggregating
+ pub order_by: Option<Vec<Expr>>,
}
impl AggregateUDF {
@@ -508,8 +512,14 @@ impl AggregateUDF {
fun: Arc<udaf::AggregateUDF>,
args: Vec<Expr>,
filter: Option<Box<Expr>>,
+ order_by: Option<Vec<Expr>>,
) -> Self {
- Self { fun, args, filter }
+ Self {
+ fun,
+ args,
+ filter,
+ order_by,
+ }
}
}
@@ -1042,24 +1052,32 @@ impl fmt::Debug for Expr {
distinct,
ref args,
filter,
+ order_by,
..
}) => {
fmt_function(f, &fun.to_string(), *distinct, args, true)?;
if let Some(fe) = filter {
write!(f, " FILTER (WHERE {fe})")?;
}
+ if let Some(ob) = order_by {
+ write!(f, " ORDER BY {:?}", ob)?;
+ }
Ok(())
}
Expr::AggregateUDF(AggregateUDF {
fun,
ref args,
filter,
+ order_by,
..
}) => {
fmt_function(f, &fun.name, false, args, false)?;
if let Some(fe) = filter {
write!(f, " FILTER (WHERE {fe})")?;
}
+ if let Some(ob) = order_by {
+ write!(f, " ORDER BY {:?}", ob)?;
+ }
Ok(())
}
Expr::Between(Between {
@@ -1398,25 +1416,35 @@ fn create_name(e: &Expr) -> Result<String> {
distinct,
args,
filter,
+ order_by,
}) => {
- let name = create_function_name(&fun.to_string(), *distinct, args)?;
+ let mut name = create_function_name(&fun.to_string(), *distinct, args)?;
if let Some(fe) = filter {
- Ok(format!("{name} FILTER (WHERE {fe})"))
- } else {
- Ok(name)
- }
+ name = format!("{name} FILTER (WHERE {fe})");
+ };
+ if let Some(order_by) = order_by {
+ name = format!("{name} ORDER BY {order_by:?}");
+ };
+ Ok(name)
}
- Expr::AggregateUDF(AggregateUDF { fun, args, filter }) => {
+ Expr::AggregateUDF(AggregateUDF {
+ fun,
+ args,
+ filter,
+ order_by,
+ }) => {
let mut names = Vec::with_capacity(args.len());
for e in args {
names.push(create_name(e)?);
}
- let filter = if let Some(fe) = filter {
- format!(" FILTER (WHERE {fe})")
- } else {
- "".to_string()
- };
- Ok(format!("{}({}){}", fun.name, names.join(","), filter))
+ let mut info = String::new();
+ if let Some(fe) = filter {
+ info += &format!(" FILTER (WHERE {fe})");
+ }
+ if let Some(ob) = order_by {
+ info += &format!(" ORDER BY ({:?})", ob);
+ }
+ Ok(format!("{}({}){}", fun.name, names.join(","), info))
}
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => {
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 1d781e6f0b..6aa51be566 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -108,6 +108,7 @@ pub fn min(expr: Expr) -> Expr {
vec![expr],
false,
None,
+ None,
))
}
@@ -118,6 +119,7 @@ pub fn max(expr: Expr) -> Expr {
vec![expr],
false,
None,
+ None,
))
}
@@ -128,6 +130,7 @@ pub fn sum(expr: Expr) -> Expr {
vec![expr],
false,
None,
+ None,
))
}
@@ -138,6 +141,7 @@ pub fn avg(expr: Expr) -> Expr {
vec![expr],
false,
None,
+ None,
))
}
@@ -148,6 +152,7 @@ pub fn count(expr: Expr) -> Expr {
vec![expr],
false,
None,
+ None,
))
}
@@ -203,6 +208,7 @@ pub fn count_distinct(expr: Expr) -> Expr {
vec![expr],
true,
None,
+ None,
))
}
@@ -254,6 +260,7 @@ pub fn approx_distinct(expr: Expr) -> Expr {
vec![expr],
false,
None,
+ None,
))
}
@@ -264,6 +271,7 @@ pub fn median(expr: Expr) -> Expr {
vec![expr],
false,
None,
+ None,
))
}
@@ -274,6 +282,7 @@ pub fn approx_median(expr: Expr) -> Expr {
vec![expr],
false,
None,
+ None,
))
}
@@ -284,6 +293,7 @@ pub fn approx_percentile_cont(expr: Expr, percentile: Expr) -> Expr {
vec![expr, percentile],
false,
None,
+ None,
))
}
@@ -298,6 +308,7 @@ pub fn approx_percentile_cont_with_weight(
vec![expr, weight_expr, percentile],
false,
None,
+ None,
))
}
@@ -367,6 +378,7 @@ pub fn stddev(expr: Expr) -> Expr {
vec![expr],
false,
None,
+ None,
))
}
diff --git a/datafusion/expr/src/tree_node/expr.rs b/datafusion/expr/src/tree_node/expr.rs
index 3b8df59dab..3c98844109 100644
--- a/datafusion/expr/src/tree_node/expr.rs
+++ b/datafusion/expr/src/tree_node/expr.rs
@@ -97,13 +97,16 @@ impl TreeNode for Expr {
}
expr_vec
}
- Expr::AggregateFunction(AggregateFunction { args, filter, .. })
- | Expr::AggregateUDF(AggregateUDF { args, filter, .. }) => {
+ Expr::AggregateFunction(AggregateFunction { args, filter, order_by, .. })
+ | Expr::AggregateUDF(AggregateUDF { args, filter, order_by, .. }) => {
let mut expr_vec = args.clone();
if let Some(f) = filter {
expr_vec.push(f.as_ref().clone());
}
+ if let Some(o) = order_by {
+ expr_vec.extend(o.clone());
+ }
expr_vec
}
@@ -292,11 +295,13 @@ impl TreeNode for Expr {
fun,
distinct,
filter,
+ order_by,
}) => Expr::AggregateFunction(AggregateFunction::new(
fun,
transform_vec(args, &mut transform)?,
distinct,
transform_option_box(filter, &mut transform)?,
+ transform_option_vec(order_by, &mut transform)?,
)),
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Expr::GroupingSet(GroupingSet::Rollup(
@@ -314,11 +319,22 @@ impl TreeNode for Expr {
))
}
},
- Expr::AggregateUDF(AggregateUDF { args, fun, filter }) => {
+ Expr::AggregateUDF(AggregateUDF {
+ args,
+ fun,
+ filter,
+ order_by,
+ }) => {
+ // Rewrite the optional ORDER BY expressions exactly once, here.
+ // The constructor call below must receive the already-rewritten
+ // value as-is: running `transform` over it a second time would
+ // apply the (possibly stateful) closure twice to each expression.
+ let order_by = transform_option_vec(order_by, &mut transform)?;
Expr::AggregateUDF(AggregateUDF::new(
fun,
transform_vec(args, &mut transform)?,
transform_option_box(filter, &mut transform)?,
+ order_by,
))
}
Expr::InList(InList {
@@ -371,6 +387,21 @@ where
.transpose()
}
+/// &mut transform an `Option<Vec<Expr>>`, passing `None` through unchanged
+fn transform_option_vec<F>(
+ option_box: Option<Vec<Expr>>,
+ transform: &mut F,
+) -> Result<Option<Vec<Expr>>>
+where
+ F: FnMut(Expr) -> Result<Expr>,
+{
+ Ok(if let Some(exprs) = option_box {
+ Some(transform_vec(exprs, transform)?)
+ } else {
+ None
+ })
+}
+
/// &mut transform a `Vec` of `Expr`s
fn transform_vec<F>(v: Vec<Expr>, transform: &mut F) -> Result<Vec<Expr>>
where
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index d681390d27..6c3690e283 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -90,6 +90,7 @@ impl AggregateUDF {
fun: Arc::new(self.clone()),
args,
filter: None,
+ order_by: None,
})
}
}
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index 3b0e334618..0e2689c14f 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -168,12 +168,14 @@ impl TreeNodeRewriter for CountWildcardRewriter {
args,
distinct,
filter,
+ order_by,
}) if args.len() == 1 => match args[0] {
Expr::Wildcard => Expr::AggregateFunction(AggregateFunction {
fun: aggregate_function::AggregateFunction::Count,
args: vec![lit(COUNT_STAR_EXPANSION)],
distinct,
filter,
+ order_by,
}),
_ => old_expr,
},
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 6de095cccd..fbb61fb1a3 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -393,6 +393,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
args,
distinct,
filter,
+ order_by,
}) => {
let new_expr = coerce_agg_exprs_for_signature(
&fun,
@@ -401,18 +402,24 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
&aggregate_function::signature(&fun),
)?;
let expr = Expr::AggregateFunction(expr::AggregateFunction::new(
- fun, new_expr, distinct, filter,
+ fun, new_expr, distinct, filter, order_by,
));
Ok(expr)
}
- Expr::AggregateUDF(expr::AggregateUDF { fun, args, filter }) => {
+ Expr::AggregateUDF(expr::AggregateUDF {
+ fun,
+ args,
+ filter,
+ order_by,
+ }) => {
let new_expr = coerce_arguments_for_signature(
args.as_slice(),
&self.schema,
&fun.signature,
)?;
- let expr =
- Expr::AggregateUDF(expr::AggregateUDF::new(fun, new_expr, filter));
+ let expr = Expr::AggregateUDF(expr::AggregateUDF::new(
+ fun, new_expr, filter, order_by,
+ ));
Ok(expr)
}
Expr::WindowFunction(WindowFunction {
@@ -885,6 +892,7 @@ mod test {
Arc::new(my_avg),
vec![lit(10i64)],
None,
+ None,
));
let plan = LogicalPlan::Projection(Projection::try_new(vec![udaf], empty)?);
let expected = "Projection: MY_AVG(CAST(Int64(10) AS Float64))\n EmptyRelation";
@@ -915,6 +923,7 @@ mod test {
Arc::new(my_avg),
vec![lit("10")],
None,
+ None,
));
let plan = LogicalPlan::Projection(Projection::try_new(vec![udaf], empty)?);
let err = assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), &plan, "")
@@ -936,6 +945,7 @@ mod test {
vec![lit(12i64)],
false,
None,
+ None,
));
let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?);
let expected = "Projection: AVG(Int64(12))\n EmptyRelation";
@@ -948,6 +958,7 @@ mod test {
vec![col("a")],
false,
None,
+ None,
));
let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?);
let expected = "Projection: AVG(a)\n EmptyRelation";
@@ -964,6 +975,7 @@ mod test {
vec![lit("1")],
false,
None,
+ None,
));
let err = Projection::try_new(vec![agg_expr], empty).err().unwrap();
assert_eq!(
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index a2db933309..6989ca5352 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -880,6 +880,7 @@ mod test {
)),
vec![inner],
None,
+ None,
))
};
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 5c6f10825a..c64dfc578b 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -1056,6 +1056,7 @@ mod tests {
vec![col("b")],
false,
Some(Box::new(col("c").gt(lit(42)))),
+ None,
));
let plan = LogicalPlanBuilder::from(table_scan)
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index cee31b5b33..ba7e89094b 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -131,6 +131,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
fun,
args,
filter,
+ order_by,
..
}) => {
// is_single_distinct_agg ensure args.len=1
@@ -144,6 +145,7 @@ impl OptimizerRule for SingleDistinctToGroupBy {
vec![col(SINGLE_DISTINCT_ALIAS)],
false, // intentional to remove distinct here
filter.clone(),
+ order_by.clone(),
)))
}
_ => Ok(aggr_expr.clone()),
@@ -399,6 +401,7 @@ mod tests {
vec![col("b")],
true,
None,
+ None,
)),
],
)?
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 967ecdb40f..a8a0625ca0 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -304,7 +304,7 @@ pub fn ordering_satisfy<
/// Checks whether the required [`PhysicalSortExpr`]s are satisfied by the
/// provided [`PhysicalSortExpr`]s.
-fn ordering_satisfy_concrete<
+pub fn ordering_satisfy_concrete<
F: FnOnce() -> EquivalenceProperties,
F2: FnOnce() -> OrderingEquivalenceProperties,
>(
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 08e6360aa3..ef1e2f284e 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -576,12 +576,14 @@ message AggregateExprNode {
repeated LogicalExprNode expr = 2;
bool distinct = 3;
LogicalExprNode filter = 4;
+ repeated LogicalExprNode order_by = 5;
}
message AggregateUDFExprNode {
string fun_name = 1;
repeated LogicalExprNode args = 2;
LogicalExprNode filter = 3;
+ repeated LogicalExprNode order_by = 4;
}
message ScalarUDFExprNode {
@@ -1284,6 +1286,10 @@ message MaybeFilter {
PhysicalExprNode expr = 1;
}
+message MaybePhysicalSortExprs {
+ repeated PhysicalSortExprNode sort_expr = 1;
+}
+
message AggregateExecNode {
repeated PhysicalExprNode group_expr = 1;
repeated PhysicalExprNode aggr_expr = 2;
@@ -1296,6 +1302,7 @@ message AggregateExecNode {
repeated PhysicalExprNode null_expr = 8;
repeated bool groups = 9;
repeated MaybeFilter filter_expr = 10;
+ repeated MaybePhysicalSortExprs order_by_expr = 11;
}
message GlobalLimitExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 79f4f06d37..c217c4d7fe 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -36,6 +36,9 @@ impl serde::Serialize for AggregateExecNode {
if !self.filter_expr.is_empty() {
len += 1;
}
+ if !self.order_by_expr.is_empty() {
+ len += 1;
+ }
let mut struct_ser = serializer.serialize_struct("datafusion.AggregateExecNode", len)?;
if !self.group_expr.is_empty() {
struct_ser.serialize_field("groupExpr", &self.group_expr)?;
@@ -69,6 +72,9 @@ impl serde::Serialize for AggregateExecNode {
if !self.filter_expr.is_empty() {
struct_ser.serialize_field("filterExpr", &self.filter_expr)?;
}
+ if !self.order_by_expr.is_empty() {
+ struct_ser.serialize_field("orderByExpr", &self.order_by_expr)?;
+ }
struct_ser.end()
}
}
@@ -96,6 +102,8 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
"groups",
"filter_expr",
"filterExpr",
+ "order_by_expr",
+ "orderByExpr",
];
#[allow(clippy::enum_variant_names)]
@@ -110,6 +118,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
NullExpr,
Groups,
FilterExpr,
+ OrderByExpr,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -141,6 +150,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
"nullExpr" | "null_expr" => Ok(GeneratedField::NullExpr),
"groups" => Ok(GeneratedField::Groups),
"filterExpr" | "filter_expr" => Ok(GeneratedField::FilterExpr),
+ "orderByExpr" | "order_by_expr" => Ok(GeneratedField::OrderByExpr),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
@@ -170,6 +180,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
let mut null_expr__ = None;
let mut groups__ = None;
let mut filter_expr__ = None;
+ let mut order_by_expr__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::GroupExpr => {
@@ -232,6 +243,12 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
}
filter_expr__ = Some(map.next_value()?);
}
+ GeneratedField::OrderByExpr => {
+ if order_by_expr__.is_some() {
+ return Err(serde::de::Error::duplicate_field("orderByExpr"));
+ }
+ order_by_expr__ = Some(map.next_value()?);
+ }
}
}
Ok(AggregateExecNode {
@@ -245,6 +262,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
null_expr: null_expr__.unwrap_or_default(),
groups: groups__.unwrap_or_default(),
filter_expr: filter_expr__.unwrap_or_default(),
+ order_by_expr: order_by_expr__.unwrap_or_default(),
})
}
}
@@ -271,6 +289,9 @@ impl serde::Serialize for AggregateExprNode {
if self.filter.is_some() {
len += 1;
}
+ if !self.order_by.is_empty() {
+ len += 1;
+ }
let mut struct_ser = serializer.serialize_struct("datafusion.AggregateExprNode", len)?;
if self.aggr_function != 0 {
let v = AggregateFunction::from_i32(self.aggr_function)
@@ -286,6 +307,9 @@ impl serde::Serialize for AggregateExprNode {
if let Some(v) = self.filter.as_ref() {
struct_ser.serialize_field("filter", v)?;
}
+ if !self.order_by.is_empty() {
+ struct_ser.serialize_field("orderBy", &self.order_by)?;
+ }
struct_ser.end()
}
}
@@ -301,6 +325,8 @@ impl<'de> serde::Deserialize<'de> for AggregateExprNode {
"expr",
"distinct",
"filter",
+ "order_by",
+ "orderBy",
];
#[allow(clippy::enum_variant_names)]
@@ -309,6 +335,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExprNode {
Expr,
Distinct,
Filter,
+ OrderBy,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -334,6 +361,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExprNode {
"expr" => Ok(GeneratedField::Expr),
"distinct" => Ok(GeneratedField::Distinct),
"filter" => Ok(GeneratedField::Filter),
+ "orderBy" | "order_by" => Ok(GeneratedField::OrderBy),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
@@ -357,6 +385,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExprNode {
let mut expr__ = None;
let mut distinct__ = None;
let mut filter__ = None;
+ let mut order_by__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::AggrFunction => {
@@ -383,6 +412,12 @@ impl<'de> serde::Deserialize<'de> for AggregateExprNode {
}
filter__ = map.next_value()?;
}
+ GeneratedField::OrderBy => {
+ if order_by__.is_some() {
+ return Err(serde::de::Error::duplicate_field("orderBy"));
+ }
+ order_by__ = Some(map.next_value()?);
+ }
}
}
Ok(AggregateExprNode {
@@ -390,6 +425,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExprNode {
expr: expr__.unwrap_or_default(),
distinct: distinct__.unwrap_or_default(),
filter: filter__,
+ order_by: order_by__.unwrap_or_default(),
})
}
}
@@ -758,6 +794,9 @@ impl serde::Serialize for AggregateUdfExprNode {
if self.filter.is_some() {
len += 1;
}
+ if !self.order_by.is_empty() {
+ len += 1;
+ }
let mut struct_ser = serializer.serialize_struct("datafusion.AggregateUDFExprNode", len)?;
if !self.fun_name.is_empty() {
struct_ser.serialize_field("funName", &self.fun_name)?;
@@ -768,6 +807,9 @@ impl serde::Serialize for AggregateUdfExprNode {
if let Some(v) = self.filter.as_ref() {
struct_ser.serialize_field("filter", v)?;
}
+ if !self.order_by.is_empty() {
+ struct_ser.serialize_field("orderBy", &self.order_by)?;
+ }
struct_ser.end()
}
}
@@ -782,6 +824,8 @@ impl<'de> serde::Deserialize<'de> for AggregateUdfExprNode {
"funName",
"args",
"filter",
+ "order_by",
+ "orderBy",
];
#[allow(clippy::enum_variant_names)]
@@ -789,6 +833,7 @@ impl<'de> serde::Deserialize<'de> for AggregateUdfExprNode {
FunName,
Args,
Filter,
+ OrderBy,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -813,6 +858,7 @@ impl<'de> serde::Deserialize<'de> for AggregateUdfExprNode {
"funName" | "fun_name" => Ok(GeneratedField::FunName),
"args" => Ok(GeneratedField::Args),
"filter" => Ok(GeneratedField::Filter),
+ "orderBy" | "order_by" => Ok(GeneratedField::OrderBy),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
@@ -835,6 +881,7 @@ impl<'de> serde::Deserialize<'de> for AggregateUdfExprNode {
let mut fun_name__ = None;
let mut args__ = None;
let mut filter__ = None;
+ let mut order_by__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::FunName => {
@@ -855,12 +902,19 @@ impl<'de> serde::Deserialize<'de> for AggregateUdfExprNode {
}
filter__ = map.next_value()?;
}
+ GeneratedField::OrderBy => {
+ if order_by__.is_some() {
+ return Err(serde::de::Error::duplicate_field("orderBy"));
+ }
+ order_by__ = Some(map.next_value()?);
+ }
}
}
Ok(AggregateUdfExprNode {
fun_name: fun_name__.unwrap_or_default(),
args: args__.unwrap_or_default(),
filter: filter__,
+ order_by: order_by__.unwrap_or_default(),
})
}
}
@@ -11499,6 +11553,98 @@ impl<'de> serde::Deserialize<'de> for MaybeFilter {
deserializer.deserialize_struct("datafusion.MaybeFilter", FIELDS, GeneratedVisitor)
}
}
+impl serde::Serialize for MaybePhysicalSortExprs {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if !self.sort_expr.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser = serializer.serialize_struct("datafusion.MaybePhysicalSortExprs", len)?;
+ if !self.sort_expr.is_empty() {
+ struct_ser.serialize_field("sortExpr", &self.sort_expr)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for MaybePhysicalSortExprs {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "sort_expr",
+ "sortExpr",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ SortExpr,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "sortExpr" | "sort_expr" => Ok(GeneratedField::SortExpr),
+ _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = MaybePhysicalSortExprs;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ formatter.write_str("struct datafusion.MaybePhysicalSortExprs")
+ }
+
+ fn visit_map<V>(self, mut map: V) -> std::result::Result<MaybePhysicalSortExprs, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut sort_expr__ = None;
+ while let Some(k) = map.next_key()? {
+ match k {
+ GeneratedField::SortExpr => {
+ if sort_expr__.is_some() {
+ return Err(serde::de::Error::duplicate_field("sortExpr"));
+ }
+ sort_expr__ = Some(map.next_value()?);
+ }
+ }
+ }
+ Ok(MaybePhysicalSortExprs {
+ sort_expr: sort_expr__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.MaybePhysicalSortExprs", FIELDS, GeneratedVisitor)
+ }
+}
impl serde::Serialize for NegativeNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 0736b75084..83255797fa 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -692,6 +692,8 @@ pub struct AggregateExprNode {
pub distinct: bool,
#[prost(message, optional, boxed, tag = "4")]
pub filter: ::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
+ #[prost(message, repeated, tag = "5")]
+ pub order_by: ::prost::alloc::vec::Vec<LogicalExprNode>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -702,6 +704,8 @@ pub struct AggregateUdfExprNode {
pub args: ::prost::alloc::vec::Vec<LogicalExprNode>,
#[prost(message, optional, boxed, tag = "3")]
pub filter: ::core::option::Option<::prost::alloc::boxed::Box<LogicalExprNode>>,
+ #[prost(message, repeated, tag = "4")]
+ pub order_by: ::prost::alloc::vec::Vec<LogicalExprNode>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -1820,6 +1824,12 @@ pub struct MaybeFilter {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct MaybePhysicalSortExprs {
+ #[prost(message, repeated, tag = "1")]
+ pub sort_expr: ::prost::alloc::vec::Vec<PhysicalSortExprNode>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AggregateExecNode {
#[prost(message, repeated, tag = "1")]
pub group_expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
@@ -1842,6 +1852,8 @@ pub struct AggregateExecNode {
pub groups: ::prost::alloc::vec::Vec<bool>,
#[prost(message, repeated, tag = "10")]
pub filter_expr: ::prost::alloc::vec::Vec<MaybeFilter>,
+ #[prost(message, repeated, tag = "11")]
+ pub order_by_expr: ::prost::alloc::vec::Vec<MaybePhysicalSortExprs>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index de4b03a069..b40f867d98 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -990,6 +990,7 @@ pub fn parse_expr(
.collect::<Result<Vec<_>, _>>()?,
expr.distinct,
parse_optional_expr(expr.filter.as_deref(), registry)?.map(Box::new),
+ parse_vec_expr(&expr.order_by, registry)?,
)))
}
ExprType::Alias(alias) => Ok(Expr::Alias(
@@ -1390,6 +1391,7 @@ pub fn parse_expr(
.map(|expr| parse_expr(expr, registry))
.collect::<Result<Vec<_>, Error>>()?,
parse_optional_expr(pb.filter.as_deref(), registry)?.map(Box::new),
+ parse_vec_expr(&pb.order_by, registry)?,
)))
}
@@ -1479,6 +1481,20 @@ pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
}
}
+fn parse_vec_expr(
+ p: &[protobuf::LogicalExprNode],
+ registry: &dyn FunctionRegistry,
+) -> Result<Option<Vec<Expr>>, Error> {
+ let res = p
+ .iter()
+ .map(|elem| {
+ parse_expr(elem, registry).map_err(|e| DataFusionError::Plan(e.to_string()))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ // Convert empty vector to None.
+ Ok((!res.is_empty()).then_some(res))
+}
+
fn parse_optional_expr(
p: Option<&protobuf::LogicalExprNode>,
registry: &dyn FunctionRegistry,
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 8d68a958b2..c61f90b2dc 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -2525,6 +2525,7 @@ mod roundtrip_tests {
vec![col("bananas")],
false,
None,
+ None,
));
let ctx = SessionContext::new();
roundtrip_expr_test(test_expr, ctx);
@@ -2537,6 +2538,7 @@ mod roundtrip_tests {
vec![col("bananas")],
true,
None,
+ None,
));
let ctx = SessionContext::new();
roundtrip_expr_test(test_expr, ctx);
@@ -2549,6 +2551,7 @@ mod roundtrip_tests {
vec![col("bananas"), lit(0.42_f32)],
false,
None,
+ None,
));
let ctx = SessionContext::new();
@@ -2606,6 +2609,7 @@ mod roundtrip_tests {
Arc::new(dummy_agg.clone()),
vec![lit(1.0_f64)],
Some(Box::new(lit(true))),
+ None,
));
let ctx = SessionContext::new();
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 0ffc893071..06156c9f40 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -622,6 +622,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
ref args,
ref distinct,
ref filter,
+ ref order_by,
}) => {
let aggr_function = match fun {
AggregateFunction::ApproxDistinct => {
@@ -679,6 +680,13 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
Some(e) => Some(Box::new(e.as_ref().try_into()?)),
None => None,
},
+ order_by: match order_by {
+ Some(e) => e
+ .iter()
+ .map(|expr| expr.try_into())
+ .collect::<Result<Vec<_>, _>>()?,
+ None => vec![],
+ },
};
Self {
expr_type: Some(ExprType::AggregateExpr(Box::new(aggregate_expr))),
@@ -714,7 +722,12 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
.collect::<Result<Vec<_>, Error>>()?,
})),
},
- Expr::AggregateUDF(expr::AggregateUDF { fun, args, filter }) => Self {
+ Expr::AggregateUDF(expr::AggregateUDF {
+ fun,
+ args,
+ filter,
+ order_by,
+ }) => Self {
expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
protobuf::AggregateUdfExprNode {
fun_name: fun.name.clone(),
@@ -727,6 +740,13 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
Some(e) => Some(Box::new(e.as_ref().try_into()?)),
None => None,
},
+ order_by: match order_by {
+ Some(e) => e
+ .iter()
+ .map(|expr| expr.try_into())
+ .collect::<Result<Vec<_>, _>>()?,
+ None => vec![],
+ },
},
))),
},
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 8e9ba1f605..1290db0752 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -61,6 +61,31 @@ impl From<&protobuf::PhysicalColumn> for Column {
}
}
+/// Parses a physical sort expression from a protobuf.
+///
+/// # Arguments
+///
+/// * `proto` - Input proto with physical sort expression node
+/// * `registry` - A registry that knows how to build logical expressions out of user-defined function names
+/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
+/// when performing type coercion.
+pub fn parse_physical_sort_expr(
+ proto: &protobuf::PhysicalSortExprNode,
+ registry: &dyn FunctionRegistry,
+ input_schema: &Schema,
+) -> Result<PhysicalSortExpr> {
+ if let Some(expr) = &proto.expr {
+ let expr = parse_physical_expr(expr.as_ref(), registry, input_schema)?;
+ let options = SortOptions {
+ descending: !proto.asc,
+ nulls_first: proto.nulls_first,
+ };
+ Ok(PhysicalSortExpr { expr, options })
+ } else {
+ Err(proto_error("Unexpected empty physical expression"))
+ }
+}
+
/// Parses a physical expression from a protobuf.
///
/// # Arguments
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index cd8950e97b..94e26f2cf2 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -54,7 +54,7 @@ use prost::Message;
use crate::common::proto_error;
use crate::common::{csv_delimiter_to_string, str_to_byte};
use crate::physical_plan::from_proto::{
- parse_physical_expr, parse_protobuf_file_scan_config,
+ parse_physical_expr, parse_physical_sort_expr, parse_protobuf_file_scan_config,
};
use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
use crate::protobuf::physical_expr_node::ExprType;
@@ -409,13 +409,25 @@ impl AsExecutionPlan for PhysicalPlanNode {
.filter_expr
.iter()
.map(|expr| {
- let x = expr
- .expr
+ expr.expr
.as_ref()
- .map(|e| parse_physical_expr(e, registry, &physical_schema));
- x.transpose()
+ .map(|e| parse_physical_expr(e, registry, &physical_schema))
+ .transpose()
})
.collect::<Result<Vec<_>, _>>()?;
+ let physical_order_by_expr = hash_agg
+ .order_by_expr
+ .iter()
+ .map(|expr| {
+ expr.sort_expr
+ .iter()
+ .map(|e| {
+ parse_physical_sort_expr(e, registry, &physical_schema)
+ })
+ .collect::<Result<Vec<_>>>()
+ .map(|exprs| (!exprs.is_empty()).then_some(exprs))
+ })
+ .collect::<Result<Vec<_>>>()?;
let physical_aggr_expr: Vec<Arc<dyn AggregateExpr>> = hash_agg
.aggr_expr
@@ -473,6 +485,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
PhysicalGroupBy::new(group_expr, null_expr, groups),
physical_aggr_expr,
physical_filter_expr,
+ physical_order_by_expr,
input,
Arc::new((&input_schema).try_into()?),
)?))
@@ -893,6 +906,12 @@ impl AsExecutionPlan for PhysicalPlanNode {
.map(|expr| expr.to_owned().try_into())
.collect::<Result<Vec<_>>>()?;
+ let order_by = exec
+ .order_by_expr()
+ .iter()
+ .map(|expr| expr.to_owned().try_into())
+ .collect::<Result<Vec<_>>>()?;
+
let agg = exec
.aggr_expr()
.iter()
@@ -942,6 +961,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
group_expr_name: group_names,
aggr_expr: agg,
filter_expr: filter,
+ order_by_expr: order_by,
aggr_expr_name: agg_names,
mode: agg_mode as i32,
input: Some(Box::new(input)),
@@ -1425,6 +1445,7 @@ mod roundtrip_tests {
PhysicalGroupBy::new_single(groups.clone()),
aggregates.clone(),
vec![None],
+ vec![None],
Arc::new(EmptyExec::new(false, schema.clone())),
schema,
)?))
@@ -1494,6 +1515,7 @@ mod roundtrip_tests {
PhysicalGroupBy::new_single(groups.clone()),
aggregates.clone(),
vec![None],
+ vec![None],
Arc::new(EmptyExec::new(false, schema.clone())),
schema,
)?),
@@ -1707,6 +1729,7 @@ mod roundtrip_tests {
PhysicalGroupBy::new_single(groups),
aggregates.clone(),
vec![None],
+ vec![None],
Arc::new(EmptyExec::new(false, schema.clone())),
schema,
)?))
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 90260b231f..f2f65b89f0 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -46,7 +46,7 @@ use crate::protobuf;
use crate::protobuf::{physical_aggregate_expr_node, PhysicalSortExprNode, ScalarValue};
use datafusion::logical_expr::BuiltinScalarFunction;
use datafusion::physical_expr::expressions::{DateTimeIntervalExpr, GetIndexedFieldExpr};
-use datafusion::physical_expr::ScalarFunctionExpr;
+use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::joins::utils::JoinSide;
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion_common::{DataFusionError, Result};
@@ -541,3 +541,31 @@ impl TryFrom<Option<Arc<dyn PhysicalExpr>>> for protobuf::MaybeFilter {
}
}
}
+
+impl TryFrom<Option<Vec<PhysicalSortExpr>>> for protobuf::MaybePhysicalSortExprs {
+ type Error = DataFusionError;
+
+ fn try_from(sort_exprs: Option<Vec<PhysicalSortExpr>>) -> Result<Self, Self::Error> {
+ match sort_exprs {
+ None => Ok(protobuf::MaybePhysicalSortExprs { sort_expr: vec![] }),
+ Some(sort_exprs) => Ok(protobuf::MaybePhysicalSortExprs {
+ sort_expr: sort_exprs
+ .into_iter()
+ .map(|sort_expr| sort_expr.try_into())
+ .collect::<Result<Vec<_>>>()?,
+ }),
+ }
+ }
+}
+
+impl TryFrom<PhysicalSortExpr> for protobuf::PhysicalSortExprNode {
+ type Error = DataFusionError;
+
+ fn try_from(sort_expr: PhysicalSortExpr) -> std::result::Result<Self, Self::Error> {
+ Ok(PhysicalSortExprNode {
+ expr: Some(Box::new(sort_expr.expr.try_into()?)),
+ asc: !sort_expr.options.descending,
+ nulls_first: sort_expr.options.nulls_first,
+ })
+ }
+}
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index 482cd5f0c4..2bc3125d73 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -113,7 +113,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let (fun, args) =
self.aggregate_fn_to_expr(fun, function.args, schema, planner_context)?;
return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
- fun, args, distinct, None,
+ fun, args, distinct, None, None,
)));
};
@@ -128,7 +128,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if let Some(fm) = self.schema_provider.get_aggregate_meta(&name) {
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;
- return Ok(Expr::AggregateUDF(expr::AggregateUDF::new(fm, args, None)));
+ return Ok(Expr::AggregateUDF(expr::AggregateUDF::new(
+ fm, args, None, None,
+ )));
}
// Special case arrow_cast (as its type is dependent on its argument value)
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index a120b3400a..b914149cf4 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -321,11 +321,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
within_group,
} = array_agg;
- if let Some(order_by) = order_by {
- return Err(DataFusionError::NotImplemented(format!(
- "ORDER BY not supported in ARRAY_AGG: {order_by}"
- )));
- }
+ let order_by = if let Some(order_by) = order_by {
+ // TODO: Once sqlparser supports multiple order by clause, handle it
+ // see issue: https://github.com/sqlparser-rs/sqlparser-rs/issues/875
+ Some(vec![self.order_by_to_sort_expr(
+ *order_by,
+ input_schema,
+ planner_context,
+ )?])
+ } else {
+ None
+ };
if let Some(limit) = limit {
return Err(DataFusionError::NotImplemented(format!(
@@ -341,11 +347,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let args =
vec![self.sql_expr_to_logical_expr(*expr, input_schema, planner_context)?];
+
// next, aggregate built-ins
let fun = AggregateFunction::ArrayAgg;
-
Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
- fun, args, distinct, None,
+ fun, args, distinct, None, order_by,
)))
}
@@ -479,6 +485,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fun,
args,
distinct,
+ order_by,
..
}) => Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
fun,
@@ -489,6 +496,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema,
planner_context,
)?)),
+ order_by,
))),
_ => Err(DataFusionError::Internal(
"AggregateExpressionWithFilter expression was not an AggregateFunction"
diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs
index 1605433609..dd96f1b2bb 100644
--- a/datafusion/sql/src/utils.rs
+++ b/datafusion/sql/src/utils.rs
@@ -169,6 +169,7 @@ where
args,
distinct,
filter,
+ order_by,
}) => Ok(Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
args.iter()
@@ -176,6 +177,7 @@ where
.collect::<Result<Vec<Expr>>>()?,
*distinct,
filter.clone(),
+ order_by.clone(),
))),
Expr::WindowFunction(WindowFunction {
fun,
@@ -198,15 +200,19 @@ where
.collect::<Result<Vec<_>>>()?,
window_frame.clone(),
))),
- Expr::AggregateUDF(AggregateUDF { fun, args, filter }) => {
- Ok(Expr::AggregateUDF(AggregateUDF::new(
- fun.clone(),
- args.iter()
- .map(|e| clone_with_replacement(e, replacement_fn))
- .collect::<Result<Vec<Expr>>>()?,
- filter.clone(),
- )))
- }
+ Expr::AggregateUDF(AggregateUDF {
+ fun,
+ args,
+ filter,
+ order_by,
+ }) => Ok(Expr::AggregateUDF(AggregateUDF::new(
+ fun.clone(),
+ args.iter()
+ .map(|e| clone_with_replacement(e, replacement_fn))
+ .collect::<Result<Vec<Expr>>>()?,
+ filter.clone(),
+ order_by.clone(),
+ ))),
Expr::Alias(nested_expr, alias_name) => Ok(Expr::Alias(
Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
alias_name.clone(),
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs
index 2b8ffde422..24bac58dd5 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -278,6 +278,8 @@ pub async fn from_substrait_rel(
input.schema(),
extensions,
filter,
+ // TODO: Add parsing of order_by also
+ None,
distinct,
)
.await
@@ -549,6 +551,7 @@ pub async fn from_substrait_agg_func(
input_schema: &DFSchema,
extensions: &HashMap<u32, &String>,
filter: Option<Box<Expr>>,
+ order_by: Option<Vec<Expr>>,
distinct: bool,
) -> Result<Arc<Expr>> {
let mut args: Vec<Expr> = vec![];
@@ -579,6 +582,7 @@ pub async fn from_substrait_agg_func(
args,
distinct,
filter,
+ order_by,
})))
}
diff --git a/datafusion/substrait/src/logical_plan/producer.rs b/datafusion/substrait/src/logical_plan/producer.rs
index 17d424cea6..0523c51cc3 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -415,7 +415,8 @@ pub fn to_substrait_agg_measure(
),
) -> Result<Measure> {
match expr {
- Expr::AggregateFunction(expr::AggregateFunction { fun, args, distinct, filter }) => {
+ // TODO: Once substrait supports order by, add handling for it.
+ Expr::AggregateFunction(expr::AggregateFunction { fun, args, distinct, filter, order_by: _order_by }) => {
let mut arguments: Vec<FunctionArgument> = vec![];
for arg in args {
arguments.push(FunctionArgument { arg_type: Some(ArgType::Value(to_substrait_rex(arg, schema, extension_info)?)) });
diff --git a/docs/source/user-guide/sql/aggregate_functions.md b/docs/source/user-guide/sql/aggregate_functions.md
index 68c02ef550..d02c733efc 100644
--- a/docs/source/user-guide/sql/aggregate_functions.md
+++ b/docs/source/user-guide/sql/aggregate_functions.md
@@ -194,10 +194,10 @@ sum(expression)
### `array_agg`
-Returns an array created from the expression elements.
+Returns an array created from the expression elements. If an ordering requirement is given, elements are inserted in the required order.
```
-array_agg(expression)
+array_agg(expression [ORDER BY expression])
```
#### Arguments
diff --git a/docs/source/user-guide/sql/select.md b/docs/source/user-guide/sql/select.md
index 68be88d7cf..cb73e08520 100644
--- a/docs/source/user-guide/sql/select.md
+++ b/docs/source/user-guide/sql/select.md
@@ -189,6 +189,15 @@ Example:
SELECT a, b, MAX(c) FROM table GROUP BY a, b
```
+Some aggregation functions, such as `ARRAY_AGG`, accept an optional ordering requirement. If a requirement is given,
+the aggregation is calculated in the order of the requirement.
+
+Example:
+
+```sql
+SELECT a, b, ARRAY_AGG(c ORDER BY d) FROM table GROUP BY a, b
+```
+
## HAVING clause
Example: