Posted to github@arrow.apache.org by "mingmwang (via GitHub)" <gi...@apache.org> on 2023/04/03 07:36:41 UTC

[GitHub] [arrow-datafusion] mingmwang opened a new pull request, #5837: Add new physical rule CombinePartialFinalAggregate

mingmwang opened a new pull request, #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837

   # Which issue does this PR close?
   
   
   Closes #5836, #5774.
   
   # Rationale for this change
   
   
   
   # What changes are included in this PR?
   
   
   # Are these changes tested?
   
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1162467854


##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CombinePartialFinalAggregate optimizer rule checks the adjacent Partial and Final AggregateExecs
+//! and try to combine them if necessary
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
+use crate::physical_plan::ExecutionPlan;
+use datafusion_common::config::ConfigOptions;
+use std::sync::Arc;
+
+use datafusion_common::tree_node::{Transformed, TreeNode};
+
+/// CombinePartialFinalAggregate optimizer rule combines the adjacent Partial and Final AggregateExecs
+/// into a Single AggregateExec if their grouping exprs and aggregate exprs equal.
+///
+/// This rule should be applied after the EnforceDistribution and EnforceSorting rules
+///
+#[derive(Default)]
+pub struct CombinePartialFinalAggregate {}
+
+impl CombinePartialFinalAggregate {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_down(&|plan| {
+            let transformed = plan.as_any().downcast_ref::<AggregateExec>().and_then(
+                |AggregateExec {
+                     mode: final_mode,
+                     input: final_input,
+                     group_by: final_group_by,
+                     aggr_expr: final_aggr_expr,
+                     ..
+                 }| {
+                    if matches!(
+                        final_mode,
+                        AggregateMode::Final | AggregateMode::FinalPartitioned
+                    ) {
+                        final_input
+                            .as_any()
+                            .downcast_ref::<AggregateExec>()
+                            .and_then(
+                                |AggregateExec {
+                                     mode: input_mode,
+                                     input: partial_input,
+                                     group_by: input_group_by,
+                                     aggr_expr: input_aggr_expr,
+                                     input_schema,
+                                     ..
+                                 }| {
+                                    if matches!(input_mode, AggregateMode::Partial)
+                                        && final_group_by.eq(input_group_by)
+                                        && final_aggr_expr.len() == input_aggr_expr.len()
+                                        && final_aggr_expr
+                                            .iter()
+                                            .zip(input_aggr_expr.iter())
+                                            .all(|(final_expr, partial_expr)| {
+                                                final_expr.eq(partial_expr)
+                                            })
+                                    {
+                                        AggregateExec::try_new(
+                                            AggregateMode::Single,
+                                            input_group_by.clone(),
+                                            input_aggr_expr.to_vec(),
+                                            partial_input.clone(),
+                                            input_schema.clone(),
+                                        )
+                                        .ok()
+                                        .map(Arc::new)
+                                    } else {
+                                        None
+                                    }
+                                },
+                            )
+                    } else {
+                        None
+                    }
+                },
+            );
+
+            Ok(if let Some(transformed) = transformed {
+                Transformed::Yes(transformed)
+            } else {
+                Transformed::No(plan)
+            })
+        })
+    }
+
+    fn name(&self) -> &str {
+        "CombinePartialFinalAggregate"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}

Review Comment:
   Sure, will do.





[GitHub] [arrow-datafusion] Dandandan commented on pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#issuecomment-1494310373

   As far as I can see, this only works for a single input partition with no repartitioning in between (i.e. no concurrency). Could you confirm?




[GitHub] [arrow-datafusion] yahoNanJing merged pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing merged PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1162464083


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1772,6 +1772,171 @@ async fn right_semi_join() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn join_and_aggregate_on_same_key() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+    let sql = "select distinct(t1.t1_id) from t1 inner join t2 on t1.t1_id = t2.t2_id";
+
+    // assert logical plan
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan().unwrap();
+
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Aggregate: groupBy=[[t1.t1_id]], aggr=[[]] [t1_id:UInt32;N]",
+        "    Projection: t1.t1_id [t1_id:UInt32;N]",
+        "      Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t2_id:UInt32;N]",
+        "        TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
+        "        TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
+    ];
+
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let msg = format!("Creating physical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let expected =
+        vec![
+            "AggregateExec: mode=Single, gby=[t1_id@0 as t1_id], aggr=[]",

Review Comment:
   Yes.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1162025055


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -147,6 +149,24 @@ impl PhysicalGroupBy {
     }
 }
 
+impl PartialEq for PhysicalGroupBy {
+    fn eq(&self, other: &PhysicalGroupBy) -> bool {
+        self.expr.len() == other.expr.len()
+            && self
+                .expr
+                .iter()
+                .zip(other.expr.iter())
+                .all(|((expr1, name1), (expr2, name2))| expr1.eq(expr2) && name1 == name2)

Review Comment:
   I wondered why this needed to be implemented manually rather than derived, so I tried removing it and got this error:
   
   ```
   error[E0369]: binary operation `==` cannot be applied to type `Vec<(Arc<dyn PhysicalExpr>, std::string::String)>`
     --> datafusion/core/src/physical_plan/aggregates/mod.rs:91:5
      |
   88 | #[derive(Clone, Debug, Default, PartialEq)]
      |                                 --------- in this derive macro expansion
   ...
   91 |     expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
      |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      |
      = note: this error originates in the derive macro `PartialEq` (in Nightly builds, run with -Z macro-backtrace for more info)
   
   ```
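The error arises because `dyn PhysicalExpr` has no `PartialEq` impl, so the derive cannot compare the `Arc<dyn PhysicalExpr>` elements. A minimal std-only sketch of the workaround, assuming a hypothetical `Expr` trait that exposes `as_any` plus a `dyn_eq` method (illustrative names, not DataFusion's actual API), compares the `(expr, alias)` pairs element-wise in the same way as the manual `impl PartialEq for PhysicalGroupBy` in this diff:

```rust
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

/// Hypothetical stand-in for `PhysicalExpr`: trait objects cannot derive
/// `PartialEq`, so equality goes through `as_any` + a downcast.
pub trait Expr: Debug + Send + Sync {
    fn as_any(&self) -> &dyn Any;
    /// True when `other` has the same concrete type and equal fields.
    fn dyn_eq(&self, other: &dyn Expr) -> bool;
}

#[derive(Debug, PartialEq)]
pub struct Column {
    pub name: String,
    pub index: usize,
}

impl Expr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn dyn_eq(&self, other: &dyn Expr) -> bool {
        other
            .as_any()
            .downcast_ref::<Column>()
            .map_or(false, |o| self == o)
    }
}

/// Simplified `PhysicalGroupBy`: a list of `(expr, alias)` pairs.
#[derive(Debug, Default)]
pub struct GroupBy {
    pub expr: Vec<(Arc<dyn Expr>, String)>,
}

// `#[derive(PartialEq)]` would fail with E0369 here, so compare manually.
impl PartialEq for GroupBy {
    fn eq(&self, other: &GroupBy) -> bool {
        self.expr.len() == other.expr.len()
            && self
                .expr
                .iter()
                .zip(other.expr.iter())
                .all(|((e1, n1), (e2, n2))| e1.dyn_eq(&**e2) && n1 == n2)
    }
}

/// Convenience constructor for a column expression.
pub fn col(name: &str, index: usize) -> Arc<dyn Expr> {
    Arc::new(Column { name: name.to_string(), index })
}
```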



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1772,6 +1772,171 @@ async fn right_semi_join() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn join_and_aggregate_on_same_key() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id", true)?;
+    let sql = "select distinct(t1.t1_id) from t1 inner join t2 on t1.t1_id = t2.t2_id";
+
+    // assert logical plan
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan().unwrap();
+
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Aggregate: groupBy=[[t1.t1_id]], aggr=[[]] [t1_id:UInt32;N]",
+        "    Projection: t1.t1_id [t1_id:UInt32;N]",
+        "      Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t2_id:UInt32;N]",
+        "        TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
+        "        TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
+    ];
+
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let msg = format!("Creating physical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let expected =
+        vec![
+            "AggregateExec: mode=Single, gby=[t1_id@0 as t1_id], aggr=[]",

Review Comment:
   Is it correct that this plan can use a single aggregate because it is already partitioned on the group key (t1_id) after the join?



##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -0,0 +1,120 @@
+//! CombinePartialFinalAggregate optimizer rule checks the adjacent Partial and Final AggregateExecs
+//! and try to combine them if necessary
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
+use crate::physical_plan::ExecutionPlan;
+use datafusion_common::config::ConfigOptions;
+use std::sync::Arc;
+
+use datafusion_common::tree_node::{Transformed, TreeNode};
+
+/// CombinePartialFinalAggregate optimizer rule combines the adjacent Partial and Final AggregateExecs
+/// into a Single AggregateExec if their grouping exprs and aggregate exprs equal.
+///
+/// This rule should be applied after the EnforceDistribution and EnforceSorting rules
+///
+#[derive(Default)]
+pub struct CombinePartialFinalAggregate {}
+
+impl CombinePartialFinalAggregate {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_down(&|plan| {
+            let transformed = plan.as_any().downcast_ref::<AggregateExec>().and_then(
+                |AggregateExec {
+                     mode: final_mode,
+                     input: final_input,
+                     group_by: final_group_by,
+                     aggr_expr: final_aggr_expr,
+                     ..
+                 }| {
+                    if matches!(
+                        final_mode,
+                        AggregateMode::Final | AggregateMode::FinalPartitioned
+                    ) {
+                        final_input
+                            .as_any()
+                            .downcast_ref::<AggregateExec>()
+                            .and_then(
+                                |AggregateExec {
+                                     mode: input_mode,
+                                     input: partial_input,
+                                     group_by: input_group_by,
+                                     aggr_expr: input_aggr_expr,
+                                     input_schema,
+                                     ..
+                                 }| {
+                                    if matches!(input_mode, AggregateMode::Partial)
+                                        && final_group_by.eq(input_group_by)
+                                        && final_aggr_expr.len() == input_aggr_expr.len()
+                                        && final_aggr_expr
+                                            .iter()
+                                            .zip(input_aggr_expr.iter())
+                                            .all(|(final_expr, partial_expr)| {
+                                                final_expr.eq(partial_expr)
+                                            })
+                                    {
+                                        AggregateExec::try_new(
+                                            AggregateMode::Single,
+                                            input_group_by.clone(),
+                                            input_aggr_expr.to_vec(),
+                                            partial_input.clone(),
+                                            input_schema.clone(),
+                                        )
+                                        .ok()
+                                        .map(Arc::new)
+                                    } else {
+                                        None
+                                    }
+                                },
+                            )
+                    } else {
+                        None
+                    }
+                },
+            );
+
+            Ok(if let Some(transformed) = transformed {
+                Transformed::Yes(transformed)
+            } else {
+                Transformed::No(plan)
+            })
+        })
+    }
+
+    fn name(&self) -> &str {
+        "CombinePartialFinalAggregate"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}

Review Comment:
   I think it would help to add unit tests to this optimizer so we can see what it does in isolation (and test things like different agg exprs not being matched).
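A sketch of the kind of isolated test suggested here, using a hypothetical toy plan type (`Mode`, `Plan`, `agg` and `combine` are illustrative names, not DataFusion APIs) with expressions reduced to strings. Like the rule in this PR, `combine` walks the tree top-down and only merges a `Final`/`FinalPartitioned` node whose direct child is a `Partial` node with identical group-by and aggregate lists:

```rust
/// Simplified aggregation modes mirroring those discussed in this PR.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Mode {
    Partial,
    Final,
    FinalPartitioned,
    Single,
}

/// Hypothetical toy plan: a leaf scan or an aggregate over one input.
#[derive(Clone, Debug, PartialEq)]
pub enum Plan {
    Scan(String),
    Aggregate {
        mode: Mode,
        group_by: Vec<String>,
        aggr: Vec<String>,
        input: Box<Plan>,
    },
}

/// Helper to build an aggregate node from string slices.
pub fn agg(mode: Mode, group_by: &[&str], aggr: &[&str], input: Plan) -> Plan {
    Plan::Aggregate {
        mode,
        group_by: group_by.iter().map(|s| s.to_string()).collect(),
        aggr: aggr.iter().map(|s| s.to_string()).collect(),
        input: Box::new(input),
    }
}

/// Top-down rewrite: Final/FinalPartitioned directly over a matching Partial
/// becomes one Single aggregate; everything else is kept and traversed.
pub fn combine(plan: Plan) -> Plan {
    match plan {
        Plan::Aggregate {
            mode: final_mode @ (Mode::Final | Mode::FinalPartitioned),
            group_by,
            aggr,
            input,
        } => match *input {
            // Adjacent Partial child with identical group-by and aggregates.
            Plan::Aggregate {
                mode: Mode::Partial,
                group_by: partial_group_by,
                aggr: partial_aggr,
                input: partial_input,
            } if partial_group_by == group_by && partial_aggr == aggr => Plan::Aggregate {
                mode: Mode::Single,
                group_by,
                aggr,
                input: Box::new(combine(*partial_input)),
            },
            // No match: keep the final node and continue into its child.
            child => Plan::Aggregate {
                mode: final_mode,
                group_by,
                aggr,
                input: Box::new(combine(child)),
            },
        },
        Plan::Aggregate { mode, group_by, aggr, input } => Plan::Aggregate {
            mode,
            group_by,
            aggr,
            input: Box::new(combine(*input)),
        },
        leaf @ Plan::Scan(_) => leaf,
    }
}
```

With this model, a `FinalPartitioned` over a matching `Partial` collapses into one `Single` node, while a pair whose aggregate lists differ (for example `COUNT(1)` vs `SUM(x)`) is left untouched.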



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -65,6 +65,8 @@ pub enum AggregateMode {
     /// with Hash repartitioning on the group keys. If a group key is
     /// duplicated, duplicate groups would be produced
     FinalPartitioned,
+    /// Single aggregate is a combination of Partial and Final aggregate mode

Review Comment:
   ```suggestion
       /// Applies the entire logical aggregation operation in a single operator,
       /// as opposed to Partial / Final modes which apply the logical aggregation using
       /// two operators.  
   ```
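To illustrate the distinction drawn in the suggested wording, here is a minimal sketch for `SUM` grouped by key, using std `HashMap`s in place of DataFusion accumulators (all function names are hypothetical). Running Partial on each partition and merging with Final yields the same groups as a Single operator that sees every row:

```rust
use std::collections::HashMap;

/// Partial mode: aggregate one input partition into a per-key state.
/// For SUM the state is simply the running sum.
pub fn partial_sum(partition: &[(u32, i64)]) -> HashMap<u32, i64> {
    let mut state: HashMap<u32, i64> = HashMap::new();
    for &(key, value) in partition {
        *state.entry(key).or_insert(0) += value;
    }
    state
}

/// Final mode: merge the partial states produced for every partition.
pub fn final_merge(states: &[HashMap<u32, i64>]) -> HashMap<u32, i64> {
    let mut result: HashMap<u32, i64> = HashMap::new();
    for state in states {
        for (&key, &sum) in state {
            *result.entry(key).or_insert(0) += sum;
        }
    }
    result
}

/// Single mode: one operator consumes the raw rows and emits final results,
/// i.e. Partial followed immediately by Final on the same stream.
pub fn single_sum(rows: &[(u32, i64)]) -> HashMap<u32, i64> {
    final_merge(&[partial_sum(rows)])
}
```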



##########
datafusion/physical-expr/src/aggregate/mod.rs:
##########
@@ -56,7 +56,7 @@ pub(crate) mod variance;
 /// * knows how to create its accumulator
 /// * knows its accumulator's state's field
 /// * knows the expressions from whose its accumulator will receive values
-pub trait AggregateExpr: Send + Sync + Debug {
+pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {

Review Comment:
   This is an API change. I understand why we need a `PartialEq` against `dyn Any`, but it might be somewhat confusing to others.
   
   Could you add documentation describing how to implement it (perhaps pointing at the `down_cast_any_ref` utility function)?
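One way to read the `PartialEq<dyn Any>` bound: each implementor compares itself against an arbitrary `&dyn Any` by downcasting to its own concrete type, which keeps the trait object-safe. A minimal sketch, assuming a hypothetical trimmed-down `AggExpr` trait with only `as_any` (the real `AggregateExpr` has many more methods, and `agg_eq` is an illustrative helper):

```rust
use std::any::Any;
use std::fmt::Debug;

/// Hypothetical, trimmed-down stand-in for `AggregateExpr`.
pub trait AggExpr: Send + Sync + Debug + PartialEq<dyn Any> {
    fn as_any(&self) -> &dyn Any;
}

#[derive(Debug)]
pub struct Count {
    pub name: String,
}

#[derive(Debug)]
pub struct Sum {
    pub name: String,
}

impl AggExpr for Count {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl AggExpr for Sum {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Equal only if `other` downcasts to the same concrete type with equal fields.
impl PartialEq<dyn Any> for Count {
    fn eq(&self, other: &dyn Any) -> bool {
        other.downcast_ref::<Count>().map_or(false, |o| self.name == o.name)
    }
}

impl PartialEq<dyn Any> for Sum {
    fn eq(&self, other: &dyn Any) -> bool {
        other.downcast_ref::<Sum>().map_or(false, |o| self.name == o.name)
    }
}

/// Compare two aggregate expressions held as trait objects.
pub fn agg_eq(a: &dyn AggExpr, b: &dyn AggExpr) -> bool {
    a.eq(b.as_any())
}
```

Passing a `&Box<dyn AggExpr>` (or `&Arc<dyn AggExpr>`) directly as `&dyn Any` makes the downcast see the wrapper type rather than the inner expression, so the comparison fails; unwrapping that case appears to be what a helper like `down_cast_any_ref` is for.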



##########
datafusion/physical-expr/src/aggregate/utils.rs:
##########
@@ -31,3 +34,17 @@ pub fn get_accum_scalar_values_as_arrays(
         .map(|s| s.to_array_of_size(1))
         .collect::<Vec<_>>())
 }
+
+pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {

Review Comment:
   Can you please document what this function does (with an example), given it is a new `pub` function?





[GitHub] [arrow-datafusion] mingmwang commented on pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#issuecomment-1504496616

   Comparing the aggregate expressions and group expressions between the partial and final aggregations is problematic.




[GitHub] [arrow-datafusion] mingmwang commented on pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#issuecomment-1493846754

   Will add some unit tests soon.




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1159221512


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -360,7 +380,9 @@ impl ExecutionPlan for AggregateExec {
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
         match &self.mode {
-            AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution],
+            AggregateMode::Partial | AggregateMode::Single => {

Review Comment:
   It should be the same as for `AggregateMode::Partial`. This new rule is applied after the `EnforceDistribution` rule, which means the distribution requirements are already satisfied by the time the adjacent `Partial` and `Final` aggregates are combined into a new `Single` aggregate.
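The reasoning in this reply can be pictured with a simplified, hypothetical model of `required_input_distribution`: plain enums and `String` keys instead of DataFusion's types, a single `Distribution` instead of the real `Vec<Distribution>`, and `Final`/`FinalPartitioned` arms that are assumptions about code this hunk does not show. `Single` simply inherits the `Partial` arm:

```rust
/// Simplified aggregation modes.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Mode {
    Partial,
    Final,
    FinalPartitioned,
    Single,
}

/// Simplified input distribution requirements.
#[derive(Clone, Debug, PartialEq)]
pub enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

/// Which input distribution each aggregate mode asks for (model only).
pub fn required_input_distribution(mode: Mode, group_keys: &[String]) -> Distribution {
    match mode {
        // `Single` only appears after CombinePartialFinalAggregate, which runs
        // after EnforceDistribution, so it adds no requirement of its own.
        Mode::Partial | Mode::Single => Distribution::UnspecifiedDistribution,
        Mode::FinalPartitioned => Distribution::HashPartitioned(group_keys.to_vec()),
        Mode::Final => Distribution::SinglePartition,
    }
}
```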





[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1155982772


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -360,7 +380,9 @@ impl ExecutionPlan for AggregateExec {
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
         match &self.mode {
-            AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution],
+            AggregateMode::Partial | AggregateMode::Single => {

Review Comment:
   Shouldn't it be `Distribution::SinglePartition`?





[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1163540011


##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -0,0 +1,323 @@
+//! CombinePartialFinalAggregate optimizer rule checks the adjacent Partial and Final AggregateExecs
+//! and try to combine them if necessary
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
+use crate::physical_plan::ExecutionPlan;
+use datafusion_common::config::ConfigOptions;
+use std::sync::Arc;
+
+use datafusion_common::tree_node::{Transformed, TreeNode};
+
+/// CombinePartialFinalAggregate optimizer rule combines the adjacent Partial and Final AggregateExecs
+/// into a Single AggregateExec if their grouping exprs and aggregate exprs equal.
+///
+/// This rule should be applied after the EnforceDistribution and EnforceSorting rules
+///
+#[derive(Default)]
+pub struct CombinePartialFinalAggregate {}
+
+impl CombinePartialFinalAggregate {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_down(&|plan| {
+            let transformed = plan.as_any().downcast_ref::<AggregateExec>().and_then(
+                |AggregateExec {
+                     mode: final_mode,
+                     input: final_input,
+                     group_by: final_group_by,
+                     aggr_expr: final_aggr_expr,
+                     filter_expr: final_filter_expr,
+                     ..
+                 }| {
+                    if matches!(
+                        final_mode,
+                        AggregateMode::Final | AggregateMode::FinalPartitioned
+                    ) {
+                        final_input
+                            .as_any()
+                            .downcast_ref::<AggregateExec>()

Review Comment:
   Thanks @mingmwang for introducing this rule, which will significantly improve query performance for the SQL patterns shown in the unit tests.





[GitHub] [arrow-datafusion] Dandandan commented on pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#issuecomment-1494393563

   > > As far as I can see, this only works for a single input partition with no repartitioning in between (i.e. no concurrency). Could you confirm?
   > 
   > Not always. Adjacent Partial + Final aggregates also appear for a normal join followed by an aggregation on the same key. I will add more unit tests and integration tests tomorrow to show these cases:
   > 
   > ```sql
   > select distinct(t1.t1_id) from t1 inner join t2 on t1.t1_id = t2.t2_id;
   > ```
   > 
   > ```text
   > AggregateExec: mode=Single, gby=[t1_id@0 as t1_id], aggr=[]
   >   ProjectionExec: expr=[t1_id@0 as t1_id]
   >     CoalesceBatchesExec: target_batch_size=4096
   >       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
   >         CoalesceBatchesExec: target_batch_size=4096
   >           RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
   >             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
   >               MemoryExec: partitions=1, partition_sizes=[1]
   >         CoalesceBatchesExec: target_batch_size=4096
   >           RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
   >             RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
   >               MemoryExec: partitions=1, partition_sizes=[1]
   > ```
   
   Ah yes, in the case where the input is already hash-partitioned on the key. Makes sense, thanks.
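Why a per-partition `Single` aggregate is enough here: after a hash repartition on the key, all rows with a given key land in the same partition, so each partition already sees every row of its groups. A toy std-only model (the `key % n` routing stands in for a real hash function, and all names are hypothetical) contrasts this with round-robin input, where the same key reaches several partitions and the concatenated per-partition results contain duplicate groups:

```rust
use std::collections::{BTreeSet, HashSet};

/// Route each key the way a hash repartition would (`key % n` as the hash).
pub fn hash_partition(keys: &[u32], n: usize) -> Vec<Vec<u32>> {
    let mut parts = vec![Vec::new(); n];
    for &k in keys {
        parts[k as usize % n].push(k);
    }
    parts
}

/// Round-robin split: rows go to partitions in turn, ignoring the key.
pub fn round_robin(keys: &[u32], n: usize) -> Vec<Vec<u32>> {
    let mut parts = vec![Vec::new(); n];
    for (i, &k) in keys.iter().enumerate() {
        parts[i % n].push(k);
    }
    parts
}

/// Run a DISTINCT aggregation independently in each partition and
/// concatenate the outputs, as a per-partition Single aggregate would.
pub fn per_partition_distinct(parts: &[Vec<u32>]) -> Vec<u32> {
    parts
        .iter()
        .flat_map(|p| p.iter().copied().collect::<BTreeSet<u32>>())
        .collect()
}

/// True if the same group key was emitted more than once.
pub fn has_duplicate_groups(output: &[u32]) -> bool {
    let unique: HashSet<u32> = output.iter().copied().collect();
    unique.len() != output.len()
}
```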




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1162675006


##########
datafusion/physical-expr/src/aggregate/utils.rs:
##########
@@ -31,3 +34,17 @@ pub fn get_accum_scalar_values_as_arrays(
         .map(|s| s.to_array_of_size(1))
         .collect::<Vec<_>>())
 }
+
+pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {

Review Comment:
   Yes, I just added more comments. There is an example in the `count` unit test:
   
   ```rust
   #[test]
   fn count_eq() -> Result<()> {
       // The same COUNT(1) expression: by value, behind an Arc, and in a Box.
       let count = Count::new(lit(1i8), "COUNT(1)".to_string(), DataType::Int64);
       let arc_count: Arc<dyn AggregateExpr> = Arc::new(Count::new(
           lit(1i8),
           "COUNT(1)".to_string(),
           DataType::Int64,
       ));
       let box_count: Box<dyn AggregateExpr> = Box::new(Count::new(
           lit(1i8),
           "COUNT(1)".to_string(),
           DataType::Int64,
       ));
       // Same expression, different name.
       let count2 = Count::new(lit(1i8), "COUNT(2)".to_string(), DataType::Int64);

       // Equality holds whichever wrapper appears on either side.
       assert!(arc_count.eq(&box_count));
       assert!(box_count.eq(&arc_count));
       assert!(arc_count.eq(&count));
       assert!(count.eq(&box_count));
       assert!(count.eq(&arc_count));

       // A different name makes the expressions unequal.
       assert!(count2.ne(&arc_count));

       Ok(())
   }
   ```
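The test relies on `down_cast_any_ref` unwrapping an `Arc` or `Box` before the concrete downcast. A minimal, self-contained sketch of that pattern follows; the `AggExpr` trait and the name-only `Count` struct are hypothetical simplifications, not DataFusion's actual `AggregateExpr` and `Count`.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical, heavily simplified stand-in for `AggregateExpr`: comparing
/// against `&dyn Any` lets one impl handle bare values, `Arc`s and `Box`es.
trait AggExpr: PartialEq<dyn Any> {
    fn as_any(&self) -> &dyn Any;
}

/// If `any` is an `Arc<dyn AggExpr>` or `Box<dyn AggExpr>`, return the value
/// inside it as `&dyn Any`; otherwise return `any` unchanged.
fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
    if let Some(arc) = any.downcast_ref::<Arc<dyn AggExpr>>() {
        arc.as_any()
    } else if let Some(boxed) = any.downcast_ref::<Box<dyn AggExpr>>() {
        boxed.as_any()
    } else {
        any
    }
}

/// Toy `Count` identified only by its display name.
struct Count {
    name: String,
}

impl AggExpr for Count {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl PartialEq<dyn Any> for Count {
    fn eq(&self, other: &dyn Any) -> bool {
        // Unwrap any Arc/Box first, then try the concrete downcast.
        down_cast_any_ref(other)
            .downcast_ref::<Self>()
            .map_or(false, |o| self.name == o.name)
    }
}

fn main() {
    let count = Count { name: "COUNT(1)".to_string() };
    let arc_count: Arc<dyn AggExpr> = Arc::new(Count { name: "COUNT(1)".to_string() });
    let box_count: Box<dyn AggExpr> = Box::new(Count { name: "COUNT(1)".to_string() });
    let count2 = Count { name: "COUNT(2)".to_string() };

    assert!(count.eq(&arc_count as &dyn Any));
    assert!(count.eq(&box_count as &dyn Any));
    // Deref to the trait object so its `PartialEq<dyn Any>` impl is used.
    assert!((*arc_count).eq(&box_count as &dyn Any));
    assert!((*box_count).eq(&arc_count as &dyn Any));
    assert!(count2.ne(&arc_count as &dyn Any));
}
```

Without the unwrapping step, `&arc_count as &dyn Any` has the concrete type `Arc<dyn AggExpr>`, so a direct `downcast_ref::<Count>()` would fail.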




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1162474934


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -147,6 +149,24 @@ impl PhysicalGroupBy {
     }
 }
 
+impl PartialEq for PhysicalGroupBy {
+    fn eq(&self, other: &PhysicalGroupBy) -> bool {
+        self.expr.len() == other.expr.len()
+            && self
+                .expr
+                .iter()
+                .zip(other.expr.iter())
+                .all(|((expr1, name1), (expr2, name2))| expr1.eq(expr2) && name1 == name2)

Review Comment:
   It looks like if a struct contains any boxed trait object, we cannot use the `PartialEq` derive macro.
   
   https://github.com/rust-lang/rust/issues/39128
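A sketch of such a hand-written impl, mirroring the length check plus `zip`/`all` comparison in the diff above. It assumes a hypothetical, simplified `PhysExpr` trait with a `dyn_eq` helper; DataFusion's real `PhysicalExpr` API differs. In this toy, `Arc<dyn PhysExpr>` has no `PartialEq` impl, so deriving is not an option.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical, simplified expression trait (not DataFusion's `PhysicalExpr`).
trait PhysExpr {
    fn as_any(&self) -> &dyn Any;
    /// Compare with another expression of unknown concrete type.
    fn dyn_eq(&self, other: &dyn PhysExpr) -> bool;
}

#[derive(PartialEq)]
struct Column {
    name: String,
    index: usize,
}

impl PhysExpr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn dyn_eq(&self, other: &dyn PhysExpr) -> bool {
        // Equal only if `other` is also a Column with the same fields.
        other
            .as_any()
            .downcast_ref::<Column>()
            .map_or(false, |o| self == o)
    }
}

/// Simplified group-by: (expression, output name) pairs.
struct GroupBy {
    expr: Vec<(Arc<dyn PhysExpr>, String)>,
}

impl PartialEq for GroupBy {
    fn eq(&self, other: &GroupBy) -> bool {
        self.expr.len() == other.expr.len()
            && self
                .expr
                .iter()
                .zip(other.expr.iter())
                // `&**e2` turns `&Arc<dyn PhysExpr>` into `&dyn PhysExpr`.
                .all(|((e1, n1), (e2, n2))| e1.dyn_eq(&**e2) && n1 == n2)
    }
}

fn col(name: &str, index: usize) -> Arc<dyn PhysExpr> {
    Arc::new(Column { name: name.to_string(), index })
}

fn main() {
    let a = GroupBy { expr: vec![(col("t1_id", 0), "t1_id".to_string())] };
    let b = GroupBy { expr: vec![(col("t1_id", 0), "t1_id".to_string())] };
    let c = GroupBy { expr: vec![(col("t2_id", 0), "t2_id".to_string())] };
    assert!(a == b);
    assert!(a != c);
}
```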
   




[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1163539462


##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CombinePartialFinalAggregate optimizer rule checks the adjacent Partial and Final AggregateExecs
+//! and tries to combine them if necessary
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
+use crate::physical_plan::ExecutionPlan;
+use datafusion_common::config::ConfigOptions;
+use std::sync::Arc;
+
+use datafusion_common::tree_node::{Transformed, TreeNode};
+
+/// CombinePartialFinalAggregate optimizer rule combines the adjacent Partial and Final AggregateExecs
+/// into a Single AggregateExec if their grouping exprs and aggregate exprs are equal.
+///
+/// This rule should be applied after the EnforceDistribution and EnforceSorting rules
+///
+#[derive(Default)]
+pub struct CombinePartialFinalAggregate {}
+
+impl CombinePartialFinalAggregate {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_down(&|plan| {
+            let transformed = plan.as_any().downcast_ref::<AggregateExec>().and_then(
+                |AggregateExec {
+                     mode: final_mode,
+                     input: final_input,
+                     group_by: final_group_by,
+                     aggr_expr: final_aggr_expr,
+                     filter_expr: final_filter_expr,
+                     ..
+                 }| {
+                    if matches!(
+                        final_mode,
+                        AggregateMode::Final | AggregateMode::FinalPartitioned
+                    ) {
+                        final_input
+                            .as_any()
+                            .downcast_ref::<AggregateExec>()

Review Comment:
   Since there is no `RepartitionExec` between them, the `AggregateExec` in Final mode and the `AggregateExec` in Partial mode have the same distribution. Therefore, there is no need for two-phase aggregation.
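The reasoning above can be modeled on a toy plan tree. This sketch uses hypothetical `Plan`/`Mode` types rather than DataFusion's `ExecutionPlan` and `AggregateExec`, and, unlike the PR's rule, ignores filter expressions and schemas. A Final or FinalPartitioned aggregate whose direct child is a Partial aggregate with the same grouping and aggregate expressions becomes one Single aggregate, while a repartition in between blocks the rewrite.

```rust
/// Hypothetical, simplified aggregation modes.
#[derive(Debug, Clone, PartialEq)]
enum Mode { Partial, Final, FinalPartitioned, Single }

/// Hypothetical, simplified physical plan tree.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Aggregate { mode: Mode, group_by: Vec<String>, aggr: Vec<String>, input: Box<Plan> },
    Repartition { input: Box<Plan> },
    Scan,
}

/// Top-down rewrite: collapse Final/FinalPartitioned directly above a
/// matching Partial into a single Single-mode aggregate.
fn combine(plan: Plan) -> Plan {
    match plan {
        Plan::Aggregate {
            mode: mode @ (Mode::Final | Mode::FinalPartitioned),
            group_by,
            aggr,
            input,
        } => {
            let child = *input;
            match child {
                // No repartition in between: both stages see the same
                // distribution, so one phase is enough.
                Plan::Aggregate { mode: Mode::Partial, group_by: g2, aggr: a2, input: inner }
                    if g2 == group_by && a2 == aggr =>
                {
                    Plan::Aggregate { mode: Mode::Single, group_by, aggr, input: Box::new(combine(*inner)) }
                }
                other => Plan::Aggregate { mode, group_by, aggr, input: Box::new(combine(other)) },
            }
        }
        Plan::Aggregate { mode, group_by, aggr, input } => {
            Plan::Aggregate { mode, group_by, aggr, input: Box::new(combine(*input)) }
        }
        Plan::Repartition { input } => Plan::Repartition { input: Box::new(combine(*input)) },
        Plan::Scan => Plan::Scan,
    }
}

/// Helper: an aggregate grouped by `t1_id` with no aggregate expressions.
fn agg(mode: Mode, input: Plan) -> Plan {
    Plan::Aggregate { mode, group_by: vec!["t1_id".to_string()], aggr: vec![], input: Box::new(input) }
}

fn main() {
    // Final directly above Partial: collapsed into a Single aggregate.
    let adjacent = agg(Mode::Final, agg(Mode::Partial, Plan::Scan));
    assert_eq!(combine(adjacent), agg(Mode::Single, Plan::Scan));

    // A repartition between the two stages blocks the rewrite.
    let split = agg(
        Mode::FinalPartitioned,
        Plan::Repartition { input: Box::new(agg(Mode::Partial, Plan::Scan)) },
    );
    assert_eq!(combine(split.clone()), split);
}
```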




[GitHub] [arrow-datafusion] yahoNanJing commented on pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#issuecomment-1504503453

   Converted it to draft.



[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1163666734


##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -0,0 +1,323 @@
+                    if matches!(
+                        final_mode,
+                        AggregateMode::Final | AggregateMode::FinalPartitioned
+                    ) {
+                        final_input
+                            .as_any()
+                            .downcast_ref::<AggregateExec>()

Review Comment:
   Actually, the performance improvement will not be that significant, because the `Final` aggregation step is usually not that heavy.




[GitHub] [arrow-datafusion] mingmwang commented on pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#issuecomment-1504502967

   @yahoNanJing @alamb 
   Please help move this PR to draft.



[GitHub] [arrow-datafusion] mingmwang commented on pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#issuecomment-1494370086

   > As far as I can see, this only works for single partitions as input and not repartitioning in between (e.g. no concurrency), could you confirm?
   
   Not always. We will also see adjacent Partial + Final aggregators when a join and an aggregation are on the same key.
   I will add more unit tests and integration tests tomorrow to show these cases:
   ```sql
   select distinct(t1.t1_id) from t1 inner join t2 on t1.t1_id = t2.t2_id;
   ```
   
   ```text
   AggregateExec: mode=Single, gby=[t1_id@0 as t1_id], aggr=[]
     ProjectionExec: expr=[t1_id@0 as t1_id]
       CoalesceBatchesExec: target_batch_size=4096
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 0 })]
           CoalesceBatchesExec: target_batch_size=4096
             RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 2), input_partitions=2
               RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
                 MemoryExec: partitions=1, partition_sizes=[1]
           CoalesceBatchesExec: target_batch_size=4096
             RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 2), input_partitions=2
               RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
                 MemoryExec: partitions=1, partition_sizes=[1]
   ```



[GitHub] [arrow-datafusion] yjshen commented on pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "yjshen (via GitHub)" <gi...@apache.org>.
yjshen commented on PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#issuecomment-1493980630

   I understand the collapsing rule: it removes the need to build a RecordBatch from the accumulator states and then read those states back for final evaluation.
   
   As for naming this new aggregation mode, I find `Complete` more descriptive when displayed as output, but I have no strong preference.
   ```text
   ProjectionExec: expr=[l_partkey@0 as l_partkey, ....
     AggregateExec: mode=Single...
         ParquetExec ...
   ```
   ```text
   ProjectionExec: expr=[l_partkey@0 as l_partkey, ...
     AggregateExec: mode=Complete...
         ParquetExec ...
   ```
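The round trip described above can be sketched with a hypothetical AVG accumulator following an update/state/merge/evaluate shape. This is a toy: per the comment above, DataFusion materializes partial states into a RecordBatch, whereas here the state is a plain `(f64, u64)` tuple. Both paths give the same answer, but the single-phase path never emits or merges intermediate states.

```rust
/// Hypothetical AVG accumulator (not DataFusion's `Accumulator` trait).
#[derive(Default)]
struct Avg {
    sum: f64,
    count: u64,
}

impl Avg {
    fn update(&mut self, values: &[f64]) {
        for v in values {
            self.sum += *v;
            self.count += 1;
        }
    }
    /// Intermediate state a Partial aggregate would emit.
    fn state(&self) -> (f64, u64) {
        (self.sum, self.count)
    }
    fn merge(&mut self, state: (f64, u64)) {
        self.sum += state.0;
        self.count += state.1;
    }
    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// Partial + Final: each chunk produces a state, then the states are merged.
fn two_phase(chunks: &[&[f64]]) -> Option<f64> {
    let states: Vec<(f64, u64)> = chunks
        .iter()
        .map(|chunk| {
            let mut partial = Avg::default();
            partial.update(chunk);
            partial.state()
        })
        .collect();
    let mut fin = Avg::default();
    for state in states {
        fin.merge(state);
    }
    fin.evaluate()
}

/// Single: one accumulator sees every row; no state is emitted or merged.
fn single(chunks: &[&[f64]]) -> Option<f64> {
    let mut acc = Avg::default();
    for chunk in chunks {
        acc.update(chunk);
    }
    acc.evaluate()
}

fn main() {
    let a: &[f64] = &[1.0, 2.0];
    let b: &[f64] = &[3.0, 6.0];
    assert_eq!(two_phase(&[a, b]), single(&[a, b]));
}
```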



[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#discussion_r1162474934


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -147,6 +149,24 @@ impl PhysicalGroupBy {
     }
 }
 
+impl PartialEq for PhysicalGroupBy {
+    fn eq(&self, other: &PhysicalGroupBy) -> bool {
+        self.expr.len() == other.expr.len()
+            && self
+                .expr
+                .iter()
+                .zip(other.expr.iter())
+                .all(|((expr1, name1), (expr2, name2))| expr1.eq(expr2) && name1 == name2)

Review Comment:
   It looks like if a struct contains any dynamic trait object, we cannot use the `PartialEq` derive macro.
   
   https://github.com/rust-lang/rust/issues/39128
   




[GitHub] [arrow-datafusion] mingmwang commented on pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#issuecomment-1501157610

   @Dandandan @yjshen @alamb 
   Would you please help review and approve this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5837: Add new physical rule CombinePartialFinalAggregate

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5837:
URL: https://github.com/apache/arrow-datafusion/pull/5837#issuecomment-1502008421

   I will review this PR carefully today


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org