You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/12 16:53:41 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6332: Add support for ordering sensitive aggregation

alamb commented on code in PR #6332:
URL: https://github.com/apache/arrow-datafusion/pull/6332#discussion_r1192588611


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -356,6 +409,23 @@ impl AggregateExec {
         )?;
 
         let schema = Arc::new(schema);
+        let mut aggregator_requirement = None;
+        // Ordering requirement makes sense only in Partial and Single modes.

Review Comment:
   This comment implies that an ORDER BY clause cannot be used with a multi-step group by (which makes sense).
   
   
   Given that ordering doesn't make sense for `AggregateMode::Final`, did you consider returning an error if an ordering was supplied (to fail fast, rather than silently ignoring the field)?
   
   



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2010,3 +2010,201 @@ SELECT a, d,
 
 statement ok
 drop table annotated_data_infinite2;
+
+# create a table for testing
+statement ok
+CREATE TABLE sales_global (zip_code INT,
+          country VARCHAR(3),
+          sn INT,
+          ts TIMESTAMP,
+          currency VARCHAR(3),
+          amount FLOAT
+        ) as VALUES
+          (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+          (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+          (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+          (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+          (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0),
+          (0, 'GRC', 4, '2022-01-03 10:00:00'::timestamp, 'EUR', 80.0)
+
+# test_ordering_sensitive_aggregation
+# ordering sensitive requirement should add a SortExec in the final plan. To satisfy amount ASC
+# in the aggregation
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.optimizer.skip_failed_rules = false;

Review Comment:
   I don't think this is necessary anymore after https://github.com/apache/arrow-datafusion/pull/6265 from @jackwener, where the default was changed. However, it doesn't hurt either.



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2010,3 +2010,201 @@ SELECT a, d,
 
 statement ok
 drop table annotated_data_infinite2;
+
+# create a table for testing
+statement ok
+CREATE TABLE sales_global (zip_code INT,
+          country VARCHAR(3),
+          sn INT,
+          ts TIMESTAMP,
+          currency VARCHAR(3),
+          amount FLOAT
+        ) as VALUES
+          (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+          (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+          (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+          (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+          (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0),
+          (0, 'GRC', 4, '2022-01-03 10:00:00'::timestamp, 'EUR', 80.0)
+
+# test_ordering_sensitive_aggregation
+# ordering sensitive requirement should add a SortExec in the final plan. To satisfy amount ASC
+# in the aggregation
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.optimizer.skip_failed_rules = false;
+
+query TT
+EXPLAIN SELECT country, (ARRAY_AGG(amount ORDER BY amount ASC)) AS amounts
+  FROM sales_global
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
+  Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+    TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
+  AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(sales_global.amount)]
+    SortExec: expr=[amount@1 ASC NULLS LAST]
+      MemoryExec: partitions=1, partition_sizes=[1]
+
+
+query T?
+SELECT country, (ARRAY_AGG(amount ORDER BY amount ASC)) AS amounts
+  FROM sales_global
+  GROUP BY country
+----
+GRC [30.0, 80.0]
+FRA [50.0, 200.0]
+TUR [75.0, 100.0]
+
+# test_ordering_sensitive_aggregation2
+# We should be able to satisfy the finest requirement among all aggregators, when we have multiple aggregators.

Review Comment:
   This test verifies that an aggregate with no sort and a single expr works
   
   I think it would be valuable to also test more than one column, something like:
   
   ```
   ARRAY_AGG(s.amount ORDER BY s.amount DESC, ts ASC) AS amounts,
   SUM(s.amount ORDER BY s.amount DESC) AS sum1
   ```
   
   



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2010,3 +2010,201 @@ SELECT a, d,
 
 statement ok
 drop table annotated_data_infinite2;
+
+# create a table for testing
+statement ok
+CREATE TABLE sales_global (zip_code INT,
+          country VARCHAR(3),
+          sn INT,
+          ts TIMESTAMP,
+          currency VARCHAR(3),
+          amount FLOAT
+        ) as VALUES
+          (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+          (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+          (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+          (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+          (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0),
+          (0, 'GRC', 4, '2022-01-03 10:00:00'::timestamp, 'EUR', 80.0)
+
+# test_ordering_sensitive_aggregation
+# ordering sensitive requirement should add a SortExec in the final plan. To satisfy amount ASC
+# in the aggregation
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.optimizer.skip_failed_rules = false;
+
+query TT
+EXPLAIN SELECT country, (ARRAY_AGG(amount ORDER BY amount ASC)) AS amounts
+  FROM sales_global
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
+  Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+    TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
+  AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(sales_global.amount)]
+    SortExec: expr=[amount@1 ASC NULLS LAST]
+      MemoryExec: partitions=1, partition_sizes=[1]
+
+
+query T?
+SELECT country, (ARRAY_AGG(amount ORDER BY amount ASC)) AS amounts
+  FROM sales_global
+  GROUP BY country
+----
+GRC [30.0, 80.0]
+FRA [50.0, 200.0]
+TUR [75.0, 100.0]
+
+# test_ordering_sensitive_aggregation2
+# We should be able to satisfy the finest requirement among all aggregators, when we have multiple aggregators.
+# Hence final plan should have SortExec: expr=[amount@1 DESC] to satisfy array_agg requirement.
+query TT
+EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+          SUM(s.amount) AS sum1
+        FROM sales_global AS s
+        GROUP BY s.country
+----
+logical_plan
+Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+  Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+    SubqueryAlias: s
+      TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
+  AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(s.amount), SUM(s.amount)]
+    SortExec: expr=[amount@1 DESC]
+      MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?R
+SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+    SUM(s.amount) AS sum1
+  FROM sales_global AS s
+  GROUP BY s.country
+----
+FRA [200.0, 50.0] 250
+TUR [100.0, 75.0] 175
+GRC [80.0, 30.0] 110
+
+# test_ordering_sensitive_aggregation3
+# When different aggregators have conflicting requirements, we cannot satisfy all of them in current implementation.

Review Comment:
   👍 



##########
docs/source/user-guide/sql/aggregate_functions.md:
##########
@@ -124,10 +124,10 @@ sum(expression)
 
 ### `array_agg`
 
-Returns an array created from the expression elements.
+Returns an array created from the expression elements. If ordering requirement is given, elements are inserted in the order of required ordering.

Review Comment:
   Thank you



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -337,13 +341,62 @@ fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalE
         .collect()
 }
 
+/// This function gets the finest ordering requirement among all the aggregation
+/// functions. If requirements are conflicting, (i.e. we can not compute the
+/// aggregations in a single [`AggregateExec`]), the function returns an error.
+fn get_finest_requirement<
+    F: Fn() -> EquivalenceProperties,
+    F2: Fn() -> OrderingEquivalenceProperties,
+>(
+    order_by_expr: &[Option<Vec<PhysicalSortExpr>>],
+    eq_properties: F,
+    ordering_eq_properties: F2,
+) -> Result<Option<Vec<PhysicalSortExpr>>> {
+    let mut result: Option<Vec<PhysicalSortExpr>> = None;
+    for fn_reqs in order_by_expr.iter().flatten() {
+        if let Some(result) = &mut result {
+            if ordering_satisfy_concrete(
+                result,
+                fn_reqs,
+                &eq_properties,
+                &ordering_eq_properties,
+            ) {
+                // Do not update the result as it already satisfies current
+                // function's requirement:
+                continue;
+            }
+            if ordering_satisfy_concrete(
+                fn_reqs,
+                result,
+                &eq_properties,
+                &ordering_eq_properties,
+            ) {
+                // Update result with current function's requirements, as it is
+                // a finer requirement than what we currently have.
+                *result = fn_reqs.clone();
+                continue;
+            }
+            // If neither of the requirements satisfy the other, this means
+            // requirements are conflicting. Currently, we do not support
+            // conflicting requirements.
+            return Err(DataFusionError::Plan(
+                "Conflicting ordering requirements in aggregate functions".to_string(),
+            ));
+        } else {
+            result = Some(fn_reqs.clone());
+        }
+    }
+    Ok(result)
+}
+
 impl AggregateExec {
     /// Create a new hash aggregate execution plan
     pub fn try_new(
         mode: AggregateMode,
         group_by: PhysicalGroupBy,
         aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
+        order_by_expr: Vec<Option<Vec<PhysicalSortExpr>>>,

Review Comment:
   This API change is unfortunate (adding a new argument requires changing so many files).
   
   Maybe (as part of a future PR) we could make it more builder-style (I can file an issue for this if you think it is a reasonable idea):
   
   ```rust
     let agg_exec = AggregateExecBuilder::new(mode, group_by)
        .with_filter_exprs(exprs)
       .build()?
   ```
   
   



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -356,6 +409,23 @@ impl AggregateExec {
         )?;
 
         let schema = Arc::new(schema);
+        let mut aggregator_requirement = None;
+        // Ordering requirement makes sense only in Partial and Single modes.
+        // In other modes, all groups are collapsed, therefore their input schema
+        // can not contain expressions in the requirement.
+        if mode == AggregateMode::Partial || mode == AggregateMode::Single {
+            let requirement = get_finest_requirement(
+                &order_by_expr,
+                || input.equivalence_properties(),
+                || input.ordering_equivalence_properties(),
+            )?;
+            aggregator_requirement = requirement.map(|exprs| {

Review Comment:
   I think this code could be simplified using https://docs.rs/datafusion-physical-expr/24.0.0/datafusion_physical_expr/struct.PhysicalSortRequirement.html#method.from_sort_exprs



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2010,3 +2010,201 @@ SELECT a, d,
 
 statement ok
 drop table annotated_data_infinite2;
+
+# create a table for testing
+statement ok
+CREATE TABLE sales_global (zip_code INT,
+          country VARCHAR(3),
+          sn INT,
+          ts TIMESTAMP,
+          currency VARCHAR(3),
+          amount FLOAT
+        ) as VALUES
+          (0, 'GRC', 0, '2022-01-01 06:00:00'::timestamp, 'EUR', 30.0),
+          (1, 'FRA', 1, '2022-01-01 08:00:00'::timestamp, 'EUR', 50.0),
+          (1, 'TUR', 2, '2022-01-01 11:30:00'::timestamp, 'TRY', 75.0),
+          (1, 'FRA', 3, '2022-01-02 12:00:00'::timestamp, 'EUR', 200.0),
+          (1, 'TUR', 4, '2022-01-03 10:00:00'::timestamp, 'TRY', 100.0),
+          (0, 'GRC', 4, '2022-01-03 10:00:00'::timestamp, 'EUR', 80.0)
+
+# test_ordering_sensitive_aggregation
+# ordering sensitive requirement should add a SortExec in the final plan. To satisfy amount ASC
+# in the aggregation
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.optimizer.skip_failed_rules = false;
+
+query TT
+EXPLAIN SELECT country, (ARRAY_AGG(amount ORDER BY amount ASC)) AS amounts
+  FROM sales_global
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
+  Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+    TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
+  AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(sales_global.amount)]
+    SortExec: expr=[amount@1 ASC NULLS LAST]
+      MemoryExec: partitions=1, partition_sizes=[1]
+
+
+query T?
+SELECT country, (ARRAY_AGG(amount ORDER BY amount ASC)) AS amounts
+  FROM sales_global
+  GROUP BY country
+----
+GRC [30.0, 80.0]
+FRA [50.0, 200.0]
+TUR [75.0, 100.0]
+
+# test_ordering_sensitive_aggregation2
+# We should be able to satisfy the finest requirement among all aggregators, when we have multiple aggregators.
+# Hence final plan should have SortExec: expr=[amount@1 DESC] to satisfy array_agg requirement.
+query TT
+EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+          SUM(s.amount) AS sum1
+        FROM sales_global AS s
+        GROUP BY s.country
+----
+logical_plan
+Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+  Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+    SubqueryAlias: s
+      TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
+  AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(s.amount), SUM(s.amount)]
+    SortExec: expr=[amount@1 DESC]
+      MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?R
+SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+    SUM(s.amount) AS sum1
+  FROM sales_global AS s
+  GROUP BY s.country
+----
+FRA [200.0, 50.0] 250
+TUR [100.0, 75.0] 175
+GRC [80.0, 30.0] 110
+
+# test_ordering_sensitive_aggregation3
+# When different aggregators have conflicting requirements, we cannot satisfy all of them in current implementation.
+# test below should raise Plan Error.
+statement error DataFusion error: Error during planning: Conflicting ordering requirements in aggregate functions
+SELECT ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
+    ARRAY_AGG(s.amount ORDER BY s.amount ASC) AS amounts2,
+    ARRAY_AGG(s.amount ORDER BY s.sn ASC) AS amounts3
+  FROM sales_global AS s
+  GROUP BY s.country
+
+# test_ordering_sensitive_aggregation4
+# If aggregators can work with bounded memory (FullyOrdered or PartiallyOrdered mode), we should be append requirement to

Review Comment:
   These comments really help -- thank you



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -337,13 +341,62 @@ fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalE
         .collect()
 }
 
+/// This function gets the finest ordering requirement among all the aggregation
+/// functions. If requirements are conflicting, (i.e. we can not compute the
+/// aggregations in a single [`AggregateExec`]), the function returns an error.
+fn get_finest_requirement<
+    F: Fn() -> EquivalenceProperties,
+    F2: Fn() -> OrderingEquivalenceProperties,
+>(
+    order_by_expr: &[Option<Vec<PhysicalSortExpr>>],
+    eq_properties: F,
+    ordering_eq_properties: F2,
+) -> Result<Option<Vec<PhysicalSortExpr>>> {
+    let mut result: Option<Vec<PhysicalSortExpr>> = None;
+    for fn_reqs in order_by_expr.iter().flatten() {
+        if let Some(result) = &mut result {
+            if ordering_satisfy_concrete(
+                result,
+                fn_reqs,
+                &eq_properties,
+                &ordering_eq_properties,
+            ) {
+                // Do not update the result as it already satisfies current
+                // function's requirement:
+                continue;
+            }
+            if ordering_satisfy_concrete(
+                fn_reqs,
+                result,
+                &eq_properties,
+                &ordering_eq_properties,
+            ) {
+                // Update result with current function's requirements, as it is
+                // a finer requirement than what we currently have.
+                *result = fn_reqs.clone();
+                continue;
+            }
+            // If neither of the requirements satisfy the other, this means
+            // requirements are conflicting. Currently, we do not support
+            // conflicting requirements.
+            return Err(DataFusionError::Plan(

Review Comment:
   I think this should be a `DataFusionError::NotImplemented` because we could imagine, in the future, being fancier and supporting conflicting sorting requirements.



##########
datafusion/sql/src/expr/mod.rs:
##########
@@ -321,11 +321,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             within_group,
         } = array_agg;
 
-        if let Some(order_by) = order_by {
-            return Err(DataFusionError::NotImplemented(format!(
-                "ORDER BY not supported in ARRAY_AGG: {order_by}"
-            )));
-        }
+        let order_by = if let Some(order_by) = order_by {
+            // TODO: Once sqlparser supports multiple order by clause, handle it

Review Comment:
   If there is a ticket, it would be nice to put a link here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org