You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/23 18:39:58 UTC

[arrow-datafusion] branch master updated: merge conflict (#2596)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 43575e1e4 merge conflict (#2596)
43575e1e4 is described below

commit 43575e1e4bdeb23320ed7a1119a4a6a5e192254b
Author: Yang Jiang <37...@users.noreply.github.com>
AuthorDate: Tue May 24 02:39:53 2022 +0800

    merge conflict (#2596)
---
 datafusion/core/src/optimizer/limit_push_down.rs | 157 ++++++++++++++++++++---
 1 file changed, 141 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 087ac53c8..31748d326 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -23,7 +23,8 @@ use crate::logical_plan::plan::Projection;
 use crate::logical_plan::{Limit, TableScan};
 use crate::logical_plan::{LogicalPlan, Union};
 use crate::optimizer::optimizer::OptimizerRule;
-use datafusion_expr::logical_plan::Offset;
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::{Join, JoinType, Offset};
 use datafusion_expr::utils::from_plan;
 use std::sync::Arc;
 
@@ -157,25 +158,99 @@ fn limit_push_down(
                 )?),
             }))
         }
+        (LogicalPlan::Join(Join { join_type, .. }), upper_limit) => match join_type {
+            JoinType::Left => {
+                // For a left outer join, push the limit down to the left input
+                generate_push_down_join(
+                    _optimizer,
+                    _execution_props,
+                    plan,
+                    upper_limit,
+                    None,
+                )
+            }
+            JoinType::Right =>
+            // For a right outer join, push the limit down to the right input
+            {
+                generate_push_down_join(
+                    _optimizer,
+                    _execution_props,
+                    plan,
+                    None,
+                    upper_limit,
+                )
+            }
+            _ => generate_push_down_join(_optimizer, _execution_props, plan, None, None),
+        },
         // For other nodes we can't push down the limit
         // But try to recurse and find other limit nodes to push down
-        _ => {
-            let expr = plan.expressions();
-
-            // apply the optimization to all inputs of the plan
-            let inputs = plan.inputs();
-            let new_inputs = inputs
-                .iter()
-                .map(|plan| {
-                    limit_push_down(_optimizer, None, plan, _execution_props, false)
-                })
-                .collect::<Result<Vec<_>>>()?;
+        _ => push_down_children_limit(_optimizer, _execution_props, plan),
+    }
+}
 
-            from_plan(plan, &expr, &new_inputs)
-        }
+fn generate_push_down_join(
+    _optimizer: &LimitPushDown,
+    _execution_props: &ExecutionProps,
+    join: &LogicalPlan,
+    left_limit: Option<usize>,
+    right_limit: Option<usize>,
+) -> Result<LogicalPlan> {
+    if let LogicalPlan::Join(Join {
+        left,
+        right,
+        on,
+        join_type,
+        join_constraint,
+        schema,
+        null_equals_null,
+    }) = join
+    {
+        return Ok(LogicalPlan::Join(Join {
+            left: Arc::new(limit_push_down(
+                _optimizer,
+                left_limit,
+                left.as_ref(),
+                _execution_props,
+                true,
+            )?),
+            right: Arc::new(limit_push_down(
+                _optimizer,
+                right_limit,
+                right.as_ref(),
+                _execution_props,
+                true,
+            )?),
+            on: on.clone(),
+            join_type: *join_type,
+            join_constraint: *join_constraint,
+            schema: schema.clone(),
+            null_equals_null: *null_equals_null,
+        }));
+    } else {
+        Err(DataFusionError::Internal(format!(
+            "{:?} must be join type",
+            join
+        )))
     }
 }
 
+fn push_down_children_limit(
+    _optimizer: &LimitPushDown,
+    _execution_props: &ExecutionProps,
+    plan: &LogicalPlan,
+) -> Result<LogicalPlan> {
+    let expr = plan.expressions();
+
+    // apply the optimization to all inputs of the plan
+    let inputs = plan.inputs();
+    let new_inputs = inputs
+        .iter()
+        .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props, false))
+        .collect::<Result<Vec<_>>>()?;
+
+    from_plan(plan, &expr, &new_inputs)
+}
+
 impl OptimizerRule for LimitPushDown {
     fn optimize(
         &self,
@@ -429,7 +504,7 @@ mod test {
         let plan = LogicalPlanBuilder::from(table_scan_1)
             .join(
                 &LogicalPlanBuilder::from(table_scan_2).build()?,
-                JoinType::Left,
+                JoinType::Inner,
                 (vec!["a"], vec!["a"]),
             )?
             .limit(1000)?
@@ -439,7 +514,7 @@ mod test {
         // Limit pushdown Not supported in Join
         let expected = "Offset: 10\
         \n  Limit: 1010\
-        \n    Left Join: #test.a = #test2.a\
+        \n    Inner Join: #test.a = #test2.a\
         \n      TableScan: test projection=None\
         \n      TableScan: test2 projection=None";
 
@@ -478,4 +553,54 @@ mod test {
 
         Ok(())
     }
+
+    #[test]
+    fn limit_should_push_down_left_outer_join() -> Result<()> {
+        let table_scan_1 = test_table_scan()?;
+        let table_scan_2 = test_table_scan_with_name("test2")?;
+
+        let plan = LogicalPlanBuilder::from(table_scan_1)
+            .join(
+                &LogicalPlanBuilder::from(table_scan_2).build()?,
+                JoinType::Left,
+                (vec!["a"], vec!["a"]),
+            )?
+            .limit(1000)?
+            .build()?;
+
+        // Limit is pushed down to the left input of a left outer join
+        let expected = "Limit: 1000\
+        \n  Left Join: #test.a = #test2.a\
+        \n    TableScan: test projection=None, limit=1000\
+        \n    TableScan: test2 projection=None";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_should_push_down_right_outer_join() -> Result<()> {
+        let table_scan_1 = test_table_scan()?;
+        let table_scan_2 = test_table_scan_with_name("test2")?;
+
+        let plan = LogicalPlanBuilder::from(table_scan_1)
+            .join(
+                &LogicalPlanBuilder::from(table_scan_2).build()?,
+                JoinType::Right,
+                (vec!["a"], vec!["a"]),
+            )?
+            .limit(1000)?
+            .build()?;
+
+        // Limit is pushed down to the right input of a right outer join
+        let expected = "Limit: 1000\
+        \n  Right Join: #test.a = #test2.a\
+        \n    TableScan: test projection=None\
+        \n    TableScan: test2 projection=None, limit=1000";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
 }