You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "jackwener (via GitHub)" <gi...@apache.org> on 2023/04/16 14:32:25 UTC

[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #5749: Add ResolveGroupingAnalytics analyzer rule

jackwener commented on code in PR #5749:
URL: https://github.com/apache/arrow-datafusion/pull/5749#discussion_r1167927918


##########
datafusion/sql/tests/integration_test.rs:
##########
@@ -2856,8 +2856,8 @@ fn aggregate_with_rollup() {
     let sql =
         "SELECT id, state, age, COUNT(*) FROM person GROUP BY id, ROLLUP (state, age)";
     let expected = "Projection: person.id, person.state, person.age, COUNT(UInt8(1))\
-    \n  Aggregate: groupBy=[[GROUPING SETS ((person.id), (person.id, person.state), (person.id, person.state, person.age))]], aggr=[[COUNT(UInt8(1))]]\
-    \n    TableScan: person";
+        \n  Aggregate: groupBy=[[person.id, ROLLUP (person.state, person.age)]], aggr=[[COUNT(UInt8(1))]]\

Review Comment:
   Nice



##########
datafusion/physical-expr/src/expressions/column.rs:
##########
@@ -202,6 +202,77 @@ impl PartialEq<dyn Any> for UnKnownColumn {
     }
 }
 
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]

Review Comment:
   👍



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -689,23 +710,61 @@ fn evaluate_group_by(
         })
         .collect::<Result<Vec<_>>>()?;
 
-    Ok(group_by
-        .groups
-        .iter()
-        .map(|group| {
-            group
-                .iter()
-                .enumerate()
-                .map(|(idx, is_null)| {
-                    if *is_null {
-                        null_exprs[idx].clone()
-                    } else {
-                        exprs[idx].clone()
-                    }
-                })
-                .collect()
-        })
-        .collect())
+    if !group_by.hidden_grouping_set_expr().is_empty() {
+        let hidden_exprs_value: Vec<ArrayRef> = group_by
+            .hidden_grouping_set_expr
+            .iter()
+            .map(|(expr, _)| {
+                let value = expr.evaluate(batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let chunk_size = hidden_exprs_value.len() / group_by.groups.len();
+        let hidden_expr_value_chunks =
+            hidden_exprs_value.chunks(chunk_size).collect::<Vec<_>>();
+
+        Ok(group_by
+            .groups
+            .iter()
+            .enumerate()
+            .map(|(groud_id, group)| {
+                let mut group_data = group
+                    .iter()
+                    .enumerate()
+                    .map(|(idx, is_null)| {
+                        if *is_null {
+                            null_exprs_value[idx].clone()
+                        } else {
+                            exprs_value[idx].clone()
+                        }
+                    })
+                    .collect::<Vec<_>>();

Review Comment:
   we can extract a function named `get_group_data`.
   



##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -649,7 +649,9 @@ impl OptimizerRule for PushDownFilter {
                 let mut push_predicates = vec![];
                 for expr in predicates {
                     let cols = expr.to_columns()?;
-                    if cols.iter().all(|c| group_expr_columns.contains(c)) {
+                    if !expr.contains_hidden_columns()
+                        && cols.iter().all(|c| group_expr_columns.contains(c))
+                    {

Review Comment:
   `hidden columns` may influence optimizer rules, which may cause some bugs.
   We should add more tests to cover this in a follow-up PR.



##########
datafusion/expr/src/aggregate_function.rs:
##########
@@ -63,12 +63,19 @@ pub enum AggregateFunction {
     ApproxMedian,
     /// Grouping
     Grouping,
+    /// GroupingID
+    GroupingId,
 }
 
 impl fmt::Display for AggregateFunction {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         // uppercase of the debug.
-        write!(f, "{}", format!("{self:?}").to_uppercase())
+        match self {
+            AggregateFunction::GroupingId => {
+                write!(f, "GROUPING_ID")
+            }

Review Comment:
   Looks like we don't need it.



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -689,23 +710,61 @@ fn evaluate_group_by(
         })
         .collect::<Result<Vec<_>>>()?;
 
-    Ok(group_by
-        .groups
-        .iter()
-        .map(|group| {
-            group
-                .iter()
-                .enumerate()
-                .map(|(idx, is_null)| {
-                    if *is_null {
-                        null_exprs[idx].clone()
-                    } else {
-                        exprs[idx].clone()
-                    }
-                })
-                .collect()
-        })
-        .collect())
+    if !group_by.hidden_grouping_set_expr().is_empty() {
+        let hidden_exprs_value: Vec<ArrayRef> = group_by
+            .hidden_grouping_set_expr
+            .iter()
+            .map(|(expr, _)| {
+                let value = expr.evaluate(batch)?;
+                Ok(value.into_array(batch.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let chunk_size = hidden_exprs_value.len() / group_by.groups.len();
+        let hidden_expr_value_chunks =
+            hidden_exprs_value.chunks(chunk_size).collect::<Vec<_>>();
+
+        Ok(group_by
+            .groups
+            .iter()
+            .enumerate()
+            .map(|(groud_id, group)| {
+                let mut group_data = group
+                    .iter()
+                    .enumerate()
+                    .map(|(idx, is_null)| {
+                        if *is_null {
+                            null_exprs_value[idx].clone()
+                        } else {
+                            exprs_value[idx].clone()
+                        }
+                    })
+                    .collect::<Vec<_>>();
+                for data in hidden_expr_value_chunks[groud_id] {
+                    group_data.push(data.clone());
+                }
+                group_data
+            })
+            .collect())
+    } else {
+        Ok(group_by
+            .groups
+            .iter()
+            .map(|group| {
+                group
+                    .iter()
+                    .enumerate()
+                    .map(|(idx, is_null)| {
+                        if *is_null {
+                            null_exprs_value[idx].clone()
+                        } else {
+                            exprs_value[idx].clone()
+                        }
+                    })
+                    .collect::<Vec<_>>()

Review Comment:
   Use the function `get_group_data` here.



##########
datafusion/expr/src/expr.rs:
##########
@@ -1364,6 +1423,8 @@ fn create_name(e: &Expr) -> Result<String> {
             "Create name does not support qualified wildcard".to_string(),
         )),
         Expr::Placeholder { id, .. } => Ok((*id).to_string()),
+        Expr::HiddenColumn(_, c) => Ok(format!("#{}", c)),
+        Expr::HiddenExpr(first, _) => Ok(format!("#{}", first)),

Review Comment:
   Should `HiddenExpr` be `Ok(format!("{}", first))`? It looks like the `#` is wrong.



##########
datafusion/core/tests/sql/group_by.rs:
##########
@@ -110,6 +110,28 @@ async fn csv_query_group_by_boolean() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]

Review Comment:
   A future ticket: add some tests in `slt`.



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -86,7 +86,11 @@ pub enum AggregateMode {
 #[derive(Clone, Debug, Default)]
 pub struct PhysicalGroupBy {
     /// Distinct (Physical Expr, Alias) in the grouping set
-    expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
+    grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
+    /// Hidden grouping set expr in the grouping set
+    hidden_grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
+    /// Distinct result expr for the grouping set, used to generate output schema
+    result_expr: Vec<(Arc<dyn PhysicalExpr>, String)>,

Review Comment:
   I have a question.
   
   Is `result_expr` a part of `grouping_set_expr`?
   If yes, maybe we can use an index vec that points into `grouping_set_expr`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org