You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/11/19 10:13:46 UTC
[arrow-datafusion] branch master updated: Optimize the performance of queries with a single distinct aggregate (#1315)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a60cdb0 Optimize the performance of queries with a single distinct aggregate (#1315)
a60cdb0 is described below
commit a60cdb07a5b4b4ac524232837dd4631c6c9f639c
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Fri Nov 19 18:13:41 2021 +0800
Optimize the performance of queries with a single distinct aggregate (#1315)
* add single_distinct_to_group_by optimizer rule
* add single_distinct_to_group_by optimizer rule
* Fix a test method and support multiple aggregate functions
Co-authored-by: liuli <li...@analysys.com.cn>
---
datafusion/src/execution/context.rs | 2 +
datafusion/src/optimizer/mod.rs | 1 +
.../src/optimizer/single_distinct_to_groupby.rs | 322 +++++++++++++++++++++
datafusion/tests/user_defined_plan.rs | 6 +-
4 files changed, 328 insertions(+), 3 deletions(-)
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 52c5160..5f77b2b 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -78,6 +78,7 @@ use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;
use crate::logical_plan::plan::ExplainPlan;
+use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
@@ -902,6 +903,7 @@ impl Default for ExecutionConfig {
Arc::new(FilterPushDown::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(LimitPushDown::new()),
+ Arc::new(SingleDistinctToGroupBy::new()),
],
physical_optimizers: vec![
Arc::new(AggregateStatistics::new()),
diff --git a/datafusion/src/optimizer/mod.rs b/datafusion/src/optimizer/mod.rs
index e6ed3b5..419d6bc 100644
--- a/datafusion/src/optimizer/mod.rs
+++ b/datafusion/src/optimizer/mod.rs
@@ -26,4 +26,5 @@ pub mod limit_push_down;
pub mod optimizer;
pub mod projection_push_down;
pub mod simplify_expressions;
+pub mod single_distinct_to_groupby;
pub mod utils;
diff --git a/datafusion/src/optimizer/single_distinct_to_groupby.rs b/datafusion/src/optimizer/single_distinct_to_groupby.rs
new file mode 100644
index 0000000..f6178a2
--- /dev/null
+++ b/datafusion/src/optimizer/single_distinct_to_groupby.rs
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! single distinct to group by optimizer rule
+
+use crate::error::Result;
+use crate::execution::context::ExecutionProps;
+use crate::logical_plan::{columnize_expr, DFSchema, Expr, LogicalPlan};
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use hashbrown::HashSet;
+use std::sync::Arc;
+
+/// single distinct to group by optimizer rule
+/// ```text
+/// SELECT F1(DISTINCT s),F2(DISTINCT s)
+/// ...
+/// GROUP BY k
+///
+/// Into
+///
+/// SELECT F1(s),F2(s)
+/// FROM (
+/// SELECT s, k ... GROUP BY s, k
+/// )
+/// GROUP BY k
+/// ```
+pub struct SingleDistinctToGroupBy {}
+
+impl SingleDistinctToGroupBy {
+ #[allow(missing_docs)]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
+ match plan {
+ LogicalPlan::Aggregate {
+ input,
+ aggr_expr,
+ schema,
+ group_expr,
+ } => {
+ if is_single_distinct_agg(plan) {
+ let mut group_fields_set = HashSet::new();
+ let mut all_group_args = group_expr.clone();
+ // remove the DISTINCT flag and collect the distinct args into the group-by keys
+ let new_aggr_expr = aggr_expr
+ .iter()
+ .map(|agg_expr| match agg_expr {
+ Expr::AggregateFunction { fun, args, .. } => {
+ // is_single_distinct_agg ensures args.len() == 1
+ if group_fields_set
+ .insert(args[0].name(input.schema()).unwrap())
+ {
+ all_group_args.push(args[0].clone());
+ }
+ Expr::AggregateFunction {
+ fun: fun.clone(),
+ args: args.clone(),
+ distinct: false,
+ }
+ }
+ _ => agg_expr.clone(),
+ })
+ .collect::<Vec<_>>();
+
+ let all_field = all_group_args
+ .iter()
+ .map(|expr| expr.to_field(input.schema()).unwrap())
+ .collect::<Vec<_>>();
+
+ let grouped_schema = DFSchema::new(all_field).unwrap();
+ let grouped_agg = LogicalPlan::Aggregate {
+ input: input.clone(),
+ group_expr: all_group_args,
+ aggr_expr: Vec::new(),
+ schema: Arc::new(grouped_schema.clone()),
+ };
+ let grouped_agg = optimize_children(&grouped_agg);
+ let final_agg_schema = Arc::new(
+ DFSchema::new(
+ group_expr
+ .iter()
+ .chain(new_aggr_expr.iter())
+ .map(|expr| expr.to_field(&grouped_schema).unwrap())
+ .collect::<Vec<_>>(),
+ )
+ .unwrap(),
+ );
+
+ let final_agg = LogicalPlan::Aggregate {
+ input: Arc::new(grouped_agg.unwrap()),
+ group_expr: group_expr.clone(),
+ aggr_expr: new_aggr_expr,
+ schema: final_agg_schema.clone(),
+ };
+
+ // alias the output columns so the aggregates are displayed the same way even after the rewrite
+ let mut alias_expr: Vec<Expr> = Vec::new();
+ final_agg
+ .expressions()
+ .iter()
+ .enumerate()
+ .for_each(|(i, field)| {
+ alias_expr.push(columnize_expr(
+ field.clone().alias(schema.clone().fields()[i].name()),
+ &final_agg_schema,
+ ));
+ });
+
+ Ok(LogicalPlan::Projection {
+ expr: alias_expr,
+ input: Arc::new(final_agg),
+ schema: schema.clone(),
+ alias: Option::None,
+ })
+ } else {
+ optimize_children(plan)
+ }
+ }
+ _ => optimize_children(plan),
+ }
+}
+
+fn optimize_children(plan: &LogicalPlan) -> Result<LogicalPlan> {
+ let expr = plan.expressions();
+ let inputs = plan.inputs();
+ let new_inputs = inputs
+ .iter()
+ .map(|plan| optimize(plan))
+ .collect::<Result<Vec<_>>>()?;
+ utils::from_plan(plan, &expr, &new_inputs)
+}
+
+fn is_single_distinct_agg(plan: &LogicalPlan) -> bool {
+ match plan {
+ LogicalPlan::Aggregate {
+ input, aggr_expr, ..
+ } => {
+ let mut fields_set = HashSet::new();
+ aggr_expr
+ .iter()
+ .filter(|expr| {
+ let mut is_distinct = false;
+ if let Expr::AggregateFunction { distinct, args, .. } = expr {
+ is_distinct = *distinct;
+ args.iter().for_each(|expr| {
+ fields_set.insert(expr.name(input.schema()).unwrap());
+ })
+ }
+ is_distinct
+ })
+ .count()
+ == aggr_expr.len()
+ && fields_set.len() == 1
+ }
+ _ => false,
+ }
+}
+
+impl OptimizerRule for SingleDistinctToGroupBy {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ _execution_props: &ExecutionProps,
+ ) -> Result<LogicalPlan> {
+ optimize(plan)
+ }
+ fn name(&self) -> &str {
+ "SingleDistinctAggregationToGroupBy"
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::logical_plan::{col, count, count_distinct, max, LogicalPlanBuilder};
+ use crate::physical_plan::aggregates;
+ use crate::test::*;
+
+ fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
+ let rule = SingleDistinctToGroupBy::new();
+ let optimized_plan = rule
+ .optimize(plan, &ExecutionProps::new())
+ .expect("failed to optimize plan");
+ let formatted_plan = format!("{}", optimized_plan.display_indent_schema());
+ assert_eq!(formatted_plan, expected);
+ }
+
+ #[test]
+ fn not_exist_distinct() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
+ .build()?;
+
+ // Do nothing
+ let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(#test.b)]] [MAX(test.b):UInt32;N]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ #[test]
+ fn single_distinct() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(Vec::<Expr>::new(), vec![count_distinct(col("b"))])?
+ .build()?;
+
+ // Should work
+ let expected = "Projection: #COUNT(test.b) AS COUNT(DISTINCT test.b) [COUNT(DISTINCT test.b):UInt64;N]\
+ \n Aggregate: groupBy=[[]], aggr=[[COUNT(#test.b)]] [COUNT(test.b):UInt64;N]\
+ \n Aggregate: groupBy=[[#test.b]], aggr=[[]] [b:UInt32]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ #[test]
+ fn single_distinct_and_groupby() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(vec![col("a")], vec![count_distinct(col("b"))])?
+ .build()?;
+
+ // Should work
+ let expected = "Projection: #test.a AS a, #COUNT(test.b) AS COUNT(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):UInt64;N]\
+ \n Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(#test.b)]] [a:UInt32, COUNT(test.b):UInt64;N]\
+ \n Aggregate: groupBy=[[#test.a, #test.b]], aggr=[[]] [a:UInt32, b:UInt32]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ #[test]
+ fn two_distinct_and_groupby() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(
+ vec![col("a")],
+ vec![count_distinct(col("b")), count_distinct(col("c"))],
+ )?
+ .build()?;
+
+ // Do nothing
+ let expected = "Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(DISTINCT #test.b), COUNT(DISTINCT #test.c)]] [a:UInt32, COUNT(DISTINCT test.b):UInt64;N, COUNT(DISTINCT test.c):UInt64;N]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ #[test]
+ fn one_field_two_distinct_and_groupby() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(
+ vec![col("a")],
+ vec![
+ count_distinct(col("b")),
+ Expr::AggregateFunction {
+ fun: aggregates::AggregateFunction::Max,
+ distinct: true,
+ args: vec![col("b")],
+ },
+ ],
+ )?
+ .build()?;
+ // Should work
+ let expected = "Projection: #test.a AS a, #COUNT(test.b) AS COUNT(DISTINCT test.b), #MAX(test.b) AS MAX(DISTINCT test.b) [a:UInt32, COUNT(DISTINCT test.b):UInt64;N, MAX(DISTINCT test.b):UInt32;N]\
+ \n Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(#test.b), MAX(#test.b)]] [a:UInt32, COUNT(test.b):UInt64;N, MAX(test.b):UInt32;N]\
+ \n Aggregate: groupBy=[[#test.a, #test.b]], aggr=[[]] [a:UInt32, b:UInt32]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ #[test]
+ fn distinct_and_common() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(
+ vec![col("a")],
+ vec![count_distinct(col("b")), count(col("c"))],
+ )?
+ .build()?;
+
+ // Do nothing
+ let expected = "Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(DISTINCT #test.b), COUNT(#test.c)]] [a:UInt32, COUNT(DISTINCT test.b):UInt64;N, COUNT(test.c):UInt64;N]\
+ \n TableScan: test projection=None [a:UInt32, b:UInt32, c:UInt32]";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+}
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index 46bc70b..74fbd4e 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -217,9 +217,9 @@ async fn topk_plan() -> Result<()> {
let mut ctx = setup_table(make_topk_context()).await?;
let expected = vec![
- "| logical_plan after topk | TopK: k=3 |",
- "| | Projection: #sales.customer_id, #sales.revenue |",
- "| | TableScan: sales projection=Some([0, 1]) |",
+ "| logical_plan after topk | TopK: k=3 |",
+ "| | Projection: #sales.customer_id, #sales.revenue |",
+ "| | TableScan: sales projection=Some([0, 1]) |",
].join("\n");
let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);