You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/20 14:07:15 UTC

[arrow-datafusion] branch master updated: Support limit push down for offset_plan (#2566)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 753e6f84c Support limit push down for offset_plan (#2566)
753e6f84c is described below

commit 753e6f84c69a9e3acf4964b6faccfbe8ae95394b
Author: Yang Jiang <37...@users.noreply.github.com>
AuthorDate: Fri May 20 22:07:10 2022 +0800

    Support limit push down for offset_plan (#2566)
    
    * support offset push down
    
    * change the limit and offset order
    
    * add test for join and subquery
    
    * fmt
    
    * Apply suggestions from code review
    
    Co-authored-by: Andy Grove <an...@gmail.com>
    
    * add sql test in planner.rs
    
    * add sql test in planner.rs
    
    * fix clippy
    
    Co-authored-by: Andy Grove <an...@gmail.com>
---
 datafusion/core/src/optimizer/limit_push_down.rs | 210 ++++++++++++++++++++++-
 datafusion/core/src/sql/planner.rs               |  47 ++++-
 2 files changed, 247 insertions(+), 10 deletions(-)

diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 0c68f1761..a52fd40df 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::plan::Projection;
 use crate::logical_plan::{Limit, TableScan};
 use crate::logical_plan::{LogicalPlan, Union};
 use crate::optimizer::optimizer::OptimizerRule;
+use datafusion_expr::logical_plan::Offset;
 use std::sync::Arc;
 
 /// Optimization rule that tries pushes down LIMIT n
@@ -43,18 +44,24 @@ fn limit_push_down(
     upper_limit: Option<usize>,
     plan: &LogicalPlan,
     _execution_props: &ExecutionProps,
+    is_offset: bool,
 ) -> Result<LogicalPlan> {
     match (plan, upper_limit) {
         (LogicalPlan::Limit(Limit { n, input }), upper_limit) => {
-            let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n);
+            let new_limit: usize = if is_offset {
+                *n + upper_limit.unwrap_or(0)
+            } else {
+                upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n)
+            };
             Ok(LogicalPlan::Limit(Limit {
-                n: smallest,
+                n: new_limit,
                 // push down limit to plan (minimum of upper limit and current limit)
                 input: Arc::new(limit_push_down(
                     _optimizer,
-                    Some(smallest),
+                    Some(new_limit),
                     input.as_ref(),
                     _execution_props,
+                    false,
                 )?),
             }))
         }
@@ -95,6 +102,7 @@ fn limit_push_down(
                     upper_limit,
                     input.as_ref(),
                     _execution_props,
+                    false,
                 )?),
                 schema: schema.clone(),
                 alias: alias.clone(),
@@ -119,6 +127,7 @@ fn limit_push_down(
                             Some(upper_limit),
                             x,
                             _execution_props,
+                            false,
                         )?),
                     }))
                 })
@@ -129,6 +138,25 @@ fn limit_push_down(
                 schema: schema.clone(),
             }))
         }
+        // e.g. OFFSET 5 LIMIT 10: the limit pushed down is 15 rows (5 + 10)
+        // Limit is expected to always be the input of Offset
+        (LogicalPlan::Offset(Offset { offset, input }), upper_limit) => {
+            let new_limit = if let Some(ul) = upper_limit {
+                ul + *offset
+            } else {
+                *offset
+            };
+            Ok(LogicalPlan::Offset(Offset {
+                offset: *offset,
+                input: Arc::new(limit_push_down(
+                    _optimizer,
+                    Some(new_limit),
+                    input.as_ref(),
+                    _execution_props,
+                    true,
+                )?),
+            }))
+        }
         // For other nodes we can't push down the limit
         // But try to recurse and find other limit nodes to push down
         _ => {
@@ -138,7 +166,9 @@ fn limit_push_down(
             let inputs = plan.inputs();
             let new_inputs = inputs
                 .iter()
-                .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props))
+                .map(|plan| {
+                    limit_push_down(_optimizer, None, plan, _execution_props, false)
+                })
                 .collect::<Result<Vec<_>>>()?;
 
             utils::from_plan(plan, &expr, &new_inputs)
