You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/12/02 03:12:51 UTC
[arrow-datafusion] branch master updated: Fix `push down limit` regression when meeting SubqueryAlias (#4425)
This is an automated email from the ASF dual-hosted git repository.
liukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 1aa645f8d Fix `push down limit` regression when meeting SubqueryAlias (#4425)
1aa645f8d is described below
commit 1aa645f8d06a751c6e3dcc86b7e1f525bfcc1d9f
Author: jakevin <ja...@gmail.com>
AuthorDate: Fri Dec 2 11:12:46 2022 +0800
Fix `push down limit` regression when meeting SubqueryAlias (#4425)
* support `SubqueryAlias` in `PushdownLimit`
* fix regression where push_down_limit stopped at a subquery-alias
---
datafusion/core/src/datasource/view.rs | 3 +-
datafusion/optimizer/src/limit_push_down.rs | 47 +++++++++++++++++------------
2 files changed, 29 insertions(+), 21 deletions(-)
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index e5c27f894..5939c25e6 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -474,8 +474,7 @@ mod tests {
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
- // TODO: limit_push_down support SubqueryAlias
- assert!(formatted.contains("GlobalLimitExec: skip=0, fetch=10"));
+ assert!(formatted.contains("ParquetExec: limit=Some(10)"));
Ok(())
}
diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs
index 8dcacb03d..e7104daff 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -20,9 +20,7 @@
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::{
- logical_plan::{
- Join, JoinType, Limit, LogicalPlan, Projection, Sort, TableScan, Union,
- },
+ logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union},
CrossJoin,
};
use std::sync::Arc;
@@ -123,7 +121,8 @@ impl OptimizerRule for LimitPushDown {
};
let skip = limit.skip;
- let plan = match &*limit.input {
+ let child_plan = &*limit.input;
+ let plan = match child_plan {
LogicalPlan::TableScan(scan) => {
let limit = if fetch != 0 { fetch + skip } else { 0 };
let new_input = LogicalPlan::TableScan(TableScan {
@@ -136,21 +135,6 @@ impl OptimizerRule for LimitPushDown {
});
plan.with_new_inputs(&[new_input])?
}
-
- LogicalPlan::Projection(projection) => {
- let new_input = LogicalPlan::Limit(Limit {
- skip,
- fetch: Some(fetch),
- input: Arc::new((*projection.input).clone()),
- });
- // Push down limit directly (projection doesn't change number of rows)
- LogicalPlan::Projection(Projection::try_new_with_schema(
- projection.expr.clone(),
- Arc::new(new_input),
- projection.schema.clone(),
- )?)
- }
-
LogicalPlan::Union(union) => {
let new_inputs = union
.inputs
@@ -230,6 +214,14 @@ impl OptimizerRule for LimitPushDown {
});
plan.with_new_inputs(&[new_sort])?
}
+ LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_) => {
+ // commute
+ let new_limit =
+ plan.with_new_inputs(&[
+ (*(child_plan.inputs().get(0).unwrap())).clone()
+ ])?;
+ child_plan.with_new_inputs(&[new_limit])?
+ }
_ => plan.clone(),
};
@@ -931,4 +923,21 @@ mod test {
assert_optimized_plan_eq(&plan, expected)
}
+
+ #[test]
+ fn push_down_subquery_alias() -> Result<()> {
+ let scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(scan)
+ .alias("a")?
+ .limit(0, Some(1))?
+ .limit(1000, None)?
+ .build()?;
+
+ let expected = "SubqueryAlias: a\
+ \n Limit: skip=1000, fetch=0\
+ \n TableScan: test, fetch=0";
+
+ assert_optimized_plan_eq(&plan, expected)
+ }
}