You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/06/01 20:14:04 UTC
[arrow-datafusion] branch master updated: fix OFFSET without LIMIT (#2638)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 45a975d56 fix OFFSET without LIMIT (#2638)
45a975d56 is described below
commit 45a975d56c3fdced0ad9c5a7c7e2eee4fb66da81
Author: Hu Ming <hu...@gmail.com>
AuthorDate: Thu Jun 2 04:13:59 2022 +0800
fix OFFSET without LIMIT (#2638)
---
datafusion/core/src/optimizer/limit_push_down.rs | 288 ++++++++++++++++++++---
datafusion/sql/src/planner.rs | 22 +-
2 files changed, 261 insertions(+), 49 deletions(-)
diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 990182c38..41fb7cc5f 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -28,7 +28,7 @@ use datafusion_expr::{
use std::sync::Arc;
/// Optimization rule that tries pushes down LIMIT n
-/// where applicable to reduce the amount of scanned / processed data
+/// where applicable to reduce the amount of scanned / processed data.
#[derive(Default)]
pub struct LimitPushDown {}
@@ -39,29 +39,67 @@ impl LimitPushDown {
}
}
+/// The kind of the nearest Limit/Offset ancestor seen while traversing down
+/// the LogicalPlan tree during "limit push down".
+enum Ancestor {
+ /// The nearest relevant ancestor is a `Limit` node.
+ FromLimit,
+ /// The nearest relevant ancestor is an `Offset` node.
+ FromOffset,
+ /// No enclosing Limit/Offset is being pushed down (e.g. at the plan root).
+ NotRelevant,
+}
+
+///
+/// When doing limit push down with "offset" and "limit" during traversal,
+/// the "limit" should be adjusted.
+/// limit_push_down is a recursive function that tracks three pieces of information
+/// to make the adjustment:
+///
+/// 1. ancestor: the kind of Ancestor.
+/// 2. ancestor_offset: ancestor's offset value
+/// 3. ancestor_limit: ancestor's limit value
+///
+/// (ancestor_offset, ancestor_limit) is updated in the following cases
+/// 1. Ancestor_Limit(n1) -> .. -> Current_Limit(n2)
+/// When the ancestor is a "Limit" and the current node is a "Limit",
+/// it is updated to (None, min(n1, n2)).
+/// 2. Ancestor_Limit(n1) -> .. -> Current_Offset(m1)
+/// it is updated to (m1, n1 + m1).
+/// 3. Ancestor_Offset(m1) -> .. -> Current_Offset(m2)
+/// it is updated to (m2, None).
+/// 4. Ancestor_Offset(m1) -> .. -> Current_Limit(n1)
+/// it is updated to (None, n1), or to (None, min(n, n1)) if a limit n is
+/// carried down through the Offset. A plain SQL `LIMIT .. OFFSET ..` never
+/// produces this shape, but it can be built using LogicalPlanBuilder.
fn limit_push_down(
_optimizer: &LimitPushDown,
- upper_limit: Option<usize>,
+ ancestor: Ancestor,
+ ancestor_offset: Option<usize>,
+ ancestor_limit: Option<usize>,
plan: &LogicalPlan,
_optimizer_config: &OptimizerConfig,
- is_offset: bool,
) -> Result<LogicalPlan> {
- match (plan, upper_limit) {
- (LogicalPlan::Limit(Limit { n, input }), upper_limit) => {
- let new_limit: usize = if is_offset {
- *n + upper_limit.unwrap_or(0)
- } else {
- upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n)
+ match (plan, ancestor_limit) {
+ (LogicalPlan::Limit(Limit { n, input }), ancestor_limit) => {
+ let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
+ Ancestor::FromLimit | Ancestor::FromOffset => (
+ None,
+ Some(ancestor_limit.map_or(*n, |x| std::cmp::min(x, *n))),
+ ),
+ Ancestor::NotRelevant => (None, Some(*n)),
};
+
Ok(LogicalPlan::Limit(Limit {
- n: new_limit,
+ n: new_ancestor_limit.unwrap_or(*n),
// push down limit to plan (minimum of upper limit and current limit)
input: Arc::new(limit_push_down(
_optimizer,
- Some(new_limit),
+ Ancestor::FromLimit,
+ new_ancestor_offset,
+ new_ancestor_limit,
input.as_ref(),
_optimizer_config,
- false,
)?),
}))
}
@@ -74,15 +112,15 @@ fn limit_push_down(
limit,
projected_schema,
}),
- Some(upper_limit),
+ Some(ancestor_limit),
) => Ok(LogicalPlan::TableScan(TableScan {
table_name: table_name.clone(),
source: source.clone(),
projection: projection.clone(),
filters: filters.clone(),
limit: limit
- .map(|x| std::cmp::min(x, upper_limit))
- .or(Some(upper_limit)),
+ .map(|x| std::cmp::min(x, ancestor_limit))
+ .or(Some(ancestor_limit)),
projected_schema: projected_schema.clone(),
})),
(
@@ -92,17 +130,18 @@ fn limit_push_down(
schema,
alias,
}),
- upper_limit,
+ ancestor_limit,
) => {
// Push down limit directly (projection doesn't change number of rows)
Ok(LogicalPlan::Projection(Projection {
expr: expr.clone(),
input: Arc::new(limit_push_down(
_optimizer,
- upper_limit,
+ ancestor,
+ ancestor_offset,
+ ancestor_limit,
input.as_ref(),
_optimizer_config,
- false,
)?),
schema: schema.clone(),
alias: alias.clone(),
@@ -114,20 +153,21 @@ fn limit_push_down(
alias,
schema,
}),
- Some(upper_limit),
+ Some(ancestor_limit),
) => {
// Push down limit through UNION
let new_inputs = inputs
.iter()
.map(|x| {
Ok(LogicalPlan::Limit(Limit {
- n: upper_limit,
+ n: ancestor_limit,
input: Arc::new(limit_push_down(
_optimizer,
- Some(upper_limit),
+ Ancestor::FromLimit,
+ None,
+ Some(ancestor_limit),
x,
_optimizer_config,
- false,
)?),
}))
})
@@ -139,21 +179,24 @@ fn limit_push_down(
}))
}
// offset 5 limit 10 then push limit 15 (5 + 10)
- // Limit should always be Offset's input
- (LogicalPlan::Offset(Offset { offset, input }), upper_limit) => {
- let new_limit = if let Some(ul) = upper_limit {
- ul + *offset
- } else {
- *offset
+ (LogicalPlan::Offset(Offset { offset, input }), ancestor_limit) => {
+ let (new_ancestor_offset, new_ancestor_limit) = match ancestor {
+ Ancestor::FromLimit => {
+ (Some(*offset), ancestor_limit.map(|x| x + *offset))
+ }
+ Ancestor::FromOffset => (Some(*offset), None),
+ Ancestor::NotRelevant => (Some(*offset), None),
};
+
Ok(LogicalPlan::Offset(Offset {
offset: *offset,
input: Arc::new(limit_push_down(
_optimizer,
- Some(new_limit),
+ Ancestor::FromOffset,
+ new_ancestor_offset,
+ new_ancestor_limit,
input.as_ref(),
_optimizer_config,
- true,
)?),
}))
}
@@ -208,17 +251,19 @@ fn generate_push_down_join(
return Ok(LogicalPlan::Join(Join {
left: Arc::new(limit_push_down(
_optimizer,
+ Ancestor::FromLimit,
+ None,
left_limit,
left.as_ref(),
_optimizer_config,
- true,
)?),
right: Arc::new(limit_push_down(
_optimizer,
+ Ancestor::FromLimit,
+ None,
right_limit,
right.as_ref(),
_optimizer_config,
- true,
)?),
on: on.clone(),
filter: filter.clone(),
@@ -246,7 +291,16 @@ fn push_down_children_limit(
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|plan| limit_push_down(_optimizer, None, plan, _optimizer_config, false))
+ .map(|plan| {
+ limit_push_down(
+ _optimizer,
+ Ancestor::NotRelevant,
+ None,
+ None,
+ plan,
+ _optimizer_config,
+ )
+ })
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &expr, &new_inputs)
@@ -258,7 +312,14 @@ impl OptimizerRule for LimitPushDown {
plan: &LogicalPlan,
optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
- limit_push_down(self, None, plan, optimizer_config, false)
+ limit_push_down(
+ self,
+ Ancestor::NotRelevant,
+ None,
+ None,
+ plan,
+ optimizer_config,
+ )
}
fn name(&self) -> &str {
@@ -387,6 +448,20 @@ mod test {
Ok(())
}
+ #[test]
+ fn limit_pushdown_should_not_pushdown_limit_with_offset_only() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan).offset(10)?.build()?;
+
+ // An OFFSET with no LIMIT must not push any limit down
+ // to the table provider (the TableScan keeps no limit).
+ let expected = "Offset: 10\
+ \n TableScan: test projection=None";
+
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
#[test]
fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
let table_scan = test_table_scan()?;
@@ -420,7 +495,27 @@ mod test {
.build()?;
let expected = "Offset: 10\
- \n Limit: 1010\
+ \n Limit: 1000\
+ \n Projection: #test.a\
+ \n TableScan: test projection=None, limit=1000";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn limit_pushdown_with_limit_after_offset() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a")])?
+ .offset(10)?
+ .limit(1000)?
+ .build()?;
+
+ let expected = "Limit: 1000\
+ \n Offset: 10\
\n Projection: #test.a\
\n TableScan: test projection=None, limit=1010";
@@ -498,7 +593,7 @@ mod test {
}
#[test]
- fn limit_should_not_push_down_with_offset_join() -> Result<()> {
+ fn limit_offset_should_not_push_down_with_offset_join() -> Result<()> {
let table_scan_1 = test_table_scan()?;
let table_scan_2 = test_table_scan_with_name("test2")?;
@@ -515,7 +610,35 @@ mod test {
// Limit pushdown Not supported in Join
let expected = "Offset: 10\
- \n Limit: 1010\
+ \n Limit: 1000\
+ \n Inner Join: #test.a = #test2.a\
+ \n TableScan: test projection=None\
+ \n TableScan: test2 projection=None";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn offset_limit_should_not_push_down_with_offset_join() -> Result<()> {
+ let table_scan_1 = test_table_scan()?;
+ let table_scan_2 = test_table_scan_with_name("test2")?;
+
+ let plan = LogicalPlanBuilder::from(table_scan_1)
+ .join(
+ &LogicalPlanBuilder::from(table_scan_2).build()?,
+ JoinType::Inner,
+ (vec!["a"], vec!["a"]),
+ None,
+ )?
+ .offset(10)?
+ .limit(1000)?
+ .build()?;
+
+ // Limit pushdown Not supported in Join
+ let expected = "Limit: 1000\
+ \n Offset: 10\
\n Inner Join: #test.a = #test2.a\
\n TableScan: test projection=None\
\n TableScan: test2 projection=None";
@@ -526,7 +649,7 @@ mod test {
}
#[test]
- fn limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
+ fn limit_offset_should_not_push_down_with_offset_sub_query() -> Result<()> {
let table_scan_1 = test_table_scan_with_name("test1")?;
let table_scan_2 = test_table_scan_with_name("test2")?;
@@ -544,7 +667,38 @@ mod test {
// Limit pushdown Not supported in sub_query
let expected = "Offset: 10\
- \n Limit: 110\
+ \n Limit: 100\
+ \n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
+ \n Projection: #test1.a\
+ \n TableScan: test1 projection=None)\
+ \n Projection: #test2.a\
+ \n TableScan: test2 projection=None";
+
+ assert_optimized_plan_eq(&outer_query, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn offset_limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
+ let table_scan_1 = test_table_scan_with_name("test1")?;
+ let table_scan_2 = test_table_scan_with_name("test2")?;
+
+ let subquery = LogicalPlanBuilder::from(table_scan_1)
+ .project(vec![col("a")])?
+ .filter(col("a").eq(col("test1.a")))?
+ .build()?;
+
+ let outer_query = LogicalPlanBuilder::from(table_scan_2)
+ .project(vec![col("a")])?
+ .filter(exists(Arc::new(subquery)))?
+ .offset(10)?
+ .limit(100)?
+ .build()?;
+
+ // Limit pushdown Not supported in sub_query
+ let expected = "Limit: 100\
+ \n Offset: 10\
\n Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
\n Projection: #test1.a\
\n TableScan: test1 projection=None)\
@@ -582,6 +736,34 @@ mod test {
Ok(())
}
+ #[test]
+ fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> {
+ let table_scan_1 = test_table_scan()?;
+ let table_scan_2 = test_table_scan_with_name("test2")?;
+
+ let plan = LogicalPlanBuilder::from(table_scan_1)
+ .join(
+ &LogicalPlanBuilder::from(table_scan_2).build()?,
+ JoinType::Left,
+ (vec!["a"], vec!["a"]),
+ None,
+ )?
+ .offset(10)?
+ .limit(1000)?
+ .build()?;
+
+ // Limit plus offset (1000 + 10) is pushed to the left input of a left outer join
+ let expected = "Limit: 1000\
+ \n Offset: 10\
+ \n Left Join: #test.a = #test2.a\
+ \n TableScan: test projection=None, limit=1010\
+ \n TableScan: test2 projection=None";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
#[test]
fn limit_should_push_down_right_outer_join() -> Result<()> {
let table_scan_1 = test_table_scan()?;
@@ -607,4 +789,32 @@ mod test {
Ok(())
}
+
+ #[test]
+ fn limit_should_push_down_right_outer_join_with_offset() -> Result<()> {
+ let table_scan_1 = test_table_scan()?;
+ let table_scan_2 = test_table_scan_with_name("test2")?;
+
+ let plan = LogicalPlanBuilder::from(table_scan_1)
+ .join(
+ &LogicalPlanBuilder::from(table_scan_2).build()?,
+ JoinType::Right,
+ (vec!["a"], vec!["a"]),
+ None,
+ )?
+ .offset(10)?
+ .limit(1000)?
+ .build()?;
+
+ // Limit plus offset (1000 + 10) is pushed to the right input of a right outer join
+ let expected = "Limit: 1000\
+ \n Offset: 10\
+ \n Right Join: #test.a = #test2.a\
+ \n TableScan: test projection=None\
+ \n TableScan: test2 projection=None, limit=1010";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
}
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index b21550567..238cf7dbe 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -297,10 +297,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = self.order_by(plan, query.order_by)?;
- let plan: LogicalPlan = self.limit(plan, query.limit)?;
-
- //make limit as offset's input will enable limit push down simply
- self.offset(plan, query.offset)
+ // Limit is the parent of Offset.
+ // If both OFFSET and LIMIT appear,
+ // then OFFSET rows are skipped before starting to count the LIMIT rows that are returned.
+ // see https://www.postgresql.org/docs/current/queries-limit.html
+ let plan = self.offset(plan, query.offset)?;
+ self.limit(plan, query.limit)
}
fn set_expr_to_plan(
@@ -4878,8 +4880,8 @@ mod tests {
#[test]
fn test_zero_offset_with_limit() {
let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
- let expected = "Offset: 0\
- \n Limit: 5\
+ let expected = "Limit: 5\
+ \n Offset: 0\
\n Projection: #person.id\
\n Filter: #person.id > Int64(100)\
\n TableScan: person projection=None";
@@ -4903,8 +4905,8 @@ mod tests {
#[test]
fn test_offset_after_limit() {
let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;";
- let expected = "Offset: 3\
- \n Limit: 5\
+ let expected = "Limit: 5\
+ \n Offset: 3\
\n Projection: #person.id\
\n Filter: #person.id > Int64(100)\
\n TableScan: person projection=None";
@@ -4914,8 +4916,8 @@ mod tests {
#[test]
fn test_offset_before_limit() {
let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;";
- let expected = "Offset: 3\
- \n Limit: 5\
+ let expected = "Limit: 5\
+ \n Offset: 3\
\n Projection: #person.id\
\n Filter: #person.id > Int64(100)\
\n TableScan: person projection=None";