You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/12/02 03:12:51 UTC

[arrow-datafusion] branch master updated: Fix `push down limit` regression when meet SubqueryAlias (#4425)

This is an automated email from the ASF dual-hosted git repository.

liukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aa645f8d Fix `push down limit` regression when meet SubqueryAlias (#4425)
1aa645f8d is described below

commit 1aa645f8d06a751c6e3dcc86b7e1f525bfcc1d9f
Author: jakevin <ja...@gmail.com>
AuthorDate: Fri Dec 2 11:12:46 2022 +0800

    Fix `push down limit` regression when meet SubqueryAlias (#4425)
    
    * support `SubqueryAlias` in `PushdownLimit`
    
    * fix regression when push_down_limit meets a SubqueryAlias
---
 datafusion/core/src/datasource/view.rs      |  3 +-
 datafusion/optimizer/src/limit_push_down.rs | 47 +++++++++++++++++------------
 2 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index e5c27f894..5939c25e6 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -474,8 +474,7 @@ mod tests {
         let formatted = arrow::util::pretty::pretty_format_batches(&plan)
             .unwrap()
             .to_string();
-        // TODO: limit_push_down support SubqueryAlias
-        assert!(formatted.contains("GlobalLimitExec: skip=0, fetch=10"));
+        assert!(formatted.contains("ParquetExec: limit=Some(10)"));
         Ok(())
     }
 
diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs
index 8dcacb03d..e7104daff 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -20,9 +20,7 @@
 use crate::{utils, OptimizerConfig, OptimizerRule};
 use datafusion_common::Result;
 use datafusion_expr::{
-    logical_plan::{
-        Join, JoinType, Limit, LogicalPlan, Projection, Sort, TableScan, Union,
-    },
+    logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union},
     CrossJoin,
 };
 use std::sync::Arc;
@@ -123,7 +121,8 @@ impl OptimizerRule for LimitPushDown {
         };
         let skip = limit.skip;
 
-        let plan = match &*limit.input {
+        let child_plan = &*limit.input;
+        let plan = match child_plan {
             LogicalPlan::TableScan(scan) => {
                 let limit = if fetch != 0 { fetch + skip } else { 0 };
                 let new_input = LogicalPlan::TableScan(TableScan {
@@ -136,21 +135,6 @@ impl OptimizerRule for LimitPushDown {
                 });
                 plan.with_new_inputs(&[new_input])?
             }
-
-            LogicalPlan::Projection(projection) => {
-                let new_input = LogicalPlan::Limit(Limit {
-                    skip,
-                    fetch: Some(fetch),
-                    input: Arc::new((*projection.input).clone()),
-                });
-                // Push down limit directly (projection doesn't change number of rows)
-                LogicalPlan::Projection(Projection::try_new_with_schema(
-                    projection.expr.clone(),
-                    Arc::new(new_input),
-                    projection.schema.clone(),
-                )?)
-            }
-
             LogicalPlan::Union(union) => {
                 let new_inputs = union
                     .inputs
@@ -230,6 +214,14 @@ impl OptimizerRule for LimitPushDown {
                 });
                 plan.with_new_inputs(&[new_sort])?
             }
+            LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_) => {
+                // commute
+                let new_limit =
+                    plan.with_new_inputs(&[
+                        (*(child_plan.inputs().get(0).unwrap())).clone()
+                    ])?;
+                child_plan.with_new_inputs(&[new_limit])?
+            }
             _ => plan.clone(),
         };
 
@@ -931,4 +923,21 @@ mod test {
 
         assert_optimized_plan_eq(&plan, expected)
     }
+
+    #[test]
+    fn push_down_subquery_alias() -> Result<()> {
+        let scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(scan)
+            .alias("a")?
+            .limit(0, Some(1))?
+            .limit(1000, None)?
+            .build()?;
+
+        let expected = "SubqueryAlias: a\
+        \n  Limit: skip=1000, fetch=0\
+        \n    TableScan: test, fetch=0";
+
+        assert_optimized_plan_eq(&plan, expected)
+    }
 }