@@ -152,7 +182,7 @@ impl OptimizerRule for LimitPushDown {
         plan: &LogicalPlan,
         execution_props: &ExecutionProps,
     ) -> Result<LogicalPlan> {
-        limit_push_down(self, None, plan, execution_props)
+        limit_push_down(self, None, plan, execution_props, false)
     }
 
     fn name(&self) -> &str {
@@ -167,6 +197,8 @@ mod test {
         logical_plan::{col, max, LogicalPlan, LogicalPlanBuilder},
         test::*,
     };
+    use datafusion_expr::exists;
+    use datafusion_expr::logical_plan::JoinType;
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = LimitPushDown::new();
@@ -278,4 +310,172 @@ mod test {
 
         Ok(())
     }
+
+    #[test]
+    fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a")])?
+            .offset(10)?
+            .limit(1000)?
+            .build()?;
+
+        // Should push the limit down to table provider
+        // When it has a select
+        let expected = "Limit: 1000\
+        \n  Offset: 10\
+        \n    Projection: #test.a\
+        \n      TableScan: test projection=None, limit=1010";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_pushdown_with_offset_after_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a")])?
+            .limit(1000)?
+            .offset(10)?
+            .build()?;
+
+        let expected = "Offset: 10\
+        \n  Limit: 1010\
+        \n    Projection: #test.a\
+        \n      TableScan: test projection=None, limit=1010";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .offset(10)?
+            .limit(1000)?
+            .limit(10)?
+            .build()?;
+
+        // Should push down the smallest limit
+        // Towards table scan
+        // This rule doesn't replace multiple limits
+        let expected = "Limit: 10\
+        \n  Limit: 10\
+        \n    Offset: 10\
+        \n      TableScan: test projection=None, limit=20";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![max(col("b"))])?
+            .offset(10)?
+            .limit(1000)?
+            .build()?;
+
+        // Limit should *not* be pushed down past an aggregate node
+        let expected = "Limit: 1000\
+        \n  Offset: 10\
+        \n    Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
+        \n      TableScan: test projection=None";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_should_push_down_with_offset_union() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .union(LogicalPlanBuilder::from(table_scan).build()?)?
+            .offset(10)?
+            .limit(1000)?
+            .build()?;
+
+        // Limit should push down through union
+        let expected = "Limit: 1000\
+        \n  Offset: 10\
+        \n    Union\
+        \n      Limit: 1010\
+        \n        TableScan: test projection=None, limit=1010\
+        \n      Limit: 1010\
+        \n        TableScan: test projection=None, limit=1010";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_should_not_push_down_with_offset_join() -> Result<()> {
+        let table_scan_1 = test_table_scan()?;
+        let table_scan_2 = test_table_scan_with_name("test2")?;
+
+        let plan = LogicalPlanBuilder::from(table_scan_1)
+            .join(
+                &LogicalPlanBuilder::from(table_scan_2).build()?,
+                JoinType::Left,
+                (vec!["a"], vec!["a"]),
+            )?
+            .limit(1000)?
+            .offset(10)?
+            .build()?;
+
+        // Limit pushdown is not supported through a join
+        let expected = "Offset: 10\
+        \n  Limit: 1010\
+        \n    Left Join: #test.a = #test2.a\
+        \n      TableScan: test projection=None\
+        \n      TableScan: test2 projection=None";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
+        let table_scan_1 = test_table_scan_with_name("test1")?;
+        let table_scan_2 = test_table_scan_with_name("test2")?;
+
+        let subquery = LogicalPlanBuilder::from(table_scan_1)
+            .project(vec![col("a")])?
+            .filter(col("a").eq(col("test1.a")))?
+            .build()?;
+
+        let outer_query = LogicalPlanBuilder::from(table_scan_2)
+            .project(vec![col("a")])?
+            .filter(exists(Arc::new(subquery)))?
+            .limit(100)?
+            .offset(10)?
+            .build()?;
+
+        // Limit pushdown is not supported through a subquery
+        let expected = "Offset: 10\
+        \n  Limit: 110\
+        \n    Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
+        \n  Projection: #test1.a\
+        \n    TableScan: test1 projection=None)\
+        \n      Projection: #test2.a\
+        \n        TableScan: test2 projection=None";
+
+        assert_optimized_plan_eq(&outer_query, expected);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index b4efc0868..9f96097ea 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -296,9 +296,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         let plan = self.order_by(plan, query.order_by)?;
 
