Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/29 13:43:00 UTC

[GitHub] [arrow-datafusion] korowa opened a new pull request, #2647: pushdown support for predicates in `ON` clause of joins

korowa opened a new pull request, #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647

   # Which issue does this PR close?
   
   Closes #2619.
   
   # Rationale for this change
   
   Adds pushdown support for predicates in the `JOIN ON` clause and simplifies join planning.
   
   # What changes are included in this PR?
   
   Join planning now places the whole join filter into the plan node, without splitting out the predicates that can be pushed down to the inputs.
   
   The `filter_push_down` rule checks the join filter expression for pushable predicates and replans them as filters on the corresponding inputs.
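
To illustrate the routing step described above, here is a minimal, self-contained sketch. It does not use DataFusion's types: `Predicate`, `Routed`, and `route` are hypothetical names invented for this example, and the two boolean flags stand in for whatever the rule decides about each join side. The real rule works on `Expr` values and plan schemas, so treat this only as an approximation of the idea.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a predicate: its text plus the columns it references.
#[derive(Debug, Clone, PartialEq)]
struct Predicate {
    text: String,
    columns: HashSet<String>,
}

impl Predicate {
    fn new(text: &str, columns: &[&str]) -> Self {
        Predicate {
            text: text.to_string(),
            columns: columns.iter().map(|c| c.to_string()).collect(),
        }
    }
}

/// Where each ON-clause predicate ends up after routing.
#[derive(Debug, Default, PartialEq)]
struct Routed {
    to_left: Vec<String>,
    to_right: Vec<String>,
    keep: Vec<String>,
}

/// A predicate goes to a side only if that side accepts pushdown and every
/// column the predicate references belongs to that side's schema.
fn route(
    predicates: &[Predicate],
    left_schema: &HashSet<String>,
    right_schema: &HashSet<String>,
    left_pushable: bool,
    right_pushable: bool,
) -> Routed {
    let mut out = Routed::default();
    for p in predicates {
        if left_pushable && p.columns.is_subset(left_schema) {
            out.to_left.push(p.text.clone());
        } else if right_pushable && p.columns.is_subset(right_schema) {
            out.to_right.push(p.text.clone());
        } else {
            // References both sides, or the side does not accept pushdown.
            out.keep.push(p.text.clone());
        }
    }
    out
}

/// Inner join of `test` and `test2` with three ON-clause predicates.
fn example() -> Routed {
    let left: HashSet<String> =
        ["test.a", "test.b", "test.c"].iter().map(|c| c.to_string()).collect();
    let right: HashSet<String> =
        ["test2.a", "test2.b", "test2.c"].iter().map(|c| c.to_string()).collect();
    let predicates = vec![
        Predicate::new("test.b > 1", &["test.b"]),
        Predicate::new("test2.c > 4", &["test2.c"]),
        Predicate::new("test.c < test2.b", &["test.c", "test2.b"]),
    ];
    route(&predicates, &left, &right, true, true)
}

fn main() {
    println!("{:?}", example());
}
```

In this sketch the single-table predicates move to their inputs, while the predicate that mentions both tables stays in the join filter.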
   
   # Are there any user-facing changes?
   
   No
   
   # Does this PR break compatibility with Ballista?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#discussion_r884369300


##########
datafusion/sql/src/planner.rs:
##########
@@ -3837,10 +3759,9 @@ mod tests {
             LEFT JOIN orders \
             ON id = customer_id AND order_id > 1 AND age < 30";
         let expected = "Projection: #person.id, #orders.order_id\
-        \n  Left Join: #person.id = #orders.customer_id Filter: #person.age < Int64(30)\
+        \n  Left Join: #person.id = #orders.customer_id Filter: #orders.order_id > Int64(1) AND #person.age < Int64(30)\
         \n    TableScan: person projection=None\
-        \n    Filter: #orders.order_id > Int64(1)\
-        \n      TableScan: orders projection=None";
+        \n    TableScan: orders projection=None";

Review Comment:
   It seems the filter can no longer be pushed down to the TableScan 👀



[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#discussion_r887261918


##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -178,13 +177,35 @@ fn lr_is_preserved(plan: &LogicalPlan) -> (bool, bool) {
     }
 }
 
+// For a given JOIN logical plan, determine whether each side of the join is preserved
+// in terms on join filtering.
+// Predicates from join filter can only be pushed to preserved join side.

Review Comment:
   👍
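
The rule quoted in the comment above (ON-clause predicates can only be pushed to a preserved join side) can be checked with a toy LEFT JOIN. This is a sketch under stated assumptions: `Person`, `Order`, `left_join`, and the sample rows are all hypothetical and unrelated to DataFusion's execution code. The predicates mirror the `ON id = customer_id AND order_id > 1 AND age < 30` query from the planner test.

```rust
/// Hypothetical rows: (id, age) on the left, (customer_id, order_id) on the right.
type Person = (u32, u32);
type Order = (u32, u32);

/// Naive LEFT JOIN: every left row is emitted at least once. It is paired with each
/// right row that matches the key and passes `on_filter`, or with `None` otherwise.
fn left_join(
    left: &[Person],
    right: &[Order],
    on_filter: impl Fn(&Person, &Order) -> bool,
) -> Vec<(Person, Option<Order>)> {
    let mut out = Vec::new();
    for l in left {
        let mut matched = false;
        for r in right {
            if l.0 == r.0 && on_filter(l, r) {
                out.push((*l, Some(*r)));
                matched = true;
            }
        }
        if !matched {
            out.push((*l, None));
        }
    }
    out
}

fn sample() -> (Vec<Person>, Vec<Order>) {
    (vec![(1, 25), (2, 40)], vec![(1, 1), (1, 2), (2, 5)])
}

/// Both extra predicates evaluated inside the join.
fn reference() -> Vec<(Person, Option<Order>)> {
    let (people, orders) = sample();
    left_join(&people, &orders, |p, o| o.1 > 1 && p.1 < 30)
}

/// `order_id > 1` applied to the right input first: same result as `reference`.
fn right_pushed() -> Vec<(Person, Option<Order>)> {
    let (people, orders) = sample();
    let orders: Vec<Order> = orders.into_iter().filter(|o| o.1 > 1).collect();
    left_join(&people, &orders, |p, _o| p.1 < 30)
}

/// `age < 30` applied to the left input first: person 2 disappears, which is wrong.
fn left_pushed() -> Vec<(Person, Option<Order>)> {
    let (people, orders) = sample();
    let people: Vec<Person> = people.into_iter().filter(|p| p.1 < 30).collect();
    left_join(&people, &orders, |_p, o| o.1 > 1)
}

fn main() {
    println!("reference:    {:?}", reference());
    println!("right pushed: {:?}", right_pushed());
    println!("left pushed:  {:?}", left_pushed());
}
```

Filtering the right input early leaves the result unchanged, while filtering the left input drops a row that a LEFT JOIN must keep with a NULL right side.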



##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -224,32 +244,67 @@ fn optimize_join(
     plan: &LogicalPlan,
     left: &LogicalPlan,
     right: &LogicalPlan,
+    on_filter: Vec<(Expr, HashSet<Column>)>,
 ) -> Result<LogicalPlan> {
+    // Get pushable predicates from current optimizer state
     let (left_preserved, right_preserved) = lr_is_preserved(plan);
-    let to_left = get_pushable_join_predicates(&state, left.schema(), left_preserved);
-    let to_right = get_pushable_join_predicates(&state, right.schema(), right_preserved);
-
+    let to_left =

Review Comment:
   I think this code is very nicely written and easy to follow 👍



##########
datafusion/expr/src/utils.rs:
##########
@@ -378,19 +378,24 @@ pub fn from_plan(
             join_type,
             join_constraint,
             on,
-            filter,
             null_equals_null,
             ..
         }) => {
             let schema =
                 build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?;
+            let filter_expr = if on.len() * 2 == expr.len() {

Review Comment:
   ```suggestion
               // Assume that the last expr, if any,
               // is the filter_expr (non equality predicate from ON clause)
               let filter_expr = if on.len() * 2 == expr.len() {
   ```



##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -1351,7 +1427,7 @@ mod tests {
         let right = LogicalPlanBuilder::from(right_table_scan)
             .project(vec![col("a"), col("b"), col("c")])?
             .build()?;
-        let filter = col("test.a")
+        let filter = col("test.c")

Review Comment:
   Why is this changed? Because `test.a` is a join predicate as well?



##########
datafusion/sql/src/planner.rs:
##########
@@ -589,98 +589,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                             &[using_columns],
                         )
                     })
-                    .collect::<Result<Vec<_>>>()?;
+                    .collect::<Result<Vec<_>>>()?
+                    .into_iter()
+                    .reduce(Expr::and);
 
                 if left_keys.is_empty() {
                     // When we don't have join keys, use cross join
                     let join = LogicalPlanBuilder::from(left).cross_join(&right)?;
-                    normalized_filters
-                        .into_iter()
-                        .reduce(Expr::and)
+                    join_filter
                         .map(|filter| join.filter(filter))
                         .unwrap_or(Ok(join))?
                         .build()
-                } else if join_type == JoinType::Inner && !normalized_filters.is_empty() {

Review Comment:
   ❤️



##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -178,13 +177,35 @@ fn lr_is_preserved(plan: &LogicalPlan) -> (bool, bool) {
     }
 }
 
+// For a given JOIN logical plan, determine whether each side of the join is preserved
+// in terms on join filtering.
+// Predicates from join filter can only be pushed to preserved join side.
+fn on_lr_is_preserved(plan: &LogicalPlan) -> (bool, bool) {
+    match plan {
+        LogicalPlan::Join(Join { join_type, .. }) => match join_type {
+            JoinType::Inner => (true, true),
+            JoinType::Left => (false, true),
+            JoinType::Right => (true, false),
+            JoinType::Full => (false, false),
+            // Semi/Anti joins can not have join filter.
+            JoinType::Semi | JoinType::Anti => unreachable!(
+                "on_lr_is_preserved cannot be appplied to SEMI/ANTI-JOIN nodes"
+            ),
+        },
+        LogicalPlan::CrossJoin(_) => {
+            unreachable!("on_lr_is_preserved cannot be applied to CROSSJOIN nodes")
+        }
+        _ => unreachable!("on_lr_is_preserved only valid for JOIN nodes"),
+    }
+}
+
 // Determine which predicates in state can be pushed down to a given side of a join.
 // To determine this, we need to know the schema of the relevant join side and whether
 // or not the side's rows are preserved when joining. If the side is not preserved, we
 // do not push down anything. Otherwise we can push down predicates where all of the
 // relevant columns are contained on the relevant join side's schema.
 fn get_pushable_join_predicates<'a>(
-    state: &'a State,
+    filters: &'a [(Expr, HashSet<Column>)],

Review Comment:
   I think it would help to document what the `filters` here represent, either in a doc string ("pairs of Exprs and a set of columns that represent ...") or maybe as a struct?
   
   ```rust
   struct PushFilters {
       /// expr represents an ON clause predicate
       expr: Expr,
       /// cols represents the columns that appear in `expr`
       cols: HashSet<Column>,
   }
   ```



##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -1387,9 +1463,97 @@ mod tests {
         Ok(())
     }
 
+    /// join filter should be completely removed after pushdown
+    #[test]
+    fn join_filter_removed() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let left = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a"), col("b"), col("c")])?
+            .build()?;
+        let right_table_scan = test_table_scan_with_name("test2")?;
+        let right = LogicalPlanBuilder::from(right_table_scan)
+            .project(vec![col("a"), col("b"), col("c")])?
+            .build()?;
+        let filter = col("test.b")
+            .gt(lit(1u32))
+            .and(col("test2.c").gt(lit(4u32)));
+        let plan = LogicalPlanBuilder::from(left)
+            .join(
+                &right,
+                JoinType::Inner,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                Some(filter),
+            )?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Inner Join: #test.a = #test2.a Filter: #test.b > UInt32(1) AND #test2.c > UInt32(4)\
+            \n  Projection: #test.a, #test.b, #test.c\
+            \n    TableScan: test projection=None\
+            \n  Projection: #test2.a, #test2.b, #test2.c\
+            \n    TableScan: test2 projection=None"
+        );
+
+        let expected = "\
+        Inner Join: #test.a = #test2.a\
+        \n  Projection: #test.a, #test.b, #test.c\
+        \n    Filter: #test.b > UInt32(1)\
+        \n      TableScan: test projection=None\
+        \n  Projection: #test2.a, #test2.b, #test2.c\
+        \n    Filter: #test2.c > UInt32(4)\
+        \n      TableScan: test2 projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// predicate on join key in filter expression should be pushed down to both inputs
+    #[test]
+    fn join_filter_on_common() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let left = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a")])?
+            .build()?;
+        let right_table_scan = test_table_scan_with_name("test2")?;
+        let right = LogicalPlanBuilder::from(right_table_scan)
+            .project(vec![col("a")])?
+            .build()?;
+        let filter = col("test.a").gt(lit(1u32));
+        let plan = LogicalPlanBuilder::from(left)
+            .join(
+                &right,
+                JoinType::Inner,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                Some(filter),
+            )?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Inner Join: #test.a = #test2.a Filter: #test.a > UInt32(1)\
+            \n  Projection: #test.a\
+            \n    TableScan: test projection=None\
+            \n  Projection: #test2.a\
+            \n    TableScan: test2 projection=None"
+        );
+
+        let expected = "\
+        Inner Join: #test.a = #test2.a\

Review Comment:
   I recommend using different columns here, like `test.a = test2.b`, so that you can validate that the correct column was pushed to each side
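
A small sketch of why distinct key names make such a test more telling. It assumes, as the doc comment on `join_filter_on_common` states, that a predicate on a join key is pushed to both inputs of an inner join. `Gt` and `mirror_on_join_key` are hypothetical helpers written for this example and are not part of DataFusion.

```rust
/// Hypothetical simplified predicate of the form `column > value`.
#[derive(Debug, Clone, PartialEq)]
struct Gt {
    column: String,
    value: u32,
}

/// For an inner equijoin, a predicate on one key also holds for the key it is
/// equated with, so it can be rewritten for the other side.
/// `on` lists (left_key, right_key) pairs.
fn mirror_on_join_key(pred: &Gt, on: &[(String, String)]) -> Option<Gt> {
    on.iter().find_map(|(l, r)| {
        if *l == pred.column {
            Some(Gt { column: r.clone(), value: pred.value })
        } else if *r == pred.column {
            Some(Gt { column: l.clone(), value: pred.value })
        } else {
            // The predicate does not touch this key pair.
            None
        }
    })
}

fn main() {
    let on = vec![("test.a".to_string(), "test2.b".to_string())];
    let pred = Gt { column: "test.a".to_string(), value: 1 };
    // With keys `a` and `b`, the mirrored predicate must name `test2.b`.
    println!("{:?}", mirror_on_join_key(&pred, &on));
}
```

With `test.a = test2.b`, the expected plan has to show a filter on `test2.b` for the right input, so a rewrite that picked the wrong column would show up in the plan string.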



##########
datafusion/sql/src/planner.rs:
##########
@@ -3837,10 +3759,9 @@ mod tests {
             LEFT JOIN orders \
             ON id = customer_id AND order_id > 1 AND age < 30";
         let expected = "Projection: #person.id, #orders.order_id\
-        \n  Left Join: #person.id = #orders.customer_id Filter: #person.age < Int64(30)\
+        \n  Left Join: #person.id = #orders.customer_id Filter: #orders.order_id > Int64(1) AND #person.age < Int64(30)\
         \n    TableScan: person projection=None\
-        \n    Filter: #orders.order_id > Int64(1)\
-        \n      TableScan: orders projection=None";
+        \n    TableScan: orders projection=None";

Review Comment:
   So to be clear: in a normal run the pushdown still happens, but the predicates are no longer pushed down by the planner; they are pushed down at a later stage instead.
   
   👍



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -227,9 +227,15 @@ impl LogicalPlan {
                 aggr_expr,
                 ..
             }) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
-            LogicalPlan::Join(Join { on, .. }) => on
+            LogicalPlan::Join(Join { on, filter, .. }) => on
                 .iter()
                 .flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())])
+                .chain(

Review Comment:
   This is a good catch



##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -224,32 +244,67 @@ fn optimize_join(
     plan: &LogicalPlan,
     left: &LogicalPlan,
     right: &LogicalPlan,
+    on_filter: Vec<(Expr, HashSet<Column>)>,
 ) -> Result<LogicalPlan> {
+    // Get pushable predicates from current optimizer state
     let (left_preserved, right_preserved) = lr_is_preserved(plan);
-    let to_left = get_pushable_join_predicates(&state, left.schema(), left_preserved);
-    let to_right = get_pushable_join_predicates(&state, right.schema(), right_preserved);
-
+    let to_left =
+        get_pushable_join_predicates(&state.filters, left.schema(), left_preserved);
+    let to_right =
+        get_pushable_join_predicates(&state.filters, right.schema(), right_preserved);
     let to_keep: Predicates = state
         .filters
         .iter()
-        .filter(|(expr, _)| {
-            let pushed_to_left = to_left.0.contains(&expr);
-            let pushed_to_right = to_right.0.contains(&expr);
-            !pushed_to_left && !pushed_to_right
-        })
+        .filter(|(e, _)| !to_left.0.contains(&e) && !to_right.0.contains(&e))
         .map(|(a, b)| (a, b))
         .unzip();
 
-    let mut left_state = state.clone();
-    left_state.filters = keep_filters(&left_state.filters, &to_left);
+    // Get pushable predicates from join filter
+    let (on_to_left, on_to_right, on_to_keep) = if on_filter.is_empty() {
+        ((vec![], vec![]), (vec![], vec![]), vec![])
+    } else {
+        let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(plan);
+        let on_to_left =
+            get_pushable_join_predicates(&on_filter, left.schema(), on_left_preserved);
+        let on_to_right =
+            get_pushable_join_predicates(&on_filter, right.schema(), on_right_preserved);
+        let on_to_keep = on_filter
+            .iter()
+            .filter(|(e, _)| !on_to_left.0.contains(&e) && !on_to_right.0.contains(&e))
+            .map(|(a, _)| a.clone())
+            .collect::<Vec<_>>();
+
+        (on_to_left, on_to_right, on_to_keep)
+    };
+
+    // Build new filter states using pushable predicates
+    // from current optimizer states and from ON clause.
+    // Then recursively call optimization for both join inputs
+    let mut left_state = State { filters: vec![] };
+    left_state.append_predicates(to_left);
+    left_state.append_predicates(on_to_left);
     let left = optimize(left, left_state)?;
 
-    let mut right_state = state.clone();
-    right_state.filters = keep_filters(&right_state.filters, &to_right);
+    let mut right_state = State { filters: vec![] };
+    right_state.append_predicates(to_right);
+    right_state.append_predicates(on_to_right);
     let right = optimize(right, right_state)?;
 
     // create a new Join with the new `left` and `right`
     let expr = plan.expressions();
+    let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {

Review Comment:
   I don't really follow how we know that the last element in `expr` here is the `on` filter expression -- doesn't that implicitly depend on the order of expressions returned from `LogicalPlan::expressions()`?
   
   I wonder if we can make it more explicit somehow
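
One way to make the convention explicit is to split the flat list into named parts in a single place. The sketch below is only an illustration: `Expr`, `JoinExprs`, and `split_join_exprs` are hypothetical and simplified, and the assumed layout `[l0, r0, l1, r1, ..., filter?]` is inferred from the `on.len() * 2 == expr.len()` check in `from_plan`.

```rust
/// Hypothetical, simplified expression type (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Raw(String),
}

/// Join expressions with their roles spelled out.
#[derive(Debug, PartialEq)]
struct JoinExprs {
    on: Vec<(Expr, Expr)>,
    filter: Option<Expr>,
}

/// Splits a flat list laid out as `[l0, r0, l1, r1, ..., filter?]`.
/// `on_len` is the number of equijoin key pairs.
fn split_join_exprs(exprs: &[Expr], on_len: usize) -> Result<JoinExprs, String> {
    let keys = on_len * 2;
    let filter = match exprs.len().checked_sub(keys) {
        Some(0) => None,
        Some(1) => Some(exprs[keys].clone()),
        // Too few or too many expressions for the assumed layout.
        _ => {
            return Err(format!(
                "expected {} or {} expressions, got {}",
                keys,
                keys + 1,
                exprs.len()
            ))
        }
    };
    let on = exprs[..keys]
        .chunks(2)
        .map(|pair| (pair[0].clone(), pair[1].clone()))
        .collect();
    Ok(JoinExprs { on, filter })
}

fn main() {
    let exprs = vec![
        Expr::Column("test.a".to_string()),
        Expr::Column("test2.a".to_string()),
        Expr::Raw("test.b > 1".to_string()),
    ];
    println!("{:?}", split_join_exprs(&exprs, 1));
}
```

Keeping the layout knowledge in one helper means a change in expression order would fail loudly there instead of silently misreading the last element.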



[GitHub] [arrow-datafusion] alamb commented on pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#issuecomment-1142057529

   I plan to review this later today or tomorrow if no one else has a chance to do so before


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#discussion_r887818870


##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -224,32 +244,67 @@ fn optimize_join(
     plan: &LogicalPlan,
     left: &LogicalPlan,
     right: &LogicalPlan,
+    on_filter: Vec<(Expr, HashSet<Column>)>,
 ) -> Result<LogicalPlan> {
+    // Get pushable predicates from current optimizer state
     let (left_preserved, right_preserved) = lr_is_preserved(plan);
-    let to_left = get_pushable_join_predicates(&state, left.schema(), left_preserved);
-    let to_right = get_pushable_join_predicates(&state, right.schema(), right_preserved);
-
+    let to_left =
+        get_pushable_join_predicates(&state.filters, left.schema(), left_preserved);
+    let to_right =
+        get_pushable_join_predicates(&state.filters, right.schema(), right_preserved);
     let to_keep: Predicates = state
         .filters
         .iter()
-        .filter(|(expr, _)| {
-            let pushed_to_left = to_left.0.contains(&expr);
-            let pushed_to_right = to_right.0.contains(&expr);
-            !pushed_to_left && !pushed_to_right
-        })
+        .filter(|(e, _)| !to_left.0.contains(&e) && !to_right.0.contains(&e))
         .map(|(a, b)| (a, b))
         .unzip();
 
-    let mut left_state = state.clone();
-    left_state.filters = keep_filters(&left_state.filters, &to_left);
+    // Get pushable predicates from join filter
+    let (on_to_left, on_to_right, on_to_keep) = if on_filter.is_empty() {
+        ((vec![], vec![]), (vec![], vec![]), vec![])
+    } else {
+        let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(plan);
+        let on_to_left =
+            get_pushable_join_predicates(&on_filter, left.schema(), on_left_preserved);
+        let on_to_right =
+            get_pushable_join_predicates(&on_filter, right.schema(), on_right_preserved);
+        let on_to_keep = on_filter
+            .iter()
+            .filter(|(e, _)| !on_to_left.0.contains(&e) && !on_to_right.0.contains(&e))
+            .map(|(a, _)| a.clone())
+            .collect::<Vec<_>>();
+
+        (on_to_left, on_to_right, on_to_keep)
+    };
+
+    // Build new filter states using pushable predicates
+    // from current optimizer states and from ON clause.
+    // Then recursively call optimization for both join inputs
+    let mut left_state = State { filters: vec![] };
+    left_state.append_predicates(to_left);
+    left_state.append_predicates(on_to_left);
     let left = optimize(left, left_state)?;
 
-    let mut right_state = state.clone();
-    right_state.filters = keep_filters(&right_state.filters, &to_right);
+    let mut right_state = State { filters: vec![] };
+    right_state.append_predicates(to_right);
+    right_state.append_predicates(on_to_right);
     let right = optimize(right, right_state)?;
 
     // create a new Join with the new `left` and `right`
     let expr = plan.expressions();
+    let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {

Review Comment:
   Thank you -- I think the comment helps a lot.



[GitHub] [arrow-datafusion] alamb merged pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
alamb merged PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647


[GitHub] [arrow-datafusion] korowa commented on a diff in pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
korowa commented on code in PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#discussion_r887557450


##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -1351,7 +1427,7 @@ mod tests {
         let right = LogicalPlanBuilder::from(right_table_scan)
             .project(vec![col("a"), col("b"), col("c")])?
             .build()?;
-        let filter = col("test.a")
+        let filter = col("test.c")

Review Comment:
   Exactly. I decided not to mix these cases and moved the test for filtering on one of the join keys to `join_filter_on_common`.



[GitHub] [arrow-datafusion] alamb commented on pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#issuecomment-1144717956

   I retriggered failing CI tests -- they seemed to be unrelated


[GitHub] [arrow-datafusion] korowa commented on a diff in pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
korowa commented on code in PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#discussion_r887587666


##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -224,32 +244,67 @@ fn optimize_join(
     plan: &LogicalPlan,
     left: &LogicalPlan,
     right: &LogicalPlan,
+    on_filter: Vec<(Expr, HashSet<Column>)>,
 ) -> Result<LogicalPlan> {
+    // Get pushable predicates from current optimizer state
     let (left_preserved, right_preserved) = lr_is_preserved(plan);
-    let to_left = get_pushable_join_predicates(&state, left.schema(), left_preserved);
-    let to_right = get_pushable_join_predicates(&state, right.schema(), right_preserved);
-
+    let to_left =
+        get_pushable_join_predicates(&state.filters, left.schema(), left_preserved);
+    let to_right =
+        get_pushable_join_predicates(&state.filters, right.schema(), right_preserved);
     let to_keep: Predicates = state
         .filters
         .iter()
-        .filter(|(expr, _)| {
-            let pushed_to_left = to_left.0.contains(&expr);
-            let pushed_to_right = to_right.0.contains(&expr);
-            !pushed_to_left && !pushed_to_right
-        })
+        .filter(|(e, _)| !to_left.0.contains(&e) && !to_right.0.contains(&e))
         .map(|(a, b)| (a, b))
         .unzip();
 
-    let mut left_state = state.clone();
-    left_state.filters = keep_filters(&left_state.filters, &to_left);
+    // Get pushable predicates from join filter
+    let (on_to_left, on_to_right, on_to_keep) = if on_filter.is_empty() {
+        ((vec![], vec![]), (vec![], vec![]), vec![])
+    } else {
+        let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(plan);
+        let on_to_left =
+            get_pushable_join_predicates(&on_filter, left.schema(), on_left_preserved);
+        let on_to_right =
+            get_pushable_join_predicates(&on_filter, right.schema(), on_right_preserved);
+        let on_to_keep = on_filter
+            .iter()
+            .filter(|(e, _)| !on_to_left.0.contains(&e) && !on_to_right.0.contains(&e))
+            .map(|(a, _)| a.clone())
+            .collect::<Vec<_>>();
+
+        (on_to_left, on_to_right, on_to_keep)
+    };
+
+    // Build new filter states using pushable predicates
+    // from current optimizer states and from ON clause.
+    // Then recursively call optimization for both join inputs
+    let mut left_state = State { filters: vec![] };
+    left_state.append_predicates(to_left);
+    left_state.append_predicates(on_to_left);
     let left = optimize(left, left_state)?;
 
-    let mut right_state = state.clone();
-    right_state.filters = keep_filters(&right_state.filters, &to_right);
+    let mut right_state = State { filters: vec![] };
+    right_state.append_predicates(to_right);
+    right_state.append_predicates(on_to_right);
     let right = optimize(right, right_state)?;
 
     // create a new Join with the new `left` and `right`
     let expr = plan.expressions();
+    let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {

Review Comment:
   I guess this is the downside of representing a complex (or complex enough) structure, or parts of it, with a simple list/dict/tuple/etc. However, after looking at how `LogicalPlan::Repartition` and `LogicalPlan::Aggregate` are translated into expressions, I concluded that we can rely on the element order of the expressions vector in this case.
   
   Regarding your comment above -- I've added an explanation of what is going on with the last element and of the expected `expressions()` behaviour. I hope it will be helpful for the next person who works on this part of the code.



[GitHub] [arrow-datafusion] korowa commented on a diff in pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
korowa commented on code in PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#discussion_r887587789


##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -1387,9 +1463,97 @@ mod tests {
         Ok(())
     }
 
+    /// join filter should be completely removed after pushdown
+    #[test]
+    fn join_filter_removed() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let left = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a"), col("b"), col("c")])?
+            .build()?;
+        let right_table_scan = test_table_scan_with_name("test2")?;
+        let right = LogicalPlanBuilder::from(right_table_scan)
+            .project(vec![col("a"), col("b"), col("c")])?
+            .build()?;
+        let filter = col("test.b")
+            .gt(lit(1u32))
+            .and(col("test2.c").gt(lit(4u32)));
+        let plan = LogicalPlanBuilder::from(left)
+            .join(
+                &right,
+                JoinType::Inner,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                Some(filter),
+            )?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Inner Join: #test.a = #test2.a Filter: #test.b > UInt32(1) AND #test2.c > UInt32(4)\
+            \n  Projection: #test.a, #test.b, #test.c\
+            \n    TableScan: test projection=None\
+            \n  Projection: #test2.a, #test2.b, #test2.c\
+            \n    TableScan: test2 projection=None"
+        );
+
+        let expected = "\
+        Inner Join: #test.a = #test2.a\
+        \n  Projection: #test.a, #test.b, #test.c\
+        \n    Filter: #test.b > UInt32(1)\
+        \n      TableScan: test projection=None\
+        \n  Projection: #test2.a, #test2.b, #test2.c\
+        \n    Filter: #test2.c > UInt32(4)\
+        \n      TableScan: test2 projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// predicate on join key in filter expression should be pushed down to both inputs
+    #[test]
+    fn join_filter_on_common() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let left = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a")])?
+            .build()?;
+        let right_table_scan = test_table_scan_with_name("test2")?;
+        let right = LogicalPlanBuilder::from(right_table_scan)
+            .project(vec![col("a")])?
+            .build()?;
+        let filter = col("test.a").gt(lit(1u32));
+        let plan = LogicalPlanBuilder::from(left)
+            .join(
+                &right,
+                JoinType::Inner,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                Some(filter),
+            )?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Inner Join: #test.a = #test2.a Filter: #test.a > UInt32(1)\
+            \n  Projection: #test.a\
+            \n    TableScan: test projection=None\
+            \n  Projection: #test2.a\
+            \n    TableScan: test2 projection=None"
+        );
+
+        let expected = "\
+        Inner Join: #test.a = #test2.a\

Review Comment:
   Done





[GitHub] [arrow-datafusion] korowa commented on a diff in pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
korowa commented on code in PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#discussion_r884421525


##########
datafusion/sql/src/planner.rs:
##########
@@ -3837,10 +3759,9 @@ mod tests {
             LEFT JOIN orders \
             ON id = customer_id AND order_id > 1 AND age < 30";
         let expected = "Projection: #person.id, #orders.order_id\
-        \n  Left Join: #person.id = #orders.customer_id Filter: #person.age < Int64(30)\
+        \n  Left Join: #person.id = #orders.customer_id Filter: #orders.order_id > Int64(1) AND #person.age < Int64(30)\
         \n    TableScan: person projection=None\
-        \n    Filter: #orders.order_id > Int64(1)\
-        \n      TableScan: orders projection=None";
+        \n    TableScan: orders projection=None";

Review Comment:
   You're right, it cannot anymore. All predicate movement is now handled by the `filter_push_down` optimizer rule, so the planner no longer needs to decide whether to push a filter down; it just creates the plan node as is.
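
   To illustrate the division of labour described above, here is a minimal, self-contained sketch of how an `ON` filter might be split by join type. It is not the code from this PR: `Conjunct`, `Split` and `split_on_filter` are hypothetical stand-ins, columns are plain strings, and only the `(left, right)` table is copied from `on_lr_is_preserved` in the diff. The real rule also pushes predicates on join keys to both inputs, which this sketch does not attempt.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for DataFusion's JoinType (Semi/Anti omitted).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// One AND-ed term of the ON filter plus the qualified columns it references.
/// Hypothetical: the real optimizer works on `Expr` values, not strings.
pub struct Conjunct {
    pub text: &'static str,
    pub columns: Vec<&'static str>,
}

/// (left, right): may ON-filter terms be pushed to that input?
/// Values mirror the table in `on_lr_is_preserved` from the diff.
pub fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) {
    match join_type {
        JoinType::Inner => (true, true),
        JoinType::Left => (false, true),
        JoinType::Right => (true, false),
        JoinType::Full => (false, false),
    }
}

/// Result of routing each term: pushed left, pushed right, or kept on the join.
#[derive(Debug, Default, PartialEq)]
pub struct Split {
    pub to_left: Vec<&'static str>,
    pub to_right: Vec<&'static str>,
    pub kept: Vec<&'static str>,
}

pub fn split_on_filter(
    join_type: JoinType,
    conjuncts: &[Conjunct],
    left_cols: &HashSet<&'static str>,
    right_cols: &HashSet<&'static str>,
) -> Split {
    let (left_ok, right_ok) = on_lr_is_preserved(join_type);
    let mut split = Split::default();
    for c in conjuncts {
        // Terms without column references stay on the join in this sketch.
        let has_cols = !c.columns.is_empty();
        let only_left = has_cols && c.columns.iter().all(|col| left_cols.contains(*col));
        let only_right = has_cols && c.columns.iter().all(|col| right_cols.contains(*col));
        if left_ok && only_left {
            split.to_left.push(c.text);
        } else if right_ok && only_right {
            split.to_right.push(c.text);
        } else {
            split.kept.push(c.text);
        }
    }
    split
}
```

   For the `LEFT JOIN` query in the test above, this routes `orders.order_id > 1` to the right input and keeps `person.age < 30` in the join filter.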





[GitHub] [arrow-datafusion] korowa commented on a diff in pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
korowa commented on code in PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#discussion_r887579087


##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -178,13 +177,35 @@ fn lr_is_preserved(plan: &LogicalPlan) -> (bool, bool) {
     }
 }
 
+// For a given JOIN logical plan, determine whether each side of the join is preserved
+// in terms of join filtering.
+// Predicates from the join filter can only be pushed to a preserved join side.
+fn on_lr_is_preserved(plan: &LogicalPlan) -> (bool, bool) {
+    match plan {
+        LogicalPlan::Join(Join { join_type, .. }) => match join_type {
+            JoinType::Inner => (true, true),
+            JoinType::Left => (false, true),
+            JoinType::Right => (true, false),
+            JoinType::Full => (false, false),
+            // Semi/Anti joins cannot have a join filter.
+            JoinType::Semi | JoinType::Anti => unreachable!(
+                "on_lr_is_preserved cannot be applied to SEMI/ANTI-JOIN nodes"
+            ),
+        },
+        LogicalPlan::CrossJoin(_) => {
+            unreachable!("on_lr_is_preserved cannot be applied to CROSSJOIN nodes")
+        }
+        _ => unreachable!("on_lr_is_preserved only valid for JOIN nodes"),
+    }
+}
+
 // Determine which predicates in state can be pushed down to a given side of a join.
 // To determine this, we need to know the schema of the relevant join side and whether
 // or not the side's rows are preserved when joining. If the side is not preserved, we
 // do not push down anything. Otherwise we can push down predicates where all of the
 // relevant columns are contained on the relevant join side's schema.
 fn get_pushable_join_predicates<'a>(
-    state: &'a State,
+    filters: &'a [(Expr, HashSet<Column>)],

Review Comment:
   I've added `type Predicate` and replaced every tuple of an expression and its columns in signatures/attributes with it. We already have `type Predicates` for a similar kind of thing, so uniform naming seemed better. I hope it looks better now.
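
   A rough sketch of what the alias buys, under stated assumptions: `Expr` and `Column` below are simplified stand-ins rather than DataFusion's types, and `pushable_predicates` is a hypothetical helper whose rule is taken only from the comment above `get_pushable_join_predicates` (push nothing if the side is not preserved, otherwise push predicates whose columns all belong to that side).

```rust
use std::collections::HashSet;

// Simplified stand-ins for DataFusion's `Column` and `Expr` (assumptions).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Column {
    pub relation: Option<String>,
    pub name: String,
}

#[derive(Clone, Debug, PartialEq)]
pub struct Expr(pub String);

/// An expression paired with the set of columns it references.
pub type Predicate = (Expr, HashSet<Column>);

// Before the alias, the parameter read `&'a [(Expr, HashSet<Column>)]`.
/// Hypothetical helper: predicates that may be pushed to one join side.
pub fn pushable_predicates<'a>(
    filters: &'a [Predicate],
    side_columns: &HashSet<Column>,
    preserved: bool,
) -> Vec<&'a Predicate> {
    if !preserved {
        // A non-preserved side receives no predicates at all.
        return Vec::new();
    }
    filters
        .iter()
        // Keep predicates whose referenced columns all belong to this side.
        .filter(|(_, columns)| columns.is_subset(side_columns))
        .collect()
}
```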





[GitHub] [arrow-datafusion] alamb commented on pull request #2647: pushdown support for predicates in `ON` clause of joins

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2647:
URL: https://github.com/apache/arrow-datafusion/pull/2647#issuecomment-1145160024

   Thanks again @korowa 

