You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/29 15:27:49 UTC
[arrow-datafusion] branch master updated: reimplment eliminate_limit to remove `global-state`. (#4324)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 02da32e2e reimplment eliminate_limit to remove `global-state`. (#4324)
02da32e2e is described below
commit 02da32e2ef7bc40a89f024a3c6f1a9540412f636
Author: jakevin <ja...@gmail.com>
AuthorDate: Tue Nov 29 23:27:41 2022 +0800
reimplment eliminate_limit to remove `global-state`. (#4324)
---
datafusion/optimizer/src/eliminate_limit.rs | 307 ++++++++++------------------
1 file changed, 108 insertions(+), 199 deletions(-)
diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs
index 2deea4eb9..1cf8b6ad2 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -20,17 +20,12 @@
//! on a plan with an empty relation.
//! This rule also removes OFFSET 0 from the [LogicalPlan]
//! This saves time in planning and executing the query.
-use crate::{OptimizerConfig, OptimizerRule};
+use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
-use datafusion_expr::{
- logical_plan::{EmptyRelation, Limit, LogicalPlan},
- utils::from_plan,
-};
+use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};
-/// Optimization rule that replaces LIMIT 0 or
-/// LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch
-/// with an [LogicalPlan::EmptyRelation].
-/// This rule also removes OFFSET 0 from the [LogicalPlan]
+/// Optimization rule that eliminates LIMIT 0 or a useless LIMIT(skip:0, fetch:None).
+/// It can cooperate with `propagate_empty_relation` and `limit_push_down`.
#[derive(Default)]
pub struct EliminateLimit;
@@ -41,104 +36,31 @@ impl EliminateLimit {
}
}
-/// Ancestor indicates the current ancestor in the LogicalPlan tree
-/// when traversing down related to "eliminate limit".
-enum Ancestor {
- /// Limit
- FromLimit { skip: usize },
- /// Other nodes that don't affect the adjustment of "Limit"
- NotRelevant,
-}
-
-/// replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
-/// replaces LIMIT node whose ancestor LIMIT's skip is greater than or equal to current's fetch
-/// with an [LogicalPlan::EmptyRelation]
-/// removes OFFSET 0 from the [LogicalPlan]
-fn eliminate_limit(
- _optimizer: &EliminateLimit,
- ancestor: &Ancestor,
- plan: &LogicalPlan,
- _optimizer_config: &OptimizerConfig,
-) -> Result<LogicalPlan> {
- match plan {
- LogicalPlan::Limit(Limit {
- skip, fetch, input, ..
- }) => {
- let ancestor_skip = match ancestor {
- Ancestor::FromLimit { skip } => *skip,
- _ => 0,
- };
- // If ancestor's skip is equal or greater than current's fetch,
- // replaces with an [LogicalPlan::EmptyRelation].
- // For such query, the inner query(select * from xxx limit 5) should be optimized as an EmptyRelation:
- // select * from (select * from xxx limit 5) a limit 2 offset 5;
- match fetch {
+impl OptimizerRule for EliminateLimit {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<LogicalPlan> {
+ if let LogicalPlan::Limit(limit) = plan {
+ match limit.fetch {
Some(fetch) => {
- if *fetch == 0 || ancestor_skip >= *fetch {
+ if fetch == 0 {
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
- schema: input.schema().clone(),
+ schema: limit.input.schema().clone(),
}));
}
}
None => {
- if *skip == 0 {
- // If there is no LIMIT and OFFSET is zero, LIMIT/OFFSET can be removed
- return Ok(input.as_ref().clone());
+ if limit.skip == 0 {
+ let input = &*limit.input;
+ return utils::optimize_children(self, input, optimizer_config);
}
}
}
-
- let expr = plan.expressions();
-
- // apply the optimization to all inputs of the plan
- let inputs = plan.inputs();
- let new_inputs = inputs
- .iter()
- .map(|plan| {
- eliminate_limit(
- _optimizer,
- &Ancestor::FromLimit { skip: *skip },
- plan,
- _optimizer_config,
- )
- })
- .collect::<Result<Vec<_>>>()?;
-
- from_plan(plan, &expr, &new_inputs)
- }
- // Rest: recurse and find possible LIMIT 0/Multi LIMIT OFFSET nodes
- _ => {
- // For those plans(projection/sort/..) which do not affect the output rows of sub-plans, we still use ancestor;
- // otherwise, use NotRelevant instead.
- let ancestor = match plan {
- LogicalPlan::Projection { .. } | LogicalPlan::Sort { .. } => ancestor,
- _ => &Ancestor::NotRelevant,
- };
-
- let expr = plan.expressions();
-
- // apply the optimization to all inputs of the plan
- let inputs = plan.inputs();
- let new_inputs = inputs
- .iter()
- .map(|plan| {
- eliminate_limit(_optimizer, ancestor, plan, _optimizer_config)
- })
- .collect::<Result<Vec<_>>>()?;
-
- from_plan(plan, &expr, &new_inputs)
}
- }
-}
-
-impl OptimizerRule for EliminateLimit {
- fn optimize(
- &self,
- plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
- ) -> Result<LogicalPlan> {
- eliminate_limit(self, &Ancestor::NotRelevant, plan, optimizer_config)
+ utils::optimize_children(self, plan, optimizer_config)
}
fn name(&self) -> &str {
@@ -149,6 +71,7 @@ impl OptimizerRule for EliminateLimit {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::limit_push_down::LimitPushDown;
use crate::test::*;
use datafusion_common::Column;
use datafusion_expr::{
@@ -157,180 +80,166 @@ mod tests {
sum,
};
- fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
- let rule = EliminateLimit::new();
- let optimized_plan = rule
+ fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
+ let optimized_plan = EliminateLimit::new()
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
+ Ok(())
+ }
+
+ fn assert_optimized_plan_eq_with_pushdown(
+ plan: &LogicalPlan,
+ expected: &str,
+ ) -> Result<()> {
+ let optimized_plan = LimitPushDown::new()
+ .optimize(plan, &mut OptimizerConfig::new())
+ .expect("failed to optimize plan");
+ let optimized_plan = EliminateLimit::new()
+ .optimize(&optimized_plan, &mut OptimizerConfig::new())
+ .expect("failed to optimize plan");
+ let formatted_plan = format!("{:?}", optimized_plan);
+ assert_eq!(formatted_plan, expected);
+ assert_eq!(plan.schema(), optimized_plan.schema());
+ Ok(())
}
#[test]
- fn limit_0_root() {
+ fn limit_0_root() -> Result<()> {
let table_scan = test_table_scan().unwrap();
let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(vec![col("a")], vec![sum(col("b"))])
- .unwrap()
- .limit(0, Some(0))
- .unwrap()
- .build()
- .unwrap();
+ .aggregate(vec![col("a")], vec![sum(col("b"))])?
+ .limit(0, Some(0))?
+ .build()?;
// No aggregate / scan / limit
let expected = "EmptyRelation";
- assert_optimized_plan_eq(&plan, expected);
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
- fn limit_0_nested() {
- let table_scan = test_table_scan().unwrap();
+ fn limit_0_nested() -> Result<()> {
+ let table_scan = test_table_scan()?;
let plan1 = LogicalPlanBuilder::from(table_scan.clone())
- .aggregate(vec![col("a")], vec![sum(col("b"))])
- .unwrap()
- .build()
- .unwrap();
+ .aggregate(vec![col("a")], vec![sum(col("b"))])?
+ .build()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(vec![col("a")], vec![sum(col("b"))])
- .unwrap()
- .limit(0, Some(0))
- .unwrap()
- .union(plan1)
- .unwrap()
- .build()
- .unwrap();
+ .aggregate(vec![col("a")], vec![sum(col("b"))])?
+ .limit(0, Some(0))?
+ .union(plan1)?
+ .build()?;
// Left side is removed
let expected = "Union\
\n EmptyRelation\
\n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
\n TableScan: test";
- assert_optimized_plan_eq(&plan, expected);
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
- fn limit_fetch_with_ancestor_limit_skip() {
- let table_scan = test_table_scan().unwrap();
+ fn limit_fetch_with_ancestor_limit_skip() -> Result<()> {
+ let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(vec![col("a")], vec![sum(col("b"))])
- .unwrap()
- .limit(0, Some(2))
- .unwrap()
- .limit(2, None)
- .unwrap()
- .build()
- .unwrap();
+ .aggregate(vec![col("a")], vec![sum(col("b"))])?
+ .limit(0, Some(2))?
+ .limit(2, None)?
+ .build()?;
// No aggregate / scan / limit
- let expected = "Limit: skip=2, fetch=None\
- \n EmptyRelation";
- assert_optimized_plan_eq(&plan, expected);
+ let expected = "EmptyRelation";
+ assert_optimized_plan_eq_with_pushdown(&plan, expected)
}
#[test]
- fn multi_limit_offset_sort_eliminate() {
- let table_scan = test_table_scan().unwrap();
+ fn multi_limit_offset_sort_eliminate() -> Result<()> {
+ let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(vec![col("a")], vec![sum(col("b"))])
- .unwrap()
- .limit(0, Some(2))
- .unwrap()
- .sort(vec![col("a")])
- .unwrap()
- .limit(2, Some(1))
- .unwrap()
- .build()
- .unwrap();
-
+ .aggregate(vec![col("a")], vec![sum(col("b"))])?
+ .limit(0, Some(2))?
+ .sort(vec![col("a")])?
+ .limit(2, Some(1))?
+ .build()?;
+
+ // After removing the global state, we no longer record the parent's <skip, fetch>,
+ // so the bottom limit doesn't know the parent's info and can't be eliminated.
let expected = "Limit: skip=2, fetch=1\
- \n Sort: test.a\
- \n EmptyRelation";
- assert_optimized_plan_eq(&plan, expected);
+ \n Sort: test.a, fetch=3\
+ \n Limit: skip=0, fetch=2\
+ \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
+ \n TableScan: test";
+ assert_optimized_plan_eq_with_pushdown(&plan, expected)
}
#[test]
- fn limit_fetch_with_ancestor_limit_fetch() {
- let table_scan = test_table_scan().unwrap();
+ fn limit_fetch_with_ancestor_limit_fetch() -> Result<()> {
+ let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(vec![col("a")], vec![sum(col("b"))])
- .unwrap()
- .limit(0, Some(2))
- .unwrap()
- .sort(vec![col("a")])
- .unwrap()
- .limit(0, Some(1))
- .unwrap()
- .build()
- .unwrap();
+ .aggregate(vec![col("a")], vec![sum(col("b"))])?
+ .limit(0, Some(2))?
+ .sort(vec![col("a")])?
+ .limit(0, Some(1))?
+ .build()?;
let expected = "Limit: skip=0, fetch=1\
\n Sort: test.a\
\n Limit: skip=0, fetch=2\
\n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
\n TableScan: test";
- assert_optimized_plan_eq(&plan, expected);
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
- fn limit_with_ancestor_limit() {
+ fn limit_with_ancestor_limit() -> Result<()> {
let table_scan = test_table_scan().unwrap();
let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(vec![col("a")], vec![sum(col("b"))])
- .unwrap()
- .limit(2, Some(1))
- .unwrap()
- .sort(vec![col("a")])
- .unwrap()
- .limit(3, Some(1))
- .unwrap()
- .build()
- .unwrap();
+ .aggregate(vec![col("a")], vec![sum(col("b"))])?
+ .limit(2, Some(1))?
+ .sort(vec![col("a")])?
+ .limit(3, Some(1))?
+ .build()?;
let expected = "Limit: skip=3, fetch=1\
- \n Sort: test.a\
- \n EmptyRelation";
- assert_optimized_plan_eq(&plan, expected);
+ \n Sort: test.a\
+ \n Limit: skip=2, fetch=1\
+ \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
+ \n TableScan: test";
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
- fn limit_join_with_ancestor_limit() {
- let table_scan = test_table_scan().unwrap();
- let table_scan_inner = test_table_scan_with_name("test1").unwrap();
+ fn limit_join_with_ancestor_limit() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let table_scan_inner = test_table_scan_with_name("test1")?;
let plan = LogicalPlanBuilder::from(table_scan)
- .limit(2, Some(1))
- .unwrap()
+ .limit(2, Some(1))?
.join_using(
&table_scan_inner,
JoinType::Inner,
vec![Column::from_name("a".to_string())],
- )
- .unwrap()
- .limit(3, Some(1))
- .unwrap()
- .build()
- .unwrap();
+ )?
+ .limit(3, Some(1))?
+ .build()?;
let expected = "Limit: skip=3, fetch=1\
\n Inner Join: Using test.a = test1.a\
\n Limit: skip=2, fetch=1\
\n TableScan: test\
\n TableScan: test1";
- assert_optimized_plan_eq(&plan, expected);
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
- fn remove_zero_offset() {
- let table_scan = test_table_scan().unwrap();
+ fn remove_zero_offset() -> Result<()> {
+ let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(vec![col("a")], vec![sum(col("b"))])
- .unwrap()
- .limit(0, None)
- .unwrap()
- .build()
- .unwrap();
+ .aggregate(vec![col("a")], vec![sum(col("b"))])?
+ .limit(0, None)?
+ .build()?;
let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
\n TableScan: test";
- assert_optimized_plan_eq(&plan, expected);
+ assert_optimized_plan_eq(&plan, expected)
}
}