Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/31 20:30:44 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6482: Resolve contradictory requirements by conversion of ordering sensitive aggregators

alamb commented on code in PR #6482:
URL: https://github.com/apache/arrow-datafusion/pull/6482#discussion_r1212262351


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -339,40 +340,96 @@ fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalE
         .collect()
 }
 
+/// This function returns the ordering requirement of the first non-reversible
+/// order-sensitive aggregate function such as ARRAY_AGG. This requirement serves
+/// as the initial requirement while calculating the finest requirement among all
+/// aggregate functions. If this function returns `None`, it means there is no
+/// hard ordering requirement for the aggregate functions (in terms of direction).
+/// Then, we can generate two alternative requirements with opposite directions.
+fn get_init_req(
+    aggr_expr: &[Arc<dyn AggregateExpr>],
+    order_by_expr: &[Option<LexOrdering>],
+) -> Option<LexOrdering> {
+    for (aggr_expr, fn_reqs) in aggr_expr.iter().zip(order_by_expr.iter()) {
+        // If the aggregation function is a non-reversible order-sensitive function
+        // and there is a hard requirement, choose first such requirement:
+        if is_order_sensitive(aggr_expr)
+            && aggr_expr.reverse_expr().is_none()
+            && fn_reqs.is_some()
+        {
+            return fn_reqs.clone();
+        }
+    }
+    None
+}
+
+fn get_finer_ordering<

Review Comment:
   I wonder if putting this check in `datafusion_physical_expr::utils` would make it easier to discover if it is needed in the future?



##########
datafusion/physical-expr/src/sort_expr.rs:
##########
@@ -214,5 +214,8 @@ fn to_str(options: &SortOptions) -> &str {
     }
 }
 
-/// `LexOrdering` is a type alias for lexicographical ordering definition `Vec<PhysicalSortExpr>`
+///`LexOrdering` is a type alias for lexicographical ordering definition`Vec<PhysicalSortExpr>`
 pub type LexOrdering = Vec<PhysicalSortExpr>;
+
+///`LexOrderingReq` is a type alias for lexicographical ordering requirement definition`Vec<PhysicalSortRequirement>`

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -397,14 +454,111 @@ fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
         || aggr_expr.as_any().is::<ArrayAgg>()
 }
 
+/// Calculate the required input ordering for the [`AggregateExec`] by considering

Review Comment:
   I know you didn't add it in this PR, but it seems to me like 
   
   ```rust
   /// Checks whether the given aggregate expression is order-sensitive.
   /// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
   /// However, a `FirstAgg` depends on the input ordering (if the order changes,
   /// the first value in the list would change).
   fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
       aggr_expr.as_any().is::<FirstValue>()
           || aggr_expr.as_any().is::<LastValue>()
           || aggr_expr.as_any().is::<ArrayAgg>()
   }
   ```
   
   Might be better as a method on `AggregateExpr`, so that people adding a new `AggregateExpr` know there is extra order-dependent behavior elsewhere in the codebase. Otherwise they will need to find this hard-coded list.
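   
   A minimal sketch of what that could look like, using a simplified stand-in trait rather than the real `AggregateExpr` (the trait shape, the `name` method, and the `order_sensitive_names` helper are all assumptions made for illustration). The default implementation returns `false`, so only order-sensitive aggregates have to opt in:
   
   ```rust
   use std::sync::Arc;

   /// Simplified, hypothetical stand-in for DataFusion's `AggregateExpr` trait.
   trait AggregateExpr {
       fn name(&self) -> &str;

       /// Hypothetical hook: does the result depend on the input row order?
       /// Defaults to `false`, so aggregates like `SUM` need no changes.
       fn is_order_sensitive(&self) -> bool {
           false
       }
   }

   struct Sum;
   struct FirstValue;
   struct ArrayAgg;

   impl AggregateExpr for Sum {
       fn name(&self) -> &str {
           "SUM"
       }
   }

   impl AggregateExpr for FirstValue {
       fn name(&self) -> &str {
           "FIRST_VALUE"
       }
       // The first value changes if the input order changes.
       fn is_order_sensitive(&self) -> bool {
           true
       }
   }

   impl AggregateExpr for ArrayAgg {
       fn name(&self) -> &str {
           "ARRAY_AGG"
       }
       // The element order of the output array follows the input order.
       fn is_order_sensitive(&self) -> bool {
           true
       }
   }

   /// Sample expressions used for the demonstration below.
   fn sample_exprs() -> Vec<Arc<dyn AggregateExpr>> {
       let exprs: Vec<Arc<dyn AggregateExpr>> =
           vec![Arc::new(Sum), Arc::new(FirstValue), Arc::new(ArrayAgg)];
       exprs
   }

   /// Collects the names of the order-sensitive aggregates without downcasting.
   fn order_sensitive_names(exprs: &[Arc<dyn AggregateExpr>]) -> Vec<String> {
       exprs
           .iter()
           .filter(|e| e.is_order_sensitive())
           .map(|e| e.name().to_string())
           .collect()
   }

   fn main() {
       println!("{:?}", order_sensitive_names(&sample_exprs()));
   }
   ```
   
   With this shape, callers ask the expression itself instead of checking `as_any().is::<...>()` against a fixed list of types.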
   
   



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2384,3 +2384,197 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC) AS
 FRA [200.0, 50.0] 250
 GRC [80.0, 30.0] 110
 TUR [100.0, 75.0] 175
+
+# test_reverse_aggregate_expr
+# Some of the Aggregators can be reversed, by this way we can still run aggregators
+# that have contradictory requirements at first glance.
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+  FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+----
+logical_plan
+Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[country, amount]
+physical_plan
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]

Review Comment:
   I don't understand this plan if the sort is by `amount DESC`.
   
   I am expecting that these are the same:
   
   ```sql
     FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
     LAST_VALUE(amount ORDER BY amount DESC) AS fv2
   ```
   
   So when the sort is rewritten to `ORDER BY amount DESC`, I expect to see:
   
   ```sql
     LAST_VALUE(amount ORDER BY amount DESC) AS fv1, -- this got switched
     LAST_VALUE(amount ORDER BY amount DESC) AS fv2
   ```
   
   However, the query seems to get the right answer, so I must be missing something.
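   
   For reference, the equivalence I am relying on can be checked with a small standalone sketch. It is plain Rust over an in-memory slice, not DataFusion code, and the helper names `first_value_asc` and `last_value_desc` are made up for illustration:
   
   ```rust
   /// FIRST_VALUE(x ORDER BY x ASC): sort ascending, take the first element.
   fn first_value_asc(values: &[f64]) -> Option<f64> {
       let mut sorted = values.to_vec();
       sorted.sort_by(|a, b| a.total_cmp(b));
       sorted.first().copied()
   }

   /// LAST_VALUE(x ORDER BY x DESC): sort descending, take the last element.
   fn last_value_desc(values: &[f64]) -> Option<f64> {
       let mut sorted = values.to_vec();
       sorted.sort_by(|a, b| b.total_cmp(a));
       sorted.last().copied()
   }

   fn main() {
       // Amounts for the FRA group in the test output above.
       let amounts = [200.0, 50.0];
       // Both expressions pick the smallest amount in the group.
       assert_eq!(first_value_asc(&amounts), last_value_desc(&amounts));
       println!("{:?}", first_value_asc(&amounts));
   }
   ```
   
   Both helpers select the minimum of the group, which is why I would expect a single `amount DESC` ordering to be able to serve both aggregates.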
   



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2384,3 +2384,197 @@ SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount DESC) AS
 FRA [200.0, 50.0] 250
 GRC [80.0, 30.0] 110
 TUR [100.0, 75.0] 175
+
+# test_reverse_aggregate_expr
+# Some of the Aggregators can be reversed, by this way we can still run aggregators

Review Comment:
   Maybe mention that "we can still run aggregators without re-sorting"?


