Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/24 19:33:11 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2591: Support for non equality predicates in `ON` clause of `LEFT`, `RIGHT`, and `FULL` joins

alamb commented on code in PR #2591:
URL: https://github.com/apache/arrow-datafusion/pull/2591#discussion_r880841593


##########
datafusion/core/src/optimizer/filter_push_down.rs:
##########
@@ -1143,6 +1145,7 @@ mod tests {
                 &right,
                 JoinType::Inner,
                 (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,

Review Comment:
   I recommend adding tests for:
   1. A filter on an inner join, ensuring that the filter pushdown optimizer pushes it down correctly
   2. A filter on an outer join (left and full), showing that it is not pushed down (or that it is only pushed down to the non-preserved relation)
   
   (If the extra filter is not pushed down yet, I think it would be fine to file a follow-on ticket to add that functionality -- I bet others in the community might want to pick it up.) A rough sketch of the first test is below.
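   
   For illustration, something along these lines (a sketch only: `test_table_scan`, `test_table_scan_with_name`, `col`, `lit`, and `assert_optimized_plan_eq` are assumed to be the helpers already used in this test module, and the expected plan text is illustrative rather than the exact optimizer output):
   
   ```rust
   #[test]
   fn filter_in_inner_join_on_clause_is_pushed_down() -> Result<()> {
       let left = test_table_scan()?;
       let right = test_table_scan_with_name("right")?;
       let plan = LogicalPlanBuilder::from(left)
           .join(
               &right,
               JoinType::Inner,
               (vec![Column::from_name("a")], vec![Column::from_name("a")]),
               // the non-equi predicate from the ON clause
               Some(col("b").gt_eq(lit(1u32))),
           )?
           .build()?;
   
       // For an inner join the extra predicate is equivalent to a post-join
       // filter, so it can be pushed below the join; for outer joins it must
       // stay at the join (or only be pushed to the non-preserved side).
       let expected = "\
           Join: #test.a = #right.a\
           \n  Filter: #test.b >= UInt32(1)\
           \n    TableScan: test projection=None\
           \n  TableScan: right projection=None";
       assert_optimized_plan_eq(&plan, expected);
       Ok(())
   }
   ```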



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -149,14 +149,22 @@ async fn equijoin_right_and_condition_from_left() -> Result<()> {
 }
 
 #[tokio::test]
-async fn equijoin_and_unsupported_condition() -> Result<()> {
+async fn equijoin_and_condition() -> Result<()> {
     let ctx = create_join_context("t1_id", "t2_id")?;
     let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= '44' ORDER BY t1_id";
-    let res = ctx.create_logical_plan(sql);
-
-    assert!(res.is_err());
-    assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t1_id >= Utf8(\"44\")]");
+        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= 44 ORDER BY t1_id";
+    let expected = vec![
+        "+-------+---------+---------+",
+        "| t1_id | t1_name | t2_name |",
+        "+-------+---------+---------+",
+        "| 11    | a       |         |",
+        "| 22    | b       |         |",
+        "| 33    | c       |         |",
+        "| 44    | d       | x       |",
+        "+-------+---------+---------+",
+    ];
+    let actual = execute_to_batches(&ctx, sql).await;
+    assert_batches_eq!(expected, &actual);
 

Review Comment:
   Can you please also add tests for `RIGHT OUTER JOIN` and `FULL OUTER JOIN`? A rough skeleton is below.
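   
   Something along these lines (a skeleton only -- the expected rows are omitted because they depend on the `t2` contents of the `create_join_context` fixture):
   
   ```rust
   #[tokio::test]
   async fn right_equijoin_and_condition() -> Result<()> {
       let ctx = create_join_context("t1_id", "t2_id")?;
       let sql = "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 \
                  ON t1_id = t2_id AND t2_id >= 44 ORDER BY t2_name";
       let actual = execute_to_batches(&ctx, sql).await;
       // Fill in the expected table from the fixture data and compare:
       // assert_batches_eq!(expected, &actual);
       assert!(!actual.is_empty());
       Ok(())
   }
   ```
   
   plus the equivalent with `FULL JOIN`, where unmatched rows from both sides should be preserved.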



##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -773,6 +775,65 @@ impl DefaultPhysicalPlanner {
                         })
                         .collect::<Result<join_utils::JoinOn>>()?;
 
+                    let join_filter = match filter {

Review Comment:
   The intermediate schema is very much tied to how the join is implemented, right? In other words, if we changed the order in which the hash join internally stores its columns, I suspect some or all of this code would need to change (and it would be hard to find / know to change). So I wonder if this logic might be best put into the join node itself.



##########
datafusion/core/src/optimizer/projection_push_down.rs:
##########
@@ -216,6 +217,13 @@ fn optimize_plan(
                 new_required_columns.insert(r.clone());
             }
 
+            match filter {
+                Some(expr) => {
+                    expr_to_columns(expr, &mut new_required_columns)?;
+                }
+                None => (),
+            };

Review Comment:
   I think you can also write this like:
   
   ```suggestion
               if let Some(expr) = filter {
                   expr_to_columns(expr, &mut new_required_columns)?;
               }
   ```



##########
datafusion/core/src/dataframe.rs:
##########
@@ -320,12 +320,14 @@ impl DataFrame {
         join_type: JoinType,
         left_cols: &[&str],
         right_cols: &[&str],
+        filter: Option<Expr>,

Review Comment:
   I recommend we update the docstring description here to mention that `filter` is applied during the join for outer joins
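   
   Something like this, for example (a sketch of possible wording only, not tied to the exact existing doc text):
   
   ```rust
       /// Join this `DataFrame` with another `DataFrame` using explicitly
       /// specified columns.
       ///
       /// `filter` is an optional non-equality predicate that is applied
       /// *during* the join (i.e. while matching rows) rather than afterwards.
       /// This matters for outer joins, where a post-join `Filter` would also
       /// remove the unmatched (null-padded) rows.
   ```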



##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -454,8 +454,9 @@ impl LogicalPlanBuilder {
         right: &LogicalPlan,
         join_type: JoinType,
         join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
+        filter: Option<Expr>,

Review Comment:
   As with the `DataFrame` interface, it might be nice to clarify in the doc comments that equality predicates are specified using `join_keys` and other arbitrary predicates are specified in `filter`
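   
   For example (again just a sketch of possible wording):
   
   ```rust
       /// Apply a join to `right` using explicitly specified columns and an
       /// optional filter expression.
       ///
       /// Equality predicates are specified via `join_keys`; any other
       /// (non-equi) predicates from the ON clause go into `filter` and are
       /// evaluated while matching rows.
   ```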



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -149,14 +149,22 @@ async fn equijoin_right_and_condition_from_left() -> Result<()> {
 }
 
 #[tokio::test]
-async fn equijoin_and_unsupported_condition() -> Result<()> {
+async fn equijoin_and_condition() -> Result<()> {
     let ctx = create_join_context("t1_id", "t2_id")?;
     let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= '44' ORDER BY t1_id";
-    let res = ctx.create_logical_plan(sql);
-
-    assert!(res.is_err());
-    assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t1_id >= Utf8(\"44\")]");
+        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= 44 ORDER BY t1_id";
+    let expected = vec![
+        "+-------+---------+---------+",

Review Comment:
   This answer looks good to me



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -149,14 +149,22 @@ async fn equijoin_right_and_condition_from_left() -> Result<()> {
 }
 
 #[tokio::test]
-async fn equijoin_and_unsupported_condition() -> Result<()> {
+async fn equijoin_and_condition() -> Result<()> {

Review Comment:
   ```suggestion
   async fn left_equijoin_and_condition() -> Result<()> {
   ```



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -149,14 +149,22 @@ async fn equijoin_right_and_condition_from_left() -> Result<()> {
 }
 
 #[tokio::test]
-async fn equijoin_and_unsupported_condition() -> Result<()> {
+async fn equijoin_and_condition() -> Result<()> {
     let ctx = create_join_context("t1_id", "t2_id")?;
     let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= '44' ORDER BY t1_id";
-    let res = ctx.create_logical_plan(sql);
-
-    assert!(res.is_err());
-    assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t1_id >= Utf8(\"44\")]");
+        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= 44 ORDER BY t1_id";

Review Comment:
   ❤️ 



##########
datafusion/sql/src/planner.rs:
##########
@@ -572,95 +572,116 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 // extract join keys
                 extract_join_keys(expr, &mut keys, &mut filter);
 
-                let mut cols = HashSet::new();
-                exprlist_to_columns(&filter, &mut cols)?;
-
                 let (left_keys, right_keys): (Vec<Column>, Vec<Column>) =
                     keys.into_iter().unzip();
 
-                // return the logical plan representing the join
+                let normalized_filters = filter
+                    .into_iter()
+                    .map(|expr| {
+                        let mut using_columns = HashSet::new();
+                        expr_to_columns(&expr, &mut using_columns)?;
+
+                        normalize_col_with_schemas(
+                            expr,
+                            &[left.schema(), right.schema()],
+                            &[using_columns],
+                        )
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
                 if left_keys.is_empty() {
                     // When we don't have join keys, use cross join
                     let join = LogicalPlanBuilder::from(left).cross_join(&right)?;
 
-                    join.filter(filter.into_iter().reduce(Expr::and).unwrap())?
+                    if normalized_filters.is_empty() {
+                        join.build()
+                    } else {
+                        join.filter(
+                            normalized_filters.into_iter().reduce(Expr::and).unwrap(),
+                        )?
                         .build()
-                } else if filter.is_empty() {
+                    }
+                } else if join_type == JoinType::Inner && !normalized_filters.is_empty() {
                     let join = LogicalPlanBuilder::from(left).join(
                         &right,
                         join_type,
                         (left_keys, right_keys),
+                        None,
                     )?;
-                    join.build()
-                } else if join_type == JoinType::Inner {
+                    join.filter(
+                        normalized_filters.into_iter().reduce(Expr::and).unwrap(),
+                    )?
+                    .build()
+                } else if join_type == JoinType::Left {
+                    // Inner filters - predicates based only on right input columns

Review Comment:
   I didn't see tests for this logic -- namely that it pushes "single table predicates" to the appropriate inputs. Are there existing tests for this?
   
   I actually suggest that you change the SQL planner to simply put all predicates into the `filter` clause, and then implement the optimization that pushes single-table predicates down to the inputs alongside the other filter pushdown logic in datafusion/core/src/optimizer/filter_push_down.rs -- that would follow the pattern in the rest of the code more closely. For example, a predicate in the ON clause of a LEFT join that references only right-input columns could be pushed to a `Filter` over the right input by the optimizer instead of being special-cased in the planner.
   
   This could also be done as a follow-on PR.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1181,6 +1193,8 @@ pub struct Join {
     pub right: Arc<LogicalPlan>,
     /// Equijoin clause expressed as pairs of (left, right) join columns
     pub on: Vec<(Column, Column)>,
+    /// Filters applied before join output (non-equi conditions)

Review Comment:
   ```suggestion
       /// Filters applied during join (non-equi conditions)
   ```
   
   I always thought of this filtering as happening "during" the join, when matching rows are being found. These filters aren't really applied to the input (otherwise they could be represented as `Filter`s in the actual plan).



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -743,22 +743,34 @@ impl LogicalPlan {
                     }
                     LogicalPlan::Join(Join {
                         on: ref keys,
+                        filter,
                         join_constraint,
                         join_type,
                         ..
                     }) => {
                         let join_expr: Vec<String> =
                             keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
+                        let filter_expr = filter.as_ref().map_or_else(
+                            || "".to_string(),
+                            |expr| format!(" Filter: {}", expr),
+                        );

Review Comment:
   Another way to write this might be:
   
   ```suggestion
                           let filter_expr = filter.as_ref()
                               .map(|expr| format!(" Filter: {}", expr))
                               .unwrap_or_else(|| "".to_string());
   ```
   
   Not sure if that is any more or less readable 🤔 



##########
datafusion/sql/src/planner.rs:
##########
@@ -572,95 +572,116 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 // extract join keys
                 extract_join_keys(expr, &mut keys, &mut filter);
 
-                let mut cols = HashSet::new();
-                exprlist_to_columns(&filter, &mut cols)?;
-
                 let (left_keys, right_keys): (Vec<Column>, Vec<Column>) =
                     keys.into_iter().unzip();
 
-                // return the logical plan representing the join
+                let normalized_filters = filter
+                    .into_iter()
+                    .map(|expr| {
+                        let mut using_columns = HashSet::new();
+                        expr_to_columns(&expr, &mut using_columns)?;
+
+                        normalize_col_with_schemas(
+                            expr,
+                            &[left.schema(), right.schema()],
+                            &[using_columns],
+                        )
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
                 if left_keys.is_empty() {
                     // When we don't have join keys, use cross join
                     let join = LogicalPlanBuilder::from(left).cross_join(&right)?;
 
-                    join.filter(filter.into_iter().reduce(Expr::and).unwrap())?
+                    if normalized_filters.is_empty() {
+                        join.build()
+                    } else {
+                        join.filter(
+                            normalized_filters.into_iter().reduce(Expr::and).unwrap(),
+                        )?
                         .build()
-                } else if filter.is_empty() {
+                    }

Review Comment:
   You might be able to do something like
   
   ```rust
   let join = if let Some(filter) = normalized_filters.into_iter().reduce(Expr::and) {
       join.filter(filter)?
   } else {
       join
   };
   
   join.build()
   ```



##########
datafusion/core/src/physical_plan/hash_join.rs:
##########
@@ -791,6 +826,109 @@ fn build_join_indexes(
     }
 }
 
+fn apply_join_filter(
+    left: &RecordBatch,
+    right: &RecordBatch,
+    join_type: JoinType,
+    left_indices: UInt64Array,
+    right_indices: UInt32Array,
+    filter: &JoinFilter,
+) -> Result<(UInt64Array, UInt32Array)> {
+    if left_indices.is_empty() {
+        return Ok((left_indices, right_indices));
+    };
+
+    let (intermediate_batch, _) = build_batch_from_indices(
+        filter.schema(),
+        left,
+        right,
+        PrimitiveArray::from(left_indices.data().clone()),
+        PrimitiveArray::from(right_indices.data().clone()),
+        filter.column_indices(),
+    )?;
+
+    match join_type {
+        JoinType::Inner | JoinType::Left => {
+            // For both INNER and LEFT joins, input arrays contains only indices for matched data.
+            // Due to this fact it's correct to simply apply filter to intermediate batch and return
+            // indices for left/right rows satisfying filter predicate
+            let filter_result = filter
+                .expression()
+                .evaluate(&intermediate_batch)?
+                .into_array(intermediate_batch.num_rows());
+            let mask = as_boolean_array(&filter_result);
+
+            let left_filtered = PrimitiveArray::<UInt64Type>::from(
+                compute::filter(&left_indices, mask)?.data().clone(),

Review Comment:
   I agree with @korowa that moving filters out of the `ON` clause is better left to the planner and optimizer.
   
   This is an important point though -- perhaps we could add a comment to the docstring of the HashJoin operator explaining that `filter` should ideally only contain expressions that cannot be pushed to the inputs of the join, one way or the other. Something like the sketch below.
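   
   For example (a sketch of possible wording only):
   
   ```rust
   /// Note on `filter`: it holds the non-equi join predicates that are
   /// evaluated while matching rows. Ideally it should only contain
   /// expressions that cannot be pushed down to either join input;
   /// predicates that reference a single input are better handled by the
   /// planner/optimizer as ordinary `Filter`s below the join.
   ```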



##########
datafusion/core/src/physical_plan/hash_join.rs:
##########
@@ -791,6 +826,109 @@ fn build_join_indexes(
     }
 }
 
+fn apply_join_filter(
+    left: &RecordBatch,
+    right: &RecordBatch,
+    join_type: JoinType,
+    left_indices: UInt64Array,
+    right_indices: UInt32Array,
+    filter: &JoinFilter,
+) -> Result<(UInt64Array, UInt32Array)> {
+    if left_indices.is_empty() {
+        return Ok((left_indices, right_indices));
+    };
+
+    let (intermediate_batch, _) = build_batch_from_indices(
+        filter.schema(),
+        left,
+        right,
+        PrimitiveArray::from(left_indices.data().clone()),
+        PrimitiveArray::from(right_indices.data().clone()),
+        filter.column_indices(),
+    )?;
+
+    match join_type {
+        JoinType::Inner | JoinType::Left => {
+            // For both INNER and LEFT joins, input arrays contains only indices for matched data.
+            // Due to this fact it's correct to simply apply filter to intermediate batch and return
+            // indices for left/right rows satisfying filter predicate
+            let filter_result = filter
+                .expression()
+                .evaluate(&intermediate_batch)?
+                .into_array(intermediate_batch.num_rows());
+            let mask = as_boolean_array(&filter_result);
+
+            let left_filtered = PrimitiveArray::<UInt64Type>::from(
+                compute::filter(&left_indices, mask)?.data().clone(),
+            );
+            let right_filtered = PrimitiveArray::<UInt32Type>::from(
+                compute::filter(&right_indices, mask)?.data().clone(),
+            );
+
+            Ok((left_filtered, right_filtered))
+        }
+        JoinType::Right | JoinType::Full => {
+            // In case of RIGHT and FULL join, left_indices could contain null values - these rows,
+            // where no match has been found, should retain in result arrays (thus join condition is satified)
+            //
+            // So, filter should be applied only to matched rows, and in case right (outer, batch) index

Review Comment:
   I think this logic should also apply to LEFT joins.
   
   As for FULL joins, I think it means that if there is no match for either the left or the right input, a row should still be produced.
   
   I think the best way to resolve this comment is to add some tests showing correct answers for `RIGHT` and `FULL` joins in the `sql_integration` suite.



##########
datafusion/core/src/physical_plan/hash_join.rs:
##########
@@ -110,6 +113,8 @@ pub struct HashJoinExec {
     right: Arc<dyn ExecutionPlan>,
     /// Set of common columns used to join on
     on: Vec<(Column, Column)>,
+    /// Filters which applied before records output

Review Comment:
   ```suggestion
       /// Filters which are applied while finding matching rows
   ```



##########
datafusion/core/src/physical_plan/hash_join.rs:
##########
@@ -791,6 +826,109 @@ fn build_join_indexes(
     }
 }
 
+fn apply_join_filter(
+    left: &RecordBatch,
+    right: &RecordBatch,
+    join_type: JoinType,
+    left_indices: UInt64Array,
+    right_indices: UInt32Array,
+    filter: &JoinFilter,
+) -> Result<(UInt64Array, UInt32Array)> {
+    if left_indices.is_empty() {
+        return Ok((left_indices, right_indices));
+    };
+
+    let (intermediate_batch, _) = build_batch_from_indices(
+        filter.schema(),
+        left,
+        right,
+        PrimitiveArray::from(left_indices.data().clone()),
+        PrimitiveArray::from(right_indices.data().clone()),
+        filter.column_indices(),
+    )?;
+
+    match join_type {
+        JoinType::Inner | JoinType::Left => {
+            // For both INNER and LEFT joins, input arrays contains only indices for matched data.

Review Comment:
   Something bothers me about this code treating `Left` and `Right` joins differently -- I would expect them to look like mirrors of each other, with `Full` being the one treated differently.



##########
dev/build-arrow-ballista.sh:
##########
@@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null
 
 # clone the repo
 # TODO make repo/branch configurable
-git clone https://github.com/apache/arrow-ballista
+git clone --branch join_filter https://github.com/korowa/arrow-ballista

Review Comment:
   I think this change probably needs to be reverted prior to merge



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org