You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/29 15:27:49 UTC

[arrow-datafusion] branch master updated: reimplement eliminate_limit to remove `global-state`. (#4324)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 02da32e2e reimplement eliminate_limit to remove `global-state`. (#4324)
02da32e2e is described below

commit 02da32e2ef7bc40a89f024a3c6f1a9540412f636
Author: jakevin <ja...@gmail.com>
AuthorDate: Tue Nov 29 23:27:41 2022 +0800

    reimplement eliminate_limit to remove `global-state`. (#4324)
---
 datafusion/optimizer/src/eliminate_limit.rs | 307 ++++++++++------------------
 1 file changed, 108 insertions(+), 199 deletions(-)

diff --git a/datafusion/optimizer/src/eliminate_limit.rs b/datafusion/optimizer/src/eliminate_limit.rs
index 2deea4eb9..1cf8b6ad2 100644
--- a/datafusion/optimizer/src/eliminate_limit.rs
+++ b/datafusion/optimizer/src/eliminate_limit.rs
@@ -20,17 +20,12 @@
 //! on a plan with an empty relation.
 //! This rule also removes OFFSET 0 from the [LogicalPlan]
 //! This saves time in planning and executing the query.
-use crate::{OptimizerConfig, OptimizerRule};
+use crate::{utils, OptimizerConfig, OptimizerRule};
 use datafusion_common::Result;
-use datafusion_expr::{
-    logical_plan::{EmptyRelation, Limit, LogicalPlan},
-    utils::from_plan,
-};
+use datafusion_expr::logical_plan::{EmptyRelation, LogicalPlan};
 
-/// Optimization rule that replaces LIMIT 0 or
-/// LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch
-/// with an [LogicalPlan::EmptyRelation].
-/// This rule also removes OFFSET 0 from the [LogicalPlan]
+/// Optimization rule that eliminates LIMIT 0 or a useless LIMIT(skip:0, fetch:None).
+/// It can cooperate with `propagate_empty_relation` and `limit_push_down`.
 #[derive(Default)]
 pub struct EliminateLimit;
 
@@ -41,104 +36,31 @@ impl EliminateLimit {
     }
 }
 
-/// Ancestor indicates the current ancestor in the LogicalPlan tree
-/// when traversing down related to "eliminate limit".
-enum Ancestor {
-    /// Limit
-    FromLimit { skip: usize },
-    /// Other nodes that don't affect the adjustment of "Limit"
-    NotRelevant,
-}
-
-/// replaces LIMIT 0 with an [LogicalPlan::EmptyRelation]
-/// replaces LIMIT node whose ancestor LIMIT's skip is greater than or equal to current's fetch
-/// with an [LogicalPlan::EmptyRelation]
-/// removes OFFSET 0 from the [LogicalPlan]
-fn eliminate_limit(
-    _optimizer: &EliminateLimit,
-    ancestor: &Ancestor,
-    plan: &LogicalPlan,
-    _optimizer_config: &OptimizerConfig,
-) -> Result<LogicalPlan> {
-    match plan {
-        LogicalPlan::Limit(Limit {
-            skip, fetch, input, ..
-        }) => {
-            let ancestor_skip = match ancestor {
-                Ancestor::FromLimit { skip } => *skip,
-                _ => 0,
-            };
-            // If ancestor's skip is equal or greater than current's fetch,
-            // replaces with an [LogicalPlan::EmptyRelation].
-            // For such query, the inner query(select * from xxx limit 5) should be optimized as an EmptyRelation:
-            // select * from (select * from xxx limit 5) a limit 2 offset 5;
-            match fetch {
+impl OptimizerRule for EliminateLimit {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        if let LogicalPlan::Limit(limit) = plan {
+            match limit.fetch {
                 Some(fetch) => {
-                    if *fetch == 0 || ancestor_skip >= *fetch {
+                    if fetch == 0 {
                         return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
                             produce_one_row: false,
-                            schema: input.schema().clone(),
+                            schema: limit.input.schema().clone(),
                         }));
                     }
                 }
                 None => {
-                    if *skip == 0 {
-                        // If there is no LIMIT and OFFSET is zero, LIMIT/OFFSET can be removed
-                        return Ok(input.as_ref().clone());
+                    if limit.skip == 0 {
+                        let input = &*limit.input;
+                        return utils::optimize_children(self, input, optimizer_config);
                     }
                 }
             }
-
-            let expr = plan.expressions();
-
-            // apply the optimization to all inputs of the plan
-            let inputs = plan.inputs();
-            let new_inputs = inputs
-                .iter()
-                .map(|plan| {
-                    eliminate_limit(
-                        _optimizer,
-                        &Ancestor::FromLimit { skip: *skip },
-                        plan,
-                        _optimizer_config,
-                    )
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            from_plan(plan, &expr, &new_inputs)
-        }
-        // Rest: recurse and find possible LIMIT 0/Multi LIMIT OFFSET nodes
-        _ => {
-            // For those plans(projection/sort/..) which do not affect the output rows of sub-plans, we still use ancestor;
-            // otherwise, use NotRelevant instead.
-            let ancestor = match plan {
-                LogicalPlan::Projection { .. } | LogicalPlan::Sort { .. } => ancestor,
-                _ => &Ancestor::NotRelevant,
-            };
-
-            let expr = plan.expressions();
-
-            // apply the optimization to all inputs of the plan
-            let inputs = plan.inputs();
-            let new_inputs = inputs
-                .iter()
-                .map(|plan| {
-                    eliminate_limit(_optimizer, ancestor, plan, _optimizer_config)
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            from_plan(plan, &expr, &new_inputs)
         }
-    }
-}
-
-impl OptimizerRule for EliminateLimit {
-    fn optimize(
-        &self,
-        plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
-    ) -> Result<LogicalPlan> {
-        eliminate_limit(self, &Ancestor::NotRelevant, plan, optimizer_config)
+        utils::optimize_children(self, plan, optimizer_config)
     }
 
     fn name(&self) -> &str {
@@ -149,6 +71,7 @@ impl OptimizerRule for EliminateLimit {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::limit_push_down::LimitPushDown;
     use crate::test::*;
     use datafusion_common::Column;
     use datafusion_expr::{
@@ -157,180 +80,166 @@ mod tests {
         sum,
     };
 
-    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
-        let rule = EliminateLimit::new();
-        let optimized_plan = rule
+    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
+        let optimized_plan = EliminateLimit::new()
             .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
         assert_eq!(plan.schema(), optimized_plan.schema());
+        Ok(())
+    }
+
+    fn assert_optimized_plan_eq_with_pushdown(
+        plan: &LogicalPlan,
+        expected: &str,
+    ) -> Result<()> {
+        let optimized_plan = LimitPushDown::new()
+            .optimize(plan, &mut OptimizerConfig::new())
+            .expect("failed to optimize plan");
+        let optimized_plan = EliminateLimit::new()
+            .optimize(&optimized_plan, &mut OptimizerConfig::new())
+            .expect("failed to optimize plan");
+        let formatted_plan = format!("{:?}", optimized_plan);
+        assert_eq!(formatted_plan, expected);
+        assert_eq!(plan.schema(), optimized_plan.schema());
+        Ok(())
     }
 
     #[test]
-    fn limit_0_root() {
+    fn limit_0_root() -> Result<()> {
         let table_scan = test_table_scan().unwrap();
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, Some(0))
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, Some(0))?
+            .build()?;
         // No aggregate / scan / limit
         let expected = "EmptyRelation";
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
-    fn limit_0_nested() {
-        let table_scan = test_table_scan().unwrap();
+    fn limit_0_nested() -> Result<()> {
+        let table_scan = test_table_scan()?;
         let plan1 = LogicalPlanBuilder::from(table_scan.clone())
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .build()?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, Some(0))
-            .unwrap()
-            .union(plan1)
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, Some(0))?
+            .union(plan1)?
+            .build()?;
 
         // Left side is removed
         let expected = "Union\
             \n  EmptyRelation\
             \n  Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
             \n    TableScan: test";
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
-    fn limit_fetch_with_ancestor_limit_skip() {
-        let table_scan = test_table_scan().unwrap();
+    fn limit_fetch_with_ancestor_limit_skip() -> Result<()> {
+        let table_scan = test_table_scan()?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, Some(2))
-            .unwrap()
-            .limit(2, None)
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, Some(2))?
+            .limit(2, None)?
+            .build()?;
 
         // No aggregate / scan / limit
-        let expected = "Limit: skip=2, fetch=None\
-            \n  EmptyRelation";
-        assert_optimized_plan_eq(&plan, expected);
+        let expected = "EmptyRelation";
+        assert_optimized_plan_eq_with_pushdown(&plan, expected)
     }
 
     #[test]
-    fn multi_limit_offset_sort_eliminate() {
-        let table_scan = test_table_scan().unwrap();
+    fn multi_limit_offset_sort_eliminate() -> Result<()> {
+        let table_scan = test_table_scan()?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, Some(2))
-            .unwrap()
-            .sort(vec![col("a")])
-            .unwrap()
-            .limit(2, Some(1))
-            .unwrap()
-            .build()
-            .unwrap();
-
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, Some(2))?
+            .sort(vec![col("a")])?
+            .limit(2, Some(1))?
+            .build()?;
+
+        // The global state is gone, so the parent's <skip, fetch> is not recorded;
+        // the bottom Limit knows nothing about its parent and cannot be eliminated.
         let expected = "Limit: skip=2, fetch=1\
-            \n  Sort: test.a\
-            \n    EmptyRelation";
-        assert_optimized_plan_eq(&plan, expected);
+        \n  Sort: test.a, fetch=3\
+        \n    Limit: skip=0, fetch=2\
+        \n      Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
+        \n        TableScan: test";
+        assert_optimized_plan_eq_with_pushdown(&plan, expected)
     }
 
     #[test]
-    fn limit_fetch_with_ancestor_limit_fetch() {
-        let table_scan = test_table_scan().unwrap();
+    fn limit_fetch_with_ancestor_limit_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, Some(2))
-            .unwrap()
-            .sort(vec![col("a")])
-            .unwrap()
-            .limit(0, Some(1))
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, Some(2))?
+            .sort(vec![col("a")])?
+            .limit(0, Some(1))?
+            .build()?;
 
         let expected = "Limit: skip=0, fetch=1\
             \n  Sort: test.a\
             \n    Limit: skip=0, fetch=2\
             \n      Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
             \n        TableScan: test";
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
-    fn limit_with_ancestor_limit() {
+    fn limit_with_ancestor_limit() -> Result<()> {
         let table_scan = test_table_scan().unwrap();
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(2, Some(1))
-            .unwrap()
-            .sort(vec![col("a")])
-            .unwrap()
-            .limit(3, Some(1))
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(2, Some(1))?
+            .sort(vec![col("a")])?
+            .limit(3, Some(1))?
+            .build()?;
 
         let expected = "Limit: skip=3, fetch=1\
-            \n  Sort: test.a\
-            \n    EmptyRelation";
-        assert_optimized_plan_eq(&plan, expected);
+        \n  Sort: test.a\
+        \n    Limit: skip=2, fetch=1\
+        \n      Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
+        \n        TableScan: test";
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
-    fn limit_join_with_ancestor_limit() {
-        let table_scan = test_table_scan().unwrap();
-        let table_scan_inner = test_table_scan_with_name("test1").unwrap();
+    fn limit_join_with_ancestor_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let table_scan_inner = test_table_scan_with_name("test1")?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .limit(2, Some(1))
-            .unwrap()
+            .limit(2, Some(1))?
             .join_using(
                 &table_scan_inner,
                 JoinType::Inner,
                 vec![Column::from_name("a".to_string())],
-            )
-            .unwrap()
-            .limit(3, Some(1))
-            .unwrap()
-            .build()
-            .unwrap();
+            )?
+            .limit(3, Some(1))?
+            .build()?;
 
         let expected = "Limit: skip=3, fetch=1\
             \n  Inner Join: Using test.a = test1.a\
             \n    Limit: skip=2, fetch=1\
             \n      TableScan: test\
             \n    TableScan: test1";
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
-    fn remove_zero_offset() {
-        let table_scan = test_table_scan().unwrap();
+    fn remove_zero_offset() -> Result<()> {
+        let table_scan = test_table_scan()?;
         let plan = LogicalPlanBuilder::from(table_scan)
-            .aggregate(vec![col("a")], vec![sum(col("b"))])
-            .unwrap()
-            .limit(0, None)
-            .unwrap()
-            .build()
-            .unwrap();
+            .aggregate(vec![col("a")], vec![sum(col("b"))])?
+            .limit(0, None)?
+            .build()?;
 
         let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\
             \n  TableScan: test";
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
     }
 }