You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/06/01 20:14:04 UTC

[arrow-datafusion] branch master updated: fix OFFSET without LIMIT (#2638)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 45a975d56 fix OFFSET without LIMIT (#2638)
45a975d56 is described below

commit 45a975d56c3fdced0ad9c5a7c7e2eee4fb66da81
Author: Hu Ming <hu...@gmail.com>
AuthorDate: Thu Jun 2 04:13:59 2022 +0800

    fix OFFSET without LIMIT (#2638)
---
 datafusion/core/src/optimizer/limit_push_down.rs | 288 ++++++++++++++++++++---
 datafusion/sql/src/planner.rs                    |  22 +-
 2 files changed, 261 insertions(+), 49 deletions(-)

diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 990182c38..41fb7cc5f 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -28,7 +28,7 @@ use datafusion_expr::{
 use std::sync::Arc;
 
 /// Optimization rule that tries pushes down LIMIT n
-/// where applicable to reduce the amount of scanned / processed data
+/// where applicable to reduce the amount of scanned / processed data.
 #[derive(Default)]
 pub struct LimitPushDown {}
 
@@ -39,29 +39,67 @@ impl LimitPushDown {
     }
 }
 
+/// Ancestor indicates the current ancestor in the LogicalPlan tree
+/// when traversing down during "limit push down".
+enum Ancestor {
+    /// Limit
+    FromLimit,
+    /// Offset
+    FromOffset,
+    /// Other nodes that don't affect the adjustment of "Limit"
+    NotRelevant,
+}
+
+///
+/// When doing limit push down with "offset" and "limit" during traversal,
+/// the "limit" should be adjusted.
+/// limit_push_down is a recursive function that tracks three important
+/// pieces of information to make the adjustment.
+///
+/// 1. ancestor: the kind of Ancestor.
+/// 2. ancestor_offset: ancestor's offset value
+/// 3. ancestor_limit: ancestor's limit value
+///
+/// (ancestor_offset, ancestor_limit) is updated in the following cases
+/// 1. Ancestor_Limit(n1) -> .. -> Current_Limit(n2)
+///    When the ancestor is a "Limit" and the current node is a "Limit",
+///    it is updated to (None, min(n1, n2)).
+/// 2. Ancestor_Limit(n1) -> .. -> Current_Offset(m1)
+///    it is updated to (m1, n1 + m1).
+/// 3. Ancestor_Offset(m1) -> .. -> Current_Offset(m2)
+///    it is updated to (m2, None).
+/// 4. Ancestor_Offset(m1) -> .. -> Current_Limit(n1)
+///    it is updated to (None, n1). Note that this won't happen when we
+///    generate the plan from SQL, it can happen when we build the plan
+///    using LogicalPlanBuilder.
 fn limit_push_down(
     _optimizer: &LimitPushDown,
-    upper_limit: Option<usize>,
+    ancestor: Ancestor,
+    ancestor_offset: Option<usize>,
+    ancestor_limit: Option<usize>,
     plan: &LogicalPlan,
     _optimizer_config: &OptimizerConfig,
-    is_offset: bool,
 ) -> Result<LogicalPlan> {
-    match (plan, upper_limit) {
-        (LogicalPlan::Limit(Limit { n, input }), upper_limit) => {
-            let new_limit: usize = if is_offset {
-                *n + upper_limit.unwrap_or(0)
-            } else {
-                upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n)
+    match (plan, ancestor_limit) {
+        (LogicalPlan::Limit(Limit { n, input }), ancestor_limit) => {
+            let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
+                Ancestor::FromLimit | Ancestor::FromOffset => (
+                    None,
+                    Some(ancestor_limit.map_or(*n, |x| std::cmp::min(x, *n))),
+                ),
+                Ancestor::NotRelevant => (None, Some(*n)),
             };
+
             Ok(LogicalPlan::Limit(Limit {
-                n: new_limit,
+                n: new_ancestor_limit.unwrap_or(*n),
                 // push down limit to plan (minimum of upper limit and current limit)
                 input: Arc::new(limit_push_down(
                     _optimizer,
-                    Some(new_limit),
+                    Ancestor::FromLimit,
+                    new_ancestor_offset,
+                    new_ancestor_limit,
                     input.as_ref(),
                     _optimizer_config,
-                    false,
                 )?),
             }))
         }
@@ -74,15 +112,15 @@ fn limit_push_down(
                 limit,
                 projected_schema,
             }),
-            Some(upper_limit),
+            Some(ancestor_limit),
         ) => Ok(LogicalPlan::TableScan(TableScan {
             table_name: table_name.clone(),
             source: source.clone(),
             projection: projection.clone(),
             filters: filters.clone(),
             limit: limit
-                .map(|x| std::cmp::min(x, upper_limit))
-                .or(Some(upper_limit)),
+                .map(|x| std::cmp::min(x, ancestor_limit))
+                .or(Some(ancestor_limit)),
             projected_schema: projected_schema.clone(),
         })),
         (
@@ -92,17 +130,18 @@ fn limit_push_down(
                 schema,
                 alias,
             }),
-            upper_limit,
+            ancestor_limit,
         ) => {
             // Push down limit directly (projection doesn't change number of rows)
             Ok(LogicalPlan::Projection(Projection {
                 expr: expr.clone(),
                 input: Arc::new(limit_push_down(
                     _optimizer,
-                    upper_limit,
+                    ancestor,
+                    ancestor_offset,
+                    ancestor_limit,
                     input.as_ref(),
                     _optimizer_config,
-                    false,
                 )?),
                 schema: schema.clone(),
                 alias: alias.clone(),
@@ -114,20 +153,21 @@ fn limit_push_down(
                 alias,
                 schema,
             }),
-            Some(upper_limit),
+            Some(ancestor_limit),
         ) => {
             // Push down limit through UNION
             let new_inputs = inputs
                 .iter()
                 .map(|x| {
                     Ok(LogicalPlan::Limit(Limit {
-                        n: upper_limit,
+                        n: ancestor_limit,
                         input: Arc::new(limit_push_down(
                             _optimizer,
-                            Some(upper_limit),
+                            Ancestor::FromLimit,
+                            None,
+                            Some(ancestor_limit),
                             x,
                             _optimizer_config,
-                            false,
                         )?),
                     }))
                 })
@@ -139,21 +179,24 @@ fn limit_push_down(
             }))
         }
         // offset 5 limit 10 then push limit 15 (5 + 10)
-        // Limit should always be Offset's input
-        (LogicalPlan::Offset(Offset { offset, input }), upper_limit) => {
-            let new_limit = if let Some(ul) = upper_limit {
-                ul + *offset
-            } else {
-                *offset
+        (LogicalPlan::Offset(Offset { offset, input }), ancestor_limit) => {
+            let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
+                Ancestor::FromLimit => {
+                    (Some(*offset), ancestor_limit.map(|x| x + *offset))
+                }
+                Ancestor::FromOffset => (Some(*offset), None),
+                Ancestor::NotRelevant => (Some(*offset), None),
             };
+
             Ok(LogicalPlan::Offset(Offset {
                 offset: *offset,
                 input: Arc::new(limit_push_down(
                     _optimizer,
-                    Some(new_limit),
+                    Ancestor::FromOffset,
+                    new_ancestor_offset,
+                    new_ancestor_limit,
                     input.as_ref(),
                     _optimizer_config,
-                    true,
                 )?),
             }))
         }
@@ -208,17 +251,19 @@ fn generate_push_down_join(
         return Ok(LogicalPlan::Join(Join {
             left: Arc::new(limit_push_down(
                 _optimizer,
+                Ancestor::FromLimit,
+                None,
                 left_limit,
                 left.as_ref(),
                 _optimizer_config,
-                true,
             )?),
             right: Arc::new(limit_push_down(
                 _optimizer,
+                Ancestor::FromLimit,
+                None,
                 right_limit,
                 right.as_ref(),
                 _optimizer_config,
-                true,
             )?),
             on: on.clone(),
             filter: filter.clone(),
@@ -246,7 +291,16 @@ fn push_down_children_limit(
     let inputs = plan.inputs();
     let new_inputs = inputs
         .iter()
-        .map(|plan| limit_push_down(_optimizer, None, plan, _optimizer_config, false))
+        .map(|plan| {
+            limit_push_down(
+                _optimizer,
+                Ancestor::NotRelevant,
+                None,
+                None,
+                plan,
+                _optimizer_config,
+            )
+        })
         .collect::<Result<Vec<_>>>()?;
 
     from_plan(plan, &expr, &new_inputs)
@@ -258,7 +312,14 @@ impl OptimizerRule for LimitPushDown {
         plan: &LogicalPlan,
         optimizer_config: &OptimizerConfig,
     ) -> Result<LogicalPlan> {
-        limit_push_down(self, None, plan, optimizer_config, false)
+        limit_push_down(
+            self,
+            Ancestor::NotRelevant,
+            None,
+            None,
+            plan,
+            optimizer_config,
+        )
     }
 
     fn name(&self) -> &str {
@@ -387,6 +448,20 @@ mod test {
         Ok(())
     }
 
+    #[test]
+    fn limit_pushdown_should_not_pushdown_limit_with_offset_only() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan).offset(10)?.build()?;
+
+        // Should not push any limit down to the table provider
+        // when the plan contains only an OFFSET (no LIMIT)
+        let expected = "Offset: 10\
+        \n  TableScan: test projection=None";
+
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
     #[test]
     fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
         let table_scan = test_table_scan()?;
@@ -420,7 +495,27 @@ mod test {
             .build()?;
 
         let expected = "Offset: 10\
-        \n  Limit: 1010\
+        \n  Limit: 1000\
+        \n    Projection: #test.a\
+        \n      TableScan: test projection=None, limit=1000";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_pushdown_with_limit_after_offset() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a")])?
+            .offset(10)?
+            .limit(1000)?
+            .build()?;
+
+        let expected = "Limit: 1000\
+        \n  Offset: 10\
         \n    Projection: #test.a\
         \n      TableScan: test projection=None, limit=1010";
 
@@ -498,7 +593,7 @@ mod test {
     }
 
     #[test]
-    fn limit_should_not_push_down_with_offset_join() -> Result<()> {
+    fn limit_offset_should_not_push_down_with_offset_join() -> Result<()> {
         let table_scan_1 = test_table_scan()?;
         let table_scan_2 = test_table_scan_with_name("test2")?;
 
@@ -515,7 +610,35 @@ mod test {
 
         // Limit pushdown Not supported in Join
         let expected = "Offset: 10\
-        \n  Limit: 1010\
+        \n  Limit: 1000\
+        \n    Inner Join: #test.a = #test2.a\
+        \n      TableScan: test projection=None\
+        \n      TableScan: test2 projection=None";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn offset_limit_should_not_push_down_with_offset_join() -> Result<()> {
+        let table_scan_1 = test_table_scan()?;
+        let table_scan_2 = test_table_scan_with_name("test2")?;
+
+        let plan = LogicalPlanBuilder::from(table_scan_1)
+            .join(
+                &LogicalPlanBuilder::from(table_scan_2).build()?,
+                JoinType::Inner,
+                (vec!["a"], vec!["a"]),
+                None,
+            )?
+            .offset(10)?
+            .limit(1000)?
+            .build()?;
+
+        // Limit pushdown Not supported in Join
+        let expected = "Limit: 1000\
+        \n  Offset: 10\
         \n    Inner Join: #test.a = #test2.a\
         \n      TableScan: test projection=None\
         \n      TableScan: test2 projection=None";
@@ -526,7 +649,7 @@ mod test {
     }
 
     #[test]
-    fn limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
+    fn limit_offset_should_not_push_down_with_offset_sub_query() -> Result<()> {
         let table_scan_1 = test_table_scan_with_name("test1")?;
         let table_scan_2 = test_table_scan_with_name("test2")?;
 
@@ -544,7 +667,38 @@ mod test {
 
         // Limit pushdown Not supported in sub_query
         let expected = "Offset: 10\
-        \n  Limit: 110\
+        \n  Limit: 100\
+        \n    Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
+        \n  Projection: #test1.a\
+        \n    TableScan: test1 projection=None)\
+        \n      Projection: #test2.a\
+        \n        TableScan: test2 projection=None";
+
+        assert_optimized_plan_eq(&outer_query, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn offset_limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
+        let table_scan_1 = test_table_scan_with_name("test1")?;
+        let table_scan_2 = test_table_scan_with_name("test2")?;
+
+        let subquery = LogicalPlanBuilder::from(table_scan_1)
+            .project(vec![col("a")])?
+            .filter(col("a").eq(col("test1.a")))?
+            .build()?;
+
+        let outer_query = LogicalPlanBuilder::from(table_scan_2)
+            .project(vec![col("a")])?
+            .filter(exists(Arc::new(subquery)))?
+            .offset(10)?
+            .limit(100)?
+            .build()?;
+
+        // Limit pushdown Not supported in sub_query
+        let expected = "Limit: 100\
+        \n  Offset: 10\
         \n    Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
         \n  Projection: #test1.a\
         \n    TableScan: test1 projection=None)\
@@ -582,6 +736,34 @@ mod test {
         Ok(())
     }
 
+    #[test]
+    fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> {
+        let table_scan_1 = test_table_scan()?;
+        let table_scan_2 = test_table_scan_with_name("test2")?;
+
+        let plan = LogicalPlanBuilder::from(table_scan_1)
+            .join(
+                &LogicalPlanBuilder::from(table_scan_2).build()?,
+                JoinType::Left,
+                (vec!["a"], vec!["a"]),
+                None,
+            )?
+            .offset(10)?
+            .limit(1000)?
+            .build()?;
+
+        // Limit pushdown with offset supported in left outer join
+        let expected = "Limit: 1000\
+        \n  Offset: 10\
+        \n    Left Join: #test.a = #test2.a\
+        \n      TableScan: test projection=None, limit=1010\
+        \n      TableScan: test2 projection=None";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
     #[test]
     fn limit_should_push_down_right_outer_join() -> Result<()> {
         let table_scan_1 = test_table_scan()?;
@@ -607,4 +789,32 @@ mod test {
 
         Ok(())
     }
+
+    #[test]
+    fn limit_should_push_down_right_outer_join_with_offset() -> Result<()> {
+        let table_scan_1 = test_table_scan()?;
+        let table_scan_2 = test_table_scan_with_name("test2")?;
+
+        let plan = LogicalPlanBuilder::from(table_scan_1)
+            .join(
+                &LogicalPlanBuilder::from(table_scan_2).build()?,
+                JoinType::Right,
+                (vec!["a"], vec!["a"]),
+                None,
+            )?
+            .offset(10)?
+            .limit(1000)?
+            .build()?;
+
+        // Limit pushdown with offset supported in right outer join
+        let expected = "Limit: 1000\
+        \n  Offset: 10\
+        \n    Right Join: #test.a = #test2.a\
+        \n      TableScan: test projection=None\
+        \n      TableScan: test2 projection=None, limit=1010";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index b21550567..238cf7dbe 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -297,10 +297,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         let plan = self.order_by(plan, query.order_by)?;
 
-        let plan: LogicalPlan = self.limit(plan, query.limit)?;
-
-        //make limit as offset's input will enable limit push down simply
-        self.offset(plan, query.offset)
+        // Offset is the parent of Limit.
+        // If both OFFSET and LIMIT appear,
+        // then OFFSET rows are skipped before starting to count the LIMIT rows that are returned.
+        // see https://www.postgresql.org/docs/current/queries-limit.html
+        let plan = self.offset(plan, query.offset)?;
+        self.limit(plan, query.limit)
     }
 
     fn set_expr_to_plan(
@@ -4878,8 +4880,8 @@ mod tests {
     #[test]
     fn test_zero_offset_with_limit() {
         let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
-        let expected = "Offset: 0\
-                                    \n  Limit: 5\
+        let expected = "Limit: 5\
+                                    \n  Offset: 0\
                                     \n    Projection: #person.id\
                                     \n      Filter: #person.id > Int64(100)\
                                     \n        TableScan: person projection=None";
@@ -4903,8 +4905,8 @@ mod tests {
     #[test]
     fn test_offset_after_limit() {
         let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;";
-        let expected = "Offset: 3\
-        \n  Limit: 5\
+        let expected = "Limit: 5\
+        \n  Offset: 3\
         \n    Projection: #person.id\
         \n      Filter: #person.id > Int64(100)\
         \n        TableScan: person projection=None";
@@ -4914,8 +4916,8 @@ mod tests {
     #[test]
     fn test_offset_before_limit() {
         let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;";
-        let expected = "Offset: 3\
-        \n  Limit: 5\
+        let expected = "Limit: 5\
+        \n  Offset: 3\
         \n    Projection: #person.id\
         \n      Filter: #person.id > Int64(100)\
         \n        TableScan: person projection=None";