Posted to github@arrow.apache.org by "crepererum (via GitHub)" <gi...@apache.org> on 2023/04/26 15:46:22 UTC

[GitHub] [arrow-datafusion] crepererum opened a new pull request, #6129: fix: `common_subexpr_eliminate` and aggregates

crepererum opened a new pull request, #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129

   # Which issue does this PR close?
   Fixes #6116.
   
   # Rationale for this change
   `common_subexpr_eliminate` MUST NOT push aggregate functions into the `LogicalPlan::Aggregate` child.
   
   # What changes are included in this PR?
   Fixed and improved logic for aggregates.
   
   # Are these changes tested?
   - existing tests pass (modulo `aggregate`)
   - fixed and extended `aggregate` test
   
   # Are there any user-facing changes?
   Probably no changes for SQL users. This mostly affects Rust API users that manually construct logical plans (e.g. for other query languages).
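
To illustrate the rationale above: a projection evaluates expressions row by row, so it cannot compute an aggregate, and any rewrite of expressions that contain aggregates has to keep the aggregate calls in the aggregate node and put the remaining arithmetic in a projection on top of it. The sketch below uses a toy `Expr` type and hypothetical helpers (`extract_aggregates`, `split_plan`). It is not DataFusion's API and not the code from this PR, only a minimal model of the idea.

```rust
use std::fmt;

/// Toy expression tree; a stand-in for a real expression type, not DataFusion's `Expr`.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Binary(Box<Expr>, char, Box<Expr>),
    /// Aggregate call such as AVG(a).
    Agg(String, Box<Expr>),
}

impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Column(name) => write!(f, "{name}"),
            Expr::Literal(value) => write!(f, "{value}"),
            Expr::Binary(left, op, right) => write!(f, "{left} {op} {right}"),
            Expr::Agg(fun, arg) => write!(f, "{fun}({arg})"),
        }
    }
}

/// Replaces every aggregate call inside `expr` with a column reference to the
/// aggregate's output and records each distinct aggregate once in `aggs`.
fn extract_aggregates(expr: &Expr, aggs: &mut Vec<Expr>) -> Expr {
    match expr {
        Expr::Agg(..) => {
            if !aggs.contains(expr) {
                aggs.push(expr.clone());
            }
            // The output column is named after the aggregate's display form.
            Expr::Column(expr.to_string())
        }
        Expr::Binary(left, op, right) => Expr::Binary(
            Box::new(extract_aggregates(left, aggs)),
            *op,
            Box::new(extract_aggregates(right, aggs)),
        ),
        other => other.clone(),
    }
}

/// Splits the expressions of an aggregate node into
/// (deduplicated aggregate calls, projection expressions evaluated on top).
fn split_plan(exprs: &[Expr]) -> (Vec<Expr>, Vec<Expr>) {
    let mut aggs = Vec::new();
    let projection: Vec<Expr> = exprs
        .iter()
        .map(|e| extract_aggregates(e, &mut aggs))
        .collect();
    (aggs, projection)
}
```

Calling `split_plan` on the toy equivalents of `1 + AVG(a)` and `1 - AVG(a)` yields a single aggregate, `AVG(a)`, plus two projection expressions that refer to its output column. Nothing is placed below the aggregate.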


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129#issuecomment-1523666499

   cc @waynexia 




[GitHub] [arrow-datafusion] crepererum commented on pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129#issuecomment-1523779888

   I'm pretty sure it's unrelated:
   
   ```text
   error[E0425]: cannot find value `env_var` in crate `gix_path`
     --> /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/gix-config-0.20.1/src/file/init/comfort.rs:31:54
      |
   31 |                     .storage_location(&mut gix_path::env_var)
      |                                                      ^^^^^^^ not found in `gix_path`
   
   error[E0425]: cannot find function `home_dir` in crate `gix_path`
     --> /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/gix-config-0.20.1/src/file/init/comfort.rs:44:30
      |
   44 |         let home = gix_path::home_dir();
      |                              ^^^^^^^^ not found in `gix_path`
      |
   help: consider importing one of these items
      |
   1  | use gix_path::env::home_dir;
      |
   1  | use std::env::home_dir;
      |
   help: if you import `home_dir`, refer to it directly
      |
   44 -         let home = gix_path::home_dir();
   44 +         let home = home_dir();
      |
   
   ....
   ```
   
   I can debug this tomorrow if nobody beats me to it.




[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on code in PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129#discussion_r1180181390


##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -708,22 +889,149 @@ mod test {
     fn aggregate() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(table_scan)
+        let return_type: ReturnTypeFunction = Arc::new(|inputs| {
+            assert_eq!(inputs, &[DataType::UInt32]);
+            Ok(Arc::new(DataType::UInt32))
+        });
+        let accumulator: AccumulatorFunctionImplementation =
+            Arc::new(|_| unimplemented!());
+        let state_type: StateTypeFunction = Arc::new(|_| unimplemented!());
+        let udf_agg = |inner: Expr| Expr::AggregateUDF {
+            fun: Arc::new(AggregateUDF::new(
+                "my_agg",
+                &Signature::exact(vec![DataType::UInt32], Volatility::Stable),
+                &return_type,
+                &accumulator,
+                &state_type,
+            )),
+            args: vec![inner],
+            filter: None,
+        };
+
+        // test: common aggregates
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .aggregate(
+                iter::empty::<Expr>(),
+                vec![
+                    // common: avg(col("a"))
+                    avg(col("a")).alias("col1"),
+                    avg(col("a")).alias("col2"),
+                    // no common
+                    avg(col("b")).alias("col3"),
+                    avg(col("c")),
+                    // common: udf_agg(col("a"))
+                    udf_agg(col("a")).alias("col4"),
+                    udf_agg(col("a")).alias("col5"),
+                    // no common
+                    udf_agg(col("b")).alias("col6"),
+                    udf_agg(col("c")),
+                ],
+            )?
+            .build()?;
+
+        let expected = "Projection: AVG(test.a)test.a AS AVG(test.a) AS col1, AVG(test.a)test.a AS AVG(test.a) AS col2, col3, AVG(test.c) AS AVG(test.c), my_agg(test.a)test.a AS my_agg(test.a) AS col4, my_agg(test.a)test.a AS my_agg(test.a) AS col5, col6, my_agg(test.c) AS my_agg(test.c)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS AVG(test.a)test.a, my_agg(test.a) AS my_agg(test.a)test.a, AVG(test.b) AS col3, AVG(test.c) AS AVG(test.c), my_agg(test.b) AS col6, my_agg(test.c) AS my_agg(test.c)]]\
+        \n    TableScan: test";
+
+        assert_optimized_plan_eq(expected, &plan);
+
+        // test: trafo after aggregate
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
             .aggregate(
                 iter::empty::<Expr>(),
                 vec![
                     binary_expr(lit(1), Operator::Plus, avg(col("a"))),
                     binary_expr(lit(1), Operator::Minus, avg(col("a"))),
+                    binary_expr(lit(1), Operator::Plus, udf_agg(col("a"))),
+                    binary_expr(lit(1), Operator::Minus, udf_agg(col("a"))),

Review Comment:
   Done (also for the other tests within this module).





[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on code in PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129#discussion_r1178074706


##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -352,6 +462,49 @@ fn build_recover_project_plan(schema: &DFSchema, input: LogicalPlan) -> LogicalP
     )
 }
 
+/// Which type of [expressions](Expr) should be considered for rewriting?

Review Comment:
   While this resulted in more code, I've decided to pull the masking out into its own enum / config value so the code is easier to read and extend (I could imagine that we may wanna have more special treatments for other logical plan nodes at some point).
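
As a rough picture of what such a masking enum could look like, here is a minimal sketch. The names `ExprKind`, `RewriteMask`, and `ignores` are assumptions made for illustration and are not claimed to match the identifiers in the PR. The point is only that each plan node can pick a mask that decides which expression kinds the rewriter skips.

```rust
/// Coarse expression categories; a toy stand-in for matching on a real expression enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ExprKind {
    Literal,
    Column,
    Alias,
    Aggregate,
    Other,
}

/// Hypothetical mask that selects which expressions are candidates for rewriting.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RewriteMask {
    /// Skip trivial expressions and aggregate calls.
    Normal,
    /// Skip trivial expressions only; aggregate calls may be deduplicated.
    NormalAndAggregates,
}

impl RewriteMask {
    /// Returns true if expressions of `kind` should be left untouched under this mask.
    fn ignores(self, kind: ExprKind) -> bool {
        let trivial = matches!(
            kind,
            ExprKind::Literal | ExprKind::Column | ExprKind::Alias
        );
        let is_aggregate = kind == ExprKind::Aggregate;
        match self {
            RewriteMask::Normal => trivial || is_aggregate,
            RewriteMask::NormalAndAggregates => trivial,
        }
    }
}
```

Keeping the decision in one `match` means that special treatment for another plan node would be a new variant and one more arm, not a condition scattered through the visitor.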





[GitHub] [arrow-datafusion] alamb merged pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129




[GitHub] [arrow-datafusion] alamb commented on pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129#issuecomment-1523741626

   This PR seems to have some CI issues but I am not sure if they are related to changes here 🤔 




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129#discussion_r1179407218


##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -708,22 +889,149 @@ mod test {
     fn aggregate() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(table_scan)
+        let return_type: ReturnTypeFunction = Arc::new(|inputs| {
+            assert_eq!(inputs, &[DataType::UInt32]);
+            Ok(Arc::new(DataType::UInt32))
+        });
+        let accumulator: AccumulatorFunctionImplementation =
+            Arc::new(|_| unimplemented!());
+        let state_type: StateTypeFunction = Arc::new(|_| unimplemented!());
+        let udf_agg = |inner: Expr| Expr::AggregateUDF {
+            fun: Arc::new(AggregateUDF::new(
+                "my_agg",
+                &Signature::exact(vec![DataType::UInt32], Volatility::Stable),
+                &return_type,
+                &accumulator,
+                &state_type,
+            )),
+            args: vec![inner],
+            filter: None,
+        };
+
+        // test: common aggregates
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .aggregate(
+                iter::empty::<Expr>(),
+                vec![
+                    // common: avg(col("a"))
+                    avg(col("a")).alias("col1"),
+                    avg(col("a")).alias("col2"),
+                    // no common
+                    avg(col("b")).alias("col3"),
+                    avg(col("c")),
+                    // common: udf_agg(col("a"))
+                    udf_agg(col("a")).alias("col4"),
+                    udf_agg(col("a")).alias("col5"),
+                    // no common
+                    udf_agg(col("b")).alias("col6"),
+                    udf_agg(col("c")),
+                ],
+            )?
+            .build()?;
+
+        let expected = "Projection: AVG(test.a)test.a AS AVG(test.a) AS col1, AVG(test.a)test.a AS AVG(test.a) AS col2, col3, AVG(test.c) AS AVG(test.c), my_agg(test.a)test.a AS my_agg(test.a) AS col4, my_agg(test.a)test.a AS my_agg(test.a) AS col5, col6, my_agg(test.c) AS my_agg(test.c)\

Review Comment:
   👍 



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -708,22 +889,149 @@ mod test {
     fn aggregate() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(table_scan)
+        let return_type: ReturnTypeFunction = Arc::new(|inputs| {
+            assert_eq!(inputs, &[DataType::UInt32]);
+            Ok(Arc::new(DataType::UInt32))
+        });
+        let accumulator: AccumulatorFunctionImplementation =
+            Arc::new(|_| unimplemented!());
+        let state_type: StateTypeFunction = Arc::new(|_| unimplemented!());
+        let udf_agg = |inner: Expr| Expr::AggregateUDF {
+            fun: Arc::new(AggregateUDF::new(
+                "my_agg",
+                &Signature::exact(vec![DataType::UInt32], Volatility::Stable),
+                &return_type,
+                &accumulator,
+                &state_type,
+            )),
+            args: vec![inner],
+            filter: None,
+        };
+
+        // test: common aggregates
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .aggregate(
+                iter::empty::<Expr>(),
+                vec![
+                    // common: avg(col("a"))
+                    avg(col("a")).alias("col1"),
+                    avg(col("a")).alias("col2"),
+                    // no common
+                    avg(col("b")).alias("col3"),
+                    avg(col("c")),
+                    // common: udf_agg(col("a"))
+                    udf_agg(col("a")).alias("col4"),
+                    udf_agg(col("a")).alias("col5"),
+                    // no common
+                    udf_agg(col("b")).alias("col6"),
+                    udf_agg(col("c")),
+                ],
+            )?
+            .build()?;
+
+        let expected = "Projection: AVG(test.a)test.a AS AVG(test.a) AS col1, AVG(test.a)test.a AS AVG(test.a) AS col2, col3, AVG(test.c) AS AVG(test.c), my_agg(test.a)test.a AS my_agg(test.a) AS col4, my_agg(test.a)test.a AS my_agg(test.a) AS col5, col6, my_agg(test.c) AS my_agg(test.c)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS AVG(test.a)test.a, my_agg(test.a) AS my_agg(test.a)test.a, AVG(test.b) AS col3, AVG(test.c) AS AVG(test.c), my_agg(test.b) AS col6, my_agg(test.c) AS my_agg(test.c)]]\
+        \n    TableScan: test";
+
+        assert_optimized_plan_eq(expected, &plan);
+
+        // test: trafo after aggregate
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
             .aggregate(
                 iter::empty::<Expr>(),
                 vec![
                     binary_expr(lit(1), Operator::Plus, avg(col("a"))),
                     binary_expr(lit(1), Operator::Minus, avg(col("a"))),
+                    binary_expr(lit(1), Operator::Plus, udf_agg(col("a"))),
+                    binary_expr(lit(1), Operator::Minus, udf_agg(col("a"))),
                 ],
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[]], aggr=[[Int32(1) + AVG(test.a)test.a AS AVG(test.a), Int32(1) - AVG(test.a)test.a AS AVG(test.a)]]\
-        \n  Projection: AVG(test.a) AS AVG(test.a)test.a, test.a, test.b, test.c\
+        let expected = "Projection: Int32(1) + AVG(test.a)test.a AS AVG(test.a), Int32(1) - AVG(test.a)test.a AS AVG(test.a), Int32(1) + my_agg(test.a)test.a AS my_agg(test.a), Int32(1) - my_agg(test.a)test.a AS my_agg(test.a)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS AVG(test.a)test.a, my_agg(test.a) AS my_agg(test.a)test.a]]\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
 
+        // test: trafo before aggregate

Review Comment:
   What is a `trafo`?



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -352,6 +462,49 @@ fn build_recover_project_plan(schema: &DFSchema, input: LogicalPlan) -> LogicalP
     )
 }
 
+/// Which type of [expressions](Expr) should be considered for rewriting?

Review Comment:
   Another enum also adds an excellent place for more documentation 👍 



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -708,22 +889,149 @@ mod test {
     fn aggregate() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(table_scan)
+        let return_type: ReturnTypeFunction = Arc::new(|inputs| {
+            assert_eq!(inputs, &[DataType::UInt32]);
+            Ok(Arc::new(DataType::UInt32))
+        });
+        let accumulator: AccumulatorFunctionImplementation =
+            Arc::new(|_| unimplemented!());
+        let state_type: StateTypeFunction = Arc::new(|_| unimplemented!());
+        let udf_agg = |inner: Expr| Expr::AggregateUDF {
+            fun: Arc::new(AggregateUDF::new(
+                "my_agg",
+                &Signature::exact(vec![DataType::UInt32], Volatility::Stable),
+                &return_type,
+                &accumulator,
+                &state_type,
+            )),
+            args: vec![inner],
+            filter: None,
+        };
+
+        // test: common aggregates
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .aggregate(
+                iter::empty::<Expr>(),
+                vec![
+                    // common: avg(col("a"))
+                    avg(col("a")).alias("col1"),
+                    avg(col("a")).alias("col2"),
+                    // no common
+                    avg(col("b")).alias("col3"),
+                    avg(col("c")),
+                    // common: udf_agg(col("a"))
+                    udf_agg(col("a")).alias("col4"),
+                    udf_agg(col("a")).alias("col5"),
+                    // no common
+                    udf_agg(col("b")).alias("col6"),
+                    udf_agg(col("c")),
+                ],
+            )?
+            .build()?;
+
+        let expected = "Projection: AVG(test.a)test.a AS AVG(test.a) AS col1, AVG(test.a)test.a AS AVG(test.a) AS col2, col3, AVG(test.c) AS AVG(test.c), my_agg(test.a)test.a AS my_agg(test.a) AS col4, my_agg(test.a)test.a AS my_agg(test.a) AS col5, col6, my_agg(test.c) AS my_agg(test.c)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS AVG(test.a)test.a, my_agg(test.a) AS my_agg(test.a)test.a, AVG(test.b) AS col3, AVG(test.c) AS AVG(test.c), my_agg(test.b) AS col6, my_agg(test.c) AS my_agg(test.c)]]\
+        \n    TableScan: test";
+
+        assert_optimized_plan_eq(expected, &plan);
+
+        // test: trafo after aggregate
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
             .aggregate(
                 iter::empty::<Expr>(),
                 vec![
                     binary_expr(lit(1), Operator::Plus, avg(col("a"))),
                     binary_expr(lit(1), Operator::Minus, avg(col("a"))),
+                    binary_expr(lit(1), Operator::Plus, udf_agg(col("a"))),
+                    binary_expr(lit(1), Operator::Minus, udf_agg(col("a"))),

Review Comment:
   FWIW you can also write these expressions like this (which I find easier to read / follow the intent) as it matches the output more closely
   
   ```suggestion
                       lit(1) + avg(col("a")),
                       lit(1) - avg(col("a")),
                       lit(1) + udf_agg(col("a")),
                       lit(1) - udf_agg(col("a")),
   ```
   
   The same comment applies to the new tests below as well
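
The operator form suggested above relies on the expression type overloading `+` and `-` through `std::ops::Add` and `std::ops::Sub`. The sketch below shows the mechanism on a toy `Expr`. The types and the helpers `lit`, `col`, and `binary_expr` are simplified stand-ins written for this example, not DataFusion's definitions.

```rust
use std::ops::{Add, Sub};

/// Toy binary operators.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Plus,
    Minus,
}

/// Toy expression tree; a simplified stand-in for a real expression type.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Literal(i64),
    Column(String),
    Binary {
        left: Box<Expr>,
        op: Op,
        right: Box<Expr>,
    },
}

fn lit(value: i64) -> Expr {
    Expr::Literal(value)
}

fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

/// Explicit constructor, analogous to the long form used in the test.
fn binary_expr(left: Expr, op: Op, right: Expr) -> Expr {
    Expr::Binary {
        left: Box::new(left),
        op,
        right: Box::new(right),
    }
}

impl Add for Expr {
    type Output = Expr;

    /// `a + b` builds the same tree as `binary_expr(a, Op::Plus, b)`.
    fn add(self, rhs: Expr) -> Expr {
        binary_expr(self, Op::Plus, rhs)
    }
}

impl Sub for Expr {
    type Output = Expr;

    /// `a - b` builds the same tree as `binary_expr(a, Op::Minus, b)`.
    fn sub(self, rhs: Expr) -> Expr {
        binary_expr(self, Op::Minus, rhs)
    }
}
```

With these impls, `lit(1) + col("a")` and `binary_expr(lit(1), Op::Plus, col("a"))` produce equal trees, so the choice between the two spellings is purely about readability.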





[GitHub] [arrow-datafusion] alamb commented on pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129#issuecomment-1527640512

   Thanks again @crepererum  and @waynexia 




[GitHub] [arrow-datafusion] alamb commented on pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129#issuecomment-1524016117

   FWIW I see the `gix_config` error on a branch with no code changes. Filed https://github.com/apache/arrow-datafusion/issues/6132




[GitHub] [arrow-datafusion] waynexia commented on a diff in pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "waynexia (via GitHub)" <gi...@apache.org>.
waynexia commented on code in PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129#discussion_r1179987474


##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -58,17 +60,14 @@ type Identifier = String;
 pub struct CommonSubexprEliminate {}
 
 impl CommonSubexprEliminate {
-    fn rewrite_expr(
+    fn rewrite_exprs_list(

Review Comment:
   👍 good refactor



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -352,6 +462,49 @@ fn build_recover_project_plan(schema: &DFSchema, input: LogicalPlan) -> LogicalP
     )
 }
 
+/// Which type of [expressions](Expr) should be considered for rewriting?

Review Comment:
   This enumeration is excellent! It provides a more organized approach to managing the "special cases".



##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -197,16 +223,93 @@ impl OptimizerRule for CommonSubexprEliminate {
                 let new_aggr_expr = pop_expr(&mut new_expr)?;
                 let new_group_expr = pop_expr(&mut new_expr)?;
 
-                Some(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(

Review Comment:
   I need to file a PR to restructure this (very) huge map as it keeps expanding...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #6129: fix: `common_subexpr_eliminate` and aggregates

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on code in PR #6129:
URL: https://github.com/apache/arrow-datafusion/pull/6129#discussion_r1180070069


##########
datafusion/optimizer/src/common_subexpr_eliminate.rs:
##########
@@ -708,22 +889,149 @@ mod test {
     fn aggregate() -> Result<()> {
         let table_scan = test_table_scan()?;
 
-        let plan = LogicalPlanBuilder::from(table_scan)
+        let return_type: ReturnTypeFunction = Arc::new(|inputs| {
+            assert_eq!(inputs, &[DataType::UInt32]);
+            Ok(Arc::new(DataType::UInt32))
+        });
+        let accumulator: AccumulatorFunctionImplementation =
+            Arc::new(|_| unimplemented!());
+        let state_type: StateTypeFunction = Arc::new(|_| unimplemented!());
+        let udf_agg = |inner: Expr| Expr::AggregateUDF {
+            fun: Arc::new(AggregateUDF::new(
+                "my_agg",
+                &Signature::exact(vec![DataType::UInt32], Volatility::Stable),
+                &return_type,
+                &accumulator,
+                &state_type,
+            )),
+            args: vec![inner],
+            filter: None,
+        };
+
+        // test: common aggregates
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .aggregate(
+                iter::empty::<Expr>(),
+                vec![
+                    // common: avg(col("a"))
+                    avg(col("a")).alias("col1"),
+                    avg(col("a")).alias("col2"),
+                    // no common
+                    avg(col("b")).alias("col3"),
+                    avg(col("c")),
+                    // common: udf_agg(col("a"))
+                    udf_agg(col("a")).alias("col4"),
+                    udf_agg(col("a")).alias("col5"),
+                    // no common
+                    udf_agg(col("b")).alias("col6"),
+                    udf_agg(col("c")),
+                ],
+            )?
+            .build()?;
+
+        let expected = "Projection: AVG(test.a)test.a AS AVG(test.a) AS col1, AVG(test.a)test.a AS AVG(test.a) AS col2, col3, AVG(test.c) AS AVG(test.c), my_agg(test.a)test.a AS my_agg(test.a) AS col4, my_agg(test.a)test.a AS my_agg(test.a) AS col5, col6, my_agg(test.c) AS my_agg(test.c)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS AVG(test.a)test.a, my_agg(test.a) AS my_agg(test.a)test.a, AVG(test.b) AS col3, AVG(test.c) AS AVG(test.c), my_agg(test.b) AS col6, my_agg(test.c) AS my_agg(test.c)]]\
+        \n    TableScan: test";
+
+        assert_optimized_plan_eq(expected, &plan);
+
+        // test: trafo after aggregate
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
             .aggregate(
                 iter::empty::<Expr>(),
                 vec![
                     binary_expr(lit(1), Operator::Plus, avg(col("a"))),
                     binary_expr(lit(1), Operator::Minus, avg(col("a"))),
+                    binary_expr(lit(1), Operator::Plus, udf_agg(col("a"))),
+                    binary_expr(lit(1), Operator::Minus, udf_agg(col("a"))),
                 ],
             )?
             .build()?;
 
-        let expected = "Aggregate: groupBy=[[]], aggr=[[Int32(1) + AVG(test.a)test.a AS AVG(test.a), Int32(1) - AVG(test.a)test.a AS AVG(test.a)]]\
-        \n  Projection: AVG(test.a) AS AVG(test.a)test.a, test.a, test.b, test.c\
+        let expected = "Projection: Int32(1) + AVG(test.a)test.a AS AVG(test.a), Int32(1) - AVG(test.a)test.a AS AVG(test.a), Int32(1) + my_agg(test.a)test.a AS my_agg(test.a), Int32(1) - my_agg(test.a)test.a AS my_agg(test.a)\
+        \n  Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS AVG(test.a)test.a, my_agg(test.a) AS my_agg(test.a)test.a]]\
         \n    TableScan: test";
 
         assert_optimized_plan_eq(expected, &plan);
 
+        // test: trafo before aggregate

Review Comment:
   `trafo` is short for "transformation"; I will fix the comment to spell it out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org