You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ho...@apache.org on 2021/08/02 03:46:47 UTC

[arrow-datafusion] branch master updated: Convert unsupported conditions in left right join to filters (#796)

This is an automated email from the ASF dual-hosted git repository.

houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 979dc31  Convert unsupported conditions in left right join to filters (#796)
979dc31 is described below

commit 979dc3139c3e2aa1f2995c018375aea36cdf6274
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Mon Aug 2 05:46:43 2021 +0200

    Convert unsupported conditions in left right join to filters (#796)
    
    * Unsupported conditions in join
    
    * Simplify, support right join, add tests
    
    * Add tests
    
    * Add comments
    
    * Clippy
---
 benchmarks/src/bin/tpch.rs    |   5 ++
 datafusion/src/sql/planner.rs | 128 +++++++++++++++++++++++++++++++++++++-----
 datafusion/tests/sql.rs       |  46 ++++++++++++++-
 3 files changed, 164 insertions(+), 15 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 08b8864..42755ec 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -745,6 +745,11 @@ mod tests {
     }
 
     #[tokio::test]
+    async fn run_q13() -> Result<()> {
+        run_query(13).await
+    }
+
+    #[tokio::test]
     async fn run_q14() -> Result<()> {
         run_query(14).await
     }
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 6d9484b..481f12b 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -31,6 +31,7 @@ use crate::logical_plan::{
     DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema,
     ToStringifiedPlan,
 };
+use crate::optimizer::utils::exprlist_to_columns;
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
 use crate::{
@@ -325,16 +326,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let right = self.create_relation(&join.relation, ctes)?;
         match &join.join_operator {
             JoinOperator::LeftOuter(constraint) => {
-                self.parse_join(left, &right, constraint, JoinType::Left)
+                self.parse_join(left, right, constraint, JoinType::Left)
             }
             JoinOperator::RightOuter(constraint) => {
-                self.parse_join(left, &right, constraint, JoinType::Right)
+                self.parse_join(left, right, constraint, JoinType::Right)
             }
             JoinOperator::Inner(constraint) => {
-                self.parse_join(left, &right, constraint, JoinType::Inner)
+                self.parse_join(left, right, constraint, JoinType::Inner)
             }
             JoinOperator::FullOuter(constraint) => {
-                self.parse_join(left, &right, constraint, JoinType::Full)
+                self.parse_join(left, right, constraint, JoinType::Full)
             }
             JoinOperator::CrossJoin => self.parse_cross_join(left, &right),
             other => Err(DataFusionError::NotImplemented(format!(
@@ -354,7 +355,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     fn parse_join(
         &self,
         left: LogicalPlan,
-        right: &LogicalPlan,
+        right: LogicalPlan,
         constraint: &JoinConstraint,
         join_type: JoinType,
     ) -> Result<LogicalPlan> {
@@ -372,18 +373,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 // extract join keys
                 extract_join_keys(&expr, &mut keys, &mut filter);
 
+                let mut cols = HashSet::new();
+                exprlist_to_columns(&filter, &mut cols)?;
+
                 let (left_keys, right_keys): (Vec<Column>, Vec<Column>) =
                     keys.into_iter().unzip();
-                // return the logical plan representing the join
-                let join = LogicalPlanBuilder::from(left).join(
-                    right,
-                    join_type,
-                    (left_keys, right_keys),
-                )?;
 
+                // return the logical plan representing the join
                 if filter.is_empty() {
+                    let join = LogicalPlanBuilder::from(left).join(
+                        &right,
+                        join_type,
+                        (left_keys, right_keys),
+                    )?;
                     join.build()
                 } else if join_type == JoinType::Inner {
+                    let join = LogicalPlanBuilder::from(left).join(
+                        &right,
+                        join_type,
+                        (left_keys, right_keys),
+                    )?;
                     join.filter(
                         filter
                             .iter()
@@ -391,6 +400,64 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                             .fold(filter[0].clone(), |acc, e| acc.and(e.clone())),
                     )?
                     .build()
+                }
+                // Left join with all non-equijoin expressions from the right
+                // l left join r
+                // on l1=r1 and r2 > [..]
+                else if join_type == JoinType::Left
+                    && cols.iter().all(
+                        |Column {
+                             relation: qualifier,
+                             name,
+                         }| {
+                            right
+                                .schema()
+                                .field_with_name(qualifier.as_deref(), name)
+                                .is_ok()
+                        },
+                    )
+                {
+                    LogicalPlanBuilder::from(left)
+                        .join(
+                            &LogicalPlanBuilder::from(right)
+                                .filter(
+                                    filter
+                                        .iter()
+                                        .skip(1)
+                                        .fold(filter[0].clone(), |acc, e| {
+                                            acc.and(e.clone())
+                                        }),
+                                )?
+                                .build()?,
+                            join_type,
+                            (left_keys, right_keys),
+                        )?
+                        .build()
+                }
+                // Right join with all non-equijoin expressions from the left
+                // l right join r
+                // on l1=r1 and l2 > [..]
+                else if join_type == JoinType::Right
+                    && cols.iter().all(
+                        |Column {
+                             relation: qualifier,
+                             name,
+                         }| {
+                            left.schema()
+                                .field_with_name(qualifier.as_deref(), name)
+                                .is_ok()
+                        },
+                    )
+                {
+                    LogicalPlanBuilder::from(left)
+                        .filter(
+                            filter
+                                .iter()
+                                .skip(1)
+                                .fold(filter[0].clone(), |acc, e| acc.and(e.clone())),
+                        )?
+                        .join(&right, join_type, (left_keys, right_keys))?
+                        .build()
                 } else {
                     Err(DataFusionError::NotImplemented(format!(
                         "Unsupported expressions in {:?} JOIN: {:?}",
@@ -404,7 +471,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     .map(|x| Column::from_name(x.value.clone()))
                     .collect();
                 LogicalPlanBuilder::from(left)
-                    .join_using(right, join_type, keys)?
+                    .join_using(&right, join_type, keys)?
                     .build()
             }
             JoinConstraint::Natural => {
@@ -1650,9 +1717,16 @@ fn extract_join_keys(
                 extract_join_keys(left, accum, accum_filter);
                 extract_join_keys(right, accum, accum_filter);
             }
-            _other => {
+            _other
+                if matches!(**left, Expr::Column(_))
+                    || matches!(**right, Expr::Column(_)) =>
+            {
                 accum_filter.push(expr.clone());
             }
+            _other => {
+                extract_join_keys(left, accum, accum_filter);
+                extract_join_keys(right, accum, accum_filter);
+            }
         },
         _other => {
             accum_filter.push(expr.clone());
@@ -2812,6 +2886,34 @@ mod tests {
     }
 
     #[test]
+    fn left_equijoin_unsupported_expression() {
+        let sql = "SELECT id, order_id \
+            FROM person \
+            LEFT JOIN orders \
+            ON id = customer_id AND order_id > 1";
+        let expected = "Projection: #person.id, #orders.order_id\
+        \n  Join: #person.id = #orders.customer_id\
+        \n    TableScan: person projection=None\
+        \n    Filter: #orders.order_id Gt Int64(1)\
+        \n      TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    #[test]
+    fn right_equijoin_unsupported_expression() {
+        let sql = "SELECT id, order_id \
+            FROM person \
+            RIGHT JOIN orders \
+            ON id = customer_id AND id > 1";
+        let expected = "Projection: #person.id, #orders.order_id\
+        \n  Join: #person.id = #orders.customer_id\
+        \n    Filter: #person.id Gt Int64(1)\
+        \n      TableScan: person projection=None\
+        \n    TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    #[test]
     fn join_with_table_name() {
         let sql = "SELECT id, order_id \
             FROM person \
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index bfe2f2f..42a7d20 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -1766,13 +1766,55 @@ async fn equijoin_and_other_condition() -> Result<()> {
 }
 
 #[tokio::test]
+async fn equijoin_left_and_condition_from_right() -> Result<()> {
+    let mut ctx = create_join_context("t1_id", "t2_id")?;
+    let sql =
+        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
+    let res = ctx.create_logical_plan(sql);
+    assert!(res.is_ok());
+    let actual = execute(&mut ctx, sql).await;
+
+    let expected = vec![
+        vec!["11", "a", "z"],
+        vec!["22", "b", "y"],
+        vec!["33", "c", "NULL"],
+        vec!["44", "d", "NULL"],
+    ];
+    assert_eq!(expected, actual);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn equijoin_right_and_condition_from_left() -> Result<()> {
+    let mut ctx = create_join_context("t1_id", "t2_id")?;
+    let sql =
+        "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id AND t1_id >= 22 ORDER BY t2_name";
+    let res = ctx.create_logical_plan(sql);
+    assert!(res.is_ok());
+    let actual = execute(&mut ctx, sql).await;
+
+    let expected = vec![
+        vec!["NULL", "NULL", "w"],
+        vec!["44", "d", "x"],
+        vec!["22", "b", "y"],
+        vec!["NULL", "NULL", "z"],
+    ];
+    assert_eq!(expected, actual);
+
+    Ok(())
+}
+
+#[tokio::test]
 async fn equijoin_and_unsupported_condition() -> Result<()> {
     let ctx = create_join_context("t1_id", "t2_id")?;
     let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t2_name >= 'y' ORDER BY t1_id";
+        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_id >= '44' ORDER BY t1_id";
     let res = ctx.create_logical_plan(sql);
+
     assert!(res.is_err());
-    assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t2_name GtEq Utf8(\"y\")]");
+    assert_eq!(format!("{}", res.unwrap_err()), "This feature is not implemented: Unsupported expressions in Left JOIN: [#t1_id GtEq Utf8(\"44\")]");
+
     Ok(())
 }