-        let plan: LogicalPlan = self.offset(plan, query.offset)?;
+        let plan: LogicalPlan = self.limit(plan, query.limit)?;
 
-        self.limit(plan, query.limit)
+        // Making Limit the input of Offset keeps limit push down simple
+        self.offset(plan, query.offset)
     }
 
     fn set_expr_to_plan(
@@ -2646,6 +2647,9 @@ fn parse_sql_number(n: &str) -> Result<Expr> {
 #[cfg(test)]
 mod tests {
     use crate::datasource::empty::EmptyTable;
+    use crate::execution::context::ExecutionProps;
+    use crate::optimizer::limit_push_down::LimitPushDown;
+    use crate::optimizer::optimizer::OptimizerRule;
     use crate::{assert_contains, logical_plan::create_udf, sql::parser::DFParser};
     use datafusion_expr::{ScalarFunctionImplementation, Volatility};
 
@@ -4386,6 +4390,16 @@ mod tests {
         assert_eq!(format!("{:?}", plan), expected);
     }
 
+    fn quick_test_with_limit_pushdown(sql: &str, expected: &str) {
+        let plan = logical_plan(sql).unwrap();
+        let rule = LimitPushDown::new();
+        let optimized_plan = rule
+            .optimize(&plan, &ExecutionProps::new())
+            .expect("failed to optimize plan");
+        let formatted_plan = format!("{:?}", optimized_plan);
+        assert_eq!(formatted_plan, expected);
+    }
+
     struct MockContextProvider {}
 
     impl ContextProvider for MockContextProvider {
@@ -4834,10 +4848,10 @@ mod tests {
     }
 
     #[test]
-    fn test_offset_with_limit() {
+    fn test_zero_offset_with_limit() {
         let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
-        let expected = "Limit: 5\
-                                    \n  Offset: 0\
+        let expected = "Offset: 0\
+                                    \n  Limit: 5\
                                     \n    Projection: #person.id\
                                     \n      Filter: #person.id > Int64(100)\
                                     \n        TableScan: person projection=None";
@@ -4858,6 +4872,29 @@ mod tests {
         quick_test(sql, expected);
     }
 
+    #[test]
+    fn test_offset_after_limit_with_limit_push() {
+        let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;";
+        let expected = "Offset: 3\
+                                    \n  Limit: 8\
+                                    \n    Projection: #person.id\
+                                    \n      Filter: #person.id > Int64(100)\
+                                    \n        TableScan: person projection=None";
+
+        quick_test_with_limit_pushdown(sql, expected);
+    }
+
+    #[test]
+    fn test_offset_before_limit_with_limit_push() {
+        let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;";
+        let expected = "Offset: 3\
+                                    \n  Limit: 8\
+                                    \n    Projection: #person.id\
+                                    \n      Filter: #person.id > Int64(100)\
+                                    \n        TableScan: person projection=None";
+        quick_test_with_limit_pushdown(sql, expected);
+    }
+
     fn assert_field_not_found(err: DataFusionError, name: &str) {
         match err {
             DataFusionError::SchemaError { .. } => {