You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/07 11:53:44 UTC

[arrow-datafusion] branch master updated: Allow non-equijoin filters in join condition (#660)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 0368f59  Allow non-equijoin filters in join condition (#660)
0368f59 is described below

commit 0368f59016b943448124f72d1f70b4108c45860e
Author: Daniƫl Heres <da...@gmail.com>
AuthorDate: Wed Jul 7 13:53:29 2021 +0200

    Allow non-equijoin filters in join condition (#660)
    
    * Allow non-equijoin filters in join condition
    
    * Revert change to query
    
    * Fix, only do for inner join
    
    * Add test
    
    * docs update
    
    * Update test name
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Add negative test
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/src/sql/planner.rs | 83 ++++++++++++++++++++++++++++++-------------
 datafusion/tests/sql.rs       | 22 ++++++++++++
 2 files changed, 81 insertions(+), 24 deletions(-)

diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 213ae89..e34f0e6 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -368,15 +368,34 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 // parse ON expression
                 let expr = self.sql_to_rex(sql_expr, &join_schema)?;
 
+                // expression that didn't match equi-join pattern
+                let mut filter = vec![];
+
                 // extract join keys
-                extract_join_keys(&expr, &mut keys)?;
+                extract_join_keys(&expr, &mut keys, &mut filter);
 
                 let (left_keys, right_keys): (Vec<Column>, Vec<Column>) =
                     keys.into_iter().unzip();
                 // return the logical plan representing the join
-                LogicalPlanBuilder::from(left)
-                    .join(right, join_type, left_keys, right_keys)?
+                let join = LogicalPlanBuilder::from(left)
+                    .join(right, join_type, left_keys, right_keys)?;
+
+                if filter.is_empty() {
+                    join.build()
+                } else if join_type == JoinType::Inner {
+                    join.filter(
+                        filter
+                            .iter()
+                            .skip(1)
+                            .fold(filter[0].clone(), |acc, e| acc.and(e.clone())),
+                    )?
                     .build()
+                } else {
+                    Err(DataFusionError::NotImplemented(format!(
+                        "Unsupported expressions in {:?} JOIN: {:?}",
+                        join_type, filter
+                    )))
+                }
             }
             JoinConstraint::Using(idents) => {
                 let keys: Vec<Column> = idents
@@ -1550,39 +1569,41 @@ fn remove_join_expressions(
     }
 }
 
-/// Parse equijoin ON condition which could be a single Eq or multiple conjunctive Eqs
+/// Extracts equijoin ON condition be a single Eq or multiple conjunctive Eqs
+/// Filters matching this pattern are added to `accum`
+/// Filters that don't match this pattern are added to `accum_filter`
+/// Examples:
 ///
-/// Examples
+/// foo = bar => accum=[(foo, bar)] accum_filter=[]
+/// foo = bar AND bar = baz => accum=[(foo, bar), (bar, baz)] accum_filter=[]
+/// foo = bar AND baz > 1 => accum=[(foo, bar)] accum_filter=[baz > 1]
 ///
-/// foo = bar
-/// foo = bar AND bar = baz AND ...
-///
-fn extract_join_keys(expr: &Expr, accum: &mut Vec<(Column, Column)>) -> Result<()> {
+fn extract_join_keys(
+    expr: &Expr,
+    accum: &mut Vec<(Column, Column)>,
+    accum_filter: &mut Vec<Expr>,
+) {
     match expr {
         Expr::BinaryExpr { left, op, right } => match op {
             Operator::Eq => match (left.as_ref(), right.as_ref()) {
                 (Expr::Column(l), Expr::Column(r)) => {
                     accum.push((l.clone(), r.clone()));
-                    Ok(())
                 }
-                other => Err(DataFusionError::SQL(ParserError(format!(
-                    "Unsupported expression '{:?}' in JOIN condition",
-                    other
-                )))),
+                _other => {
+                    accum_filter.push(expr.clone());
+                }
             },
             Operator::And => {
-                extract_join_keys(left, accum)?;
-                extract_join_keys(right, accum)
+                extract_join_keys(left, accum, accum_filter);
+                extract_join_keys(right, accum, accum_filter);
+            }
+            _other => {
+                accum_filter.push(expr.clone());
             }
-            other => Err(DataFusionError::SQL(ParserError(format!(
-                "Unsupported expression '{:?}' in JOIN condition",
-                other
-            )))),
         },
-        other => Err(DataFusionError::SQL(ParserError(format!(
-            "Unsupported expression '{:?}' in JOIN condition",
-            other
-        )))),
+        _other => {
+            accum_filter.push(expr.clone());
+        }
     }
 }
 
@@ -2703,6 +2724,20 @@ mod tests {
     }
 
     #[test]
+    fn equijoin_unsupported_expression() {
+        let sql = "SELECT id, order_id \
+            FROM person \
+            JOIN orders \
+            ON id = customer_id AND order_id > 1 ";
+        let expected = "Projection: #person.id, #orders.order_id\
+        \n  Filter: #orders.order_id Gt Int64(1)\
+        \n    Join: #person.id = #orders.customer_id\
+        \n      TableScan: person projection=None\
+        \n      TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    #[test]
     fn join_with_table_name() {
         let sql = "SELECT id, order_id \
             FROM person \
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index bd73cb1..f6f8b6f 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -1688,6 +1688,28 @@ async fn equijoin() -> Result<()> {
 }
 
 #[tokio::test]
+async fn equijoin_and_other_condition() -> Result<()> {
+    let mut ctx = create_join_context("t1_id", "t2_id")?;
+    let sql =
+        "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
+    let actual = execute(&mut ctx, sql).await;
+    let expected = vec![vec!["11", "a", "z"], vec!["22", "b", "y"]];
+    assert_eq!(expected, actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn equijoin_and_unsupported_condition() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id")?;
+    let sql =
+        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
+    let res = ctx.create_logical_plan(sql);
+    assert!(res.is_err());
+    assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t2.t2_name GtEq Utf8(\"y\")]");
+    Ok(())
+}
+
+#[tokio::test]
 async fn left_join() -> Result<()> {
     let mut ctx = create_join_context("t1_id", "t2_id")?;
     let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id";