You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/20 14:07:15 UTC
[arrow-datafusion] branch master updated: Support limit push down for offset_plan (#2566)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 753e6f84c Support limit push down for offset_plan (#2566)
753e6f84c is described below
commit 753e6f84c69a9e3acf4964b6faccfbe8ae95394b
Author: Yang Jiang <37...@users.noreply.github.com>
AuthorDate: Fri May 20 22:07:10 2022 +0800
Support limit push down for offset_plan (#2566)
* support offset push down
* change the limit and offset order
* add test for join and subquery
* fmt
* Apply suggestions from code review
Co-authored-by: Andy Grove <an...@gmail.com>
* add sql test in planner.rs
* add sql test in planner.rs
* fix clippy
Co-authored-by: Andy Grove <an...@gmail.com>
---
datafusion/core/src/optimizer/limit_push_down.rs | 210 ++++++++++++++++++++++-
datafusion/core/src/sql/planner.rs | 47 ++++-
2 files changed, 247 insertions(+), 10 deletions(-)
diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 0c68f1761..a52fd40df 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::plan::Projection;
use crate::logical_plan::{Limit, TableScan};
use crate::logical_plan::{LogicalPlan, Union};
use crate::optimizer::optimizer::OptimizerRule;
+use datafusion_expr::logical_plan::Offset;
use std::sync::Arc;
/// Optimization rule that tries pushes down LIMIT n
@@ -43,18 +44,24 @@ fn limit_push_down(
upper_limit: Option<usize>,
plan: &LogicalPlan,
_execution_props: &ExecutionProps,
+ is_offset: bool,
) -> Result<LogicalPlan> {
match (plan, upper_limit) {
(LogicalPlan::Limit(Limit { n, input }), upper_limit) => {
- let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n);
+ let new_limit: usize = if is_offset {
+ *n + upper_limit.unwrap_or(0)
+ } else {
+ upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n)
+ };
Ok(LogicalPlan::Limit(Limit {
- n: smallest,
+ n: new_limit,
// push down limit to plan (minimum of upper limit and current limit)
input: Arc::new(limit_push_down(
_optimizer,
- Some(smallest),
+ Some(new_limit),
input.as_ref(),
_execution_props,
+ false,
)?),
}))
}
@@ -95,6 +102,7 @@ fn limit_push_down(
upper_limit,
input.as_ref(),
_execution_props,
+ false,
)?),
schema: schema.clone(),
alias: alias.clone(),
@@ -119,6 +127,7 @@ fn limit_push_down(
Some(upper_limit),
x,
_execution_props,
+ false,
)?),
}))
})
@@ -129,6 +138,25 @@ fn limit_push_down(
schema: schema.clone(),
}))
}
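+ // For `OFFSET 5 LIMIT 10` the plan is Offset(5, Limit(10)), so push down limit 15 (5 + 10).
+ // Limit is expected to be Offset's direct input: the offset is forwarded as the upper limit
+ // with `is_offset = true`, and only the Limit arm adds it to `n`. NOTE(review): any other
+ // input (e.g. a bare TableScan for `OFFSET 5` with no LIMIT) is capped at `offset` rows instead.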
+ (LogicalPlan::Offset(Offset { offset, input }), upper_limit) => {
+ let new_limit = if let Some(ul) = upper_limit {
+ ul + *offset
+ } else {
+ *offset
+ };
+ Ok(LogicalPlan::Offset(Offset {
+ offset: *offset,
+ input: Arc::new(limit_push_down(
+ _optimizer,
+ Some(new_limit),
+ input.as_ref(),
+ _execution_props,
+ true,
+ )?),
+ }))
+ }
// For other nodes we can't push down the limit
// But try to recurse and find other limit nodes to push down
_ => {
@@ -138,7 +166,9 @@ fn limit_push_down(
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props))
+ .map(|plan| {
+ limit_push_down(_optimizer, None, plan, _execution_props, false)
+ })
.collect::<Result<Vec<_>>>()?;
utils::from_plan(plan, &expr, &new_inputs)
@@ -152,7 +182,7 @@ impl OptimizerRule for LimitPushDown {
plan: &LogicalPlan,
execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
- limit_push_down(self, None, plan, execution_props)
+ limit_push_down(self, None, plan, execution_props, false)
}
fn name(&self) -> &str {
@@ -167,6 +197,8 @@ mod test {
logical_plan::{col, max, LogicalPlan, LogicalPlanBuilder},
test::*,
};
+ use datafusion_expr::exists;
+ use datafusion_expr::logical_plan::JoinType;
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = LimitPushDown::new();
@@ -278,4 +310,172 @@ mod test {
Ok(())
}
+
+ #[test]
+ fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a")])?
+ .offset(10)?
+ .limit(1000)?
+ .build()?;
+
+ // Should push limit + offset (1010) down through the projection
+ // to the table provider
+ let expected = "Limit: 1000\
+ \n Offset: 10\
+ \n Projection: #test.a\
+ \n TableScan: test projection=None, limit=1010";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn limit_pushdown_with_offset_after_limit() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a")])?
+ .limit(1000)?
+ .offset(10)?
+ .build()?;
+
+ let expected = "Offset: 10\
+ \n Limit: 1010\
+ \n Projection: #test.a\
+ \n TableScan: test projection=None, limit=1010";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .offset(10)?
+ .limit(1000)?
+ .limit(10)?
+ .build()?;
+
+ // Should push down the smallest limit
+ // Towards table scan
+ // This rule doesn't replace multiple limits
+ let expected = "Limit: 10\
+ \n Limit: 10\
+ \n Offset: 10\
+ \n TableScan: test projection=None, limit=20";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(vec![col("a")], vec![max(col("b"))])?
+ .offset(10)?
+ .limit(1000)?
+ .build()?;
+
+ // Limit should *not* be pushed down through the aggregate node
+ let expected = "Limit: 1000\
+ \n Offset: 10\
+ \n Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
+ \n TableScan: test projection=None";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn limit_should_push_down_with_offset_union() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan.clone())
+ .union(LogicalPlanBuilder::from(table_scan).build()?)?
+ .offset(10)?
+ .limit(1000)?
+ .build()?;
+
+ // Limit should push down through union
+ let expected = "Limit: 1000\
+ \n Offset: 10\
+ \n Union\
+ \n Limit: 1010\
+ \n TableScan: test projection=None, limit=1010\
+ \n Limit: 1010\
+ \n TableScan: test projection=None, limit=1010";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn limit_should_not_push_down_with_offset_join() -> Result<()> {
+ let table_scan_1 = test_table_scan()?;
+ let table_scan_2 = test_table_scan_with_name("test2")?;
+
+ let plan = LogicalPlanBuilder::from(table_scan_1)
+ .join(
+ &LogicalPlanBuilder::from(table_scan_2).build()?,
+ JoinType::Left,
+ (vec!["a"], vec!["a"]),
+ )?
+ .limit(1000)?
+ .offset(10)?
+ .build()?;
+
+ // Limit pushdown is not supported through Join; only the Limit itself becomes 1010
+ let expected = "Offset: 10\
+ \n Limit: 1010\
+ \n Left Join: #test.a = #test2.a\
+ \n TableScan: test projection=None\
+ \n TableScan: test2 projection=None";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
+ let table_scan_1 = test_table_scan_with_name("test1")?;
+ let table_scan_2 = test_table_scan_with_name("test2")?;
+
+ let subquery = LogicalPlanBuilder::from(table_scan_1)
+ .project(vec![col("a")])?
+ .filter(col("a").eq(col("test1.a")))?
+ .build()?;
+
+ let outer_query = LogicalPlanBuilder::from(table_scan_2)
+ .project(vec![col("a")])?
+ .filter(exists(Arc::new(subquery)))?
+ .limit(100)?
+ .offset(10)?
+ .build()?;
+
+ // Limit pushdown is not supported through a Filter with a subquery; only the Limit itself becomes 110
+ let expected = "Offset: 10\
+ \n Limit: 110\
+ \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
+ \n Projection: #test1.a\
+ \n TableScan: test1 projection=None)\
+ \n Projection: #test2.a\
+ \n TableScan: test2 projection=None";
+
+ assert_optimized_plan_eq(&outer_query, expected);
+
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index b4efc0868..9f96097ea 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -296,9 +296,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = self.order_by(plan, query.order_by)?;
- let plan: LogicalPlan = self.offset(plan, query.offset)?;
+ let plan: LogicalPlan = self.limit(plan, query.limit)?;
- self.limit(plan, query.limit)
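+ // Make Limit the input of Offset so that LimitPushDown can push the limit down.
+ // NOTE(review): on its own, Offset(k, Limit(n)) takes n rows and then skips k. The SQL
+ // result (skip k, then take n) only appears once LimitPushDown rewrites the limit to
+ // n + k, so this plan depends on that rule running.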
+ self.offset(plan, query.offset)
}
fn set_expr_to_plan(
@@ -2646,6 +2647,9 @@ fn parse_sql_number(n: &str) -> Result<Expr> {
#[cfg(test)]
mod tests {
use crate::datasource::empty::EmptyTable;
+ use crate::execution::context::ExecutionProps;
+ use crate::optimizer::limit_push_down::LimitPushDown;
+ use crate::optimizer::optimizer::OptimizerRule;
use crate::{assert_contains, logical_plan::create_udf, sql::parser::DFParser};
use datafusion_expr::{ScalarFunctionImplementation, Volatility};
@@ -4386,6 +4390,16 @@ mod tests {
assert_eq!(format!("{:?}", plan), expected);
}
+ fn quick_test_with_limit_pushdown(sql: &str, expected: &str) {
+ let plan = logical_plan(sql).unwrap();
+ let rule = LimitPushDown::new();
+ let optimized_plan = rule
+ .optimize(&plan, &ExecutionProps::new())
+ .expect("failed to optimize plan");
+ let formatted_plan = format!("{:?}", optimized_plan);
+ assert_eq!(formatted_plan, expected);
+ }
+
struct MockContextProvider {}
impl ContextProvider for MockContextProvider {
@@ -4834,10 +4848,10 @@ mod tests {
}
#[test]
- fn test_offset_with_limit() {
+ fn test_zero_offset_with_limit() {
let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
- let expected = "Limit: 5\
- \n Offset: 0\
+ let expected = "Offset: 0\
+ \n Limit: 5\
\n Projection: #person.id\
\n Filter: #person.id > Int64(100)\
\n TableScan: person projection=None";
@@ -4858,6 +4872,29 @@ mod tests {
quick_test(sql, expected);
}
+ #[test]
+ fn test_offset_after_limit_with_limit_push() {
+ let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;";
+ let expected = "Offset: 3\
+ \n Limit: 8\
+ \n Projection: #person.id\
+ \n Filter: #person.id > Int64(100)\
+ \n TableScan: person projection=None";
+
+ quick_test_with_limit_pushdown(sql, expected);
+ }
+
+ #[test]
+ fn test_offset_before_limit_with_limit_push() {
+ let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;";
+ let expected = "Offset: 3\
+ \n Limit: 8\
+ \n Projection: #person.id\
+ \n Filter: #person.id > Int64(100)\
+ \n TableScan: person projection=None";
+ quick_test_with_limit_pushdown(sql, expected);
+ }
+
fn assert_field_not_found(err: DataFusionError, name: &str) {
match err {
DataFusionError::SchemaError { .. } => {