You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/23 18:39:58 UTC
[arrow-datafusion] branch master updated: merge conflict (#2596)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 43575e1e4 merge conflict (#2596)
43575e1e4 is described below
commit 43575e1e4bdeb23320ed7a1119a4a6a5e192254b
Author: Yang Jiang <37...@users.noreply.github.com>
AuthorDate: Tue May 24 02:39:53 2022 +0800
merge conflict (#2596)
---
datafusion/core/src/optimizer/limit_push_down.rs | 157 ++++++++++++++++++++---
1 file changed, 141 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 087ac53c8..31748d326 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -23,7 +23,8 @@ use crate::logical_plan::plan::Projection;
use crate::logical_plan::{Limit, TableScan};
use crate::logical_plan::{LogicalPlan, Union};
use crate::optimizer::optimizer::OptimizerRule;
-use datafusion_expr::logical_plan::Offset;
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::{Join, JoinType, Offset};
use datafusion_expr::utils::from_plan;
use std::sync::Arc;
@@ -157,25 +158,99 @@ fn limit_push_down(
)?),
}))
}
+ (LogicalPlan::Join(Join { join_type, .. }), upper_limit) => match join_type {
+ JoinType::Left => {
+ // If LeftOuter join push limit to left
+ generate_push_down_join(
+ _optimizer,
+ _execution_props,
+ plan,
+ upper_limit,
+ None,
+ )
+ }
+ JoinType::Right =>
+ // If RightOuter join push limit to right
+ {
+ generate_push_down_join(
+ _optimizer,
+ _execution_props,
+ plan,
+ None,
+ upper_limit,
+ )
+ }
+ _ => generate_push_down_join(_optimizer, _execution_props, plan, None, None),
+ },
// For other nodes we can't push down the limit
// But try to recurse and find other limit nodes to push down
- _ => {
- let expr = plan.expressions();
-
- // apply the optimization to all inputs of the plan
- let inputs = plan.inputs();
- let new_inputs = inputs
- .iter()
- .map(|plan| {
- limit_push_down(_optimizer, None, plan, _execution_props, false)
- })
- .collect::<Result<Vec<_>>>()?;
+ _ => push_down_children_limit(_optimizer, _execution_props, plan),
+ }
+}
- from_plan(plan, &expr, &new_inputs)
- }
+fn generate_push_down_join(
+ _optimizer: &LimitPushDown,
+ _execution_props: &ExecutionProps,
+ join: &LogicalPlan,
+ left_limit: Option<usize>,
+ right_limit: Option<usize>,
+) -> Result<LogicalPlan> {
+ if let LogicalPlan::Join(Join {
+ left,
+ right,
+ on,
+ join_type,
+ join_constraint,
+ schema,
+ null_equals_null,
+ }) = join
+ {
+ return Ok(LogicalPlan::Join(Join {
+ left: Arc::new(limit_push_down(
+ _optimizer,
+ left_limit,
+ left.as_ref(),
+ _execution_props,
+ true,
+ )?),
+ right: Arc::new(limit_push_down(
+ _optimizer,
+ right_limit,
+ right.as_ref(),
+ _execution_props,
+ true,
+ )?),
+ on: on.clone(),
+ join_type: *join_type,
+ join_constraint: *join_constraint,
+ schema: schema.clone(),
+ null_equals_null: *null_equals_null,
+ }));
+ } else {
+ Err(DataFusionError::Internal(format!(
+ "{:?} must be join type",
+ join
+ )))
}
}
+fn push_down_children_limit(
+ _optimizer: &LimitPushDown,
+ _execution_props: &ExecutionProps,
+ plan: &LogicalPlan,
+) -> Result<LogicalPlan> {
+ let expr = plan.expressions();
+
+ // apply the optimization to all inputs of the plan
+ let inputs = plan.inputs();
+ let new_inputs = inputs
+ .iter()
+ .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props, false))
+ .collect::<Result<Vec<_>>>()?;
+
+ from_plan(plan, &expr, &new_inputs)
+}
+
impl OptimizerRule for LimitPushDown {
fn optimize(
&self,
@@ -429,7 +504,7 @@ mod test {
let plan = LogicalPlanBuilder::from(table_scan_1)
.join(
&LogicalPlanBuilder::from(table_scan_2).build()?,
- JoinType::Left,
+ JoinType::Inner,
(vec!["a"], vec!["a"]),
)?
.limit(1000)?
@@ -439,7 +514,7 @@ mod test {
// Limit pushdown Not supported in Join
let expected = "Offset: 10\
\n Limit: 1010\
- \n Left Join: #test.a = #test2.a\
+ \n Inner Join: #test.a = #test2.a\
\n TableScan: test projection=None\
\n TableScan: test2 projection=None";
@@ -478,4 +553,54 @@ mod test {
Ok(())
}
+
+ #[test]
+ fn limit_should_push_down_left_outer_join() -> Result<()> {
+ let table_scan_1 = test_table_scan()?;
+ let table_scan_2 = test_table_scan_with_name("test2")?;
+
+ let plan = LogicalPlanBuilder::from(table_scan_1)
+ .join(
+ &LogicalPlanBuilder::from(table_scan_2).build()?,
+ JoinType::Left,
+ (vec!["a"], vec!["a"]),
+ )?
+ .limit(1000)?
+ .build()?;
+
+ // Limit should be pushed down to the left input of a Left join
+ let expected = "Limit: 1000\
+ \n Left Join: #test.a = #test2.a\
+ \n TableScan: test projection=None, limit=1000\
+ \n TableScan: test2 projection=None";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn limit_should_push_down_right_outer_join() -> Result<()> {
+ let table_scan_1 = test_table_scan()?;
+ let table_scan_2 = test_table_scan_with_name("test2")?;
+
+ let plan = LogicalPlanBuilder::from(table_scan_1)
+ .join(
+ &LogicalPlanBuilder::from(table_scan_2).build()?,
+ JoinType::Right,
+ (vec!["a"], vec!["a"]),
+ )?
+ .limit(1000)?
+ .build()?;
+
+ // Limit should be pushed down to the right input of a Right join
+ let expected = "Limit: 1000\
+ \n Right Join: #test.a = #test2.a\
+ \n TableScan: test projection=None\
+ \n TableScan: test2 projection=None, limit=1000";
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
}