You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this rendering.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/29 12:47:40 UTC

[arrow-datafusion] branch master updated: Add alias check for equijoin in from_plan (#4755)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 310e8713c Add alias check for equijoin in from_plan (#4755)
310e8713c is described below

commit 310e8713cf4600b331ea9609a95493226f8657d9
Author: ygf11 <ya...@gmail.com>
AuthorDate: Thu Dec 29 20:47:35 2022 +0800

    Add alias check for equijoin in from_plan (#4755)
    
    * Add alias check for equijoin in from_plan
    
    * fix cargo fmt
    
    * fix comment
    
    * fix cargo fmt
---
 datafusion/core/tests/sql/joins.rs                 | 48 +++++++++++-----------
 datafusion/expr/src/utils.rs                       |  9 ++--
 .../src/simplify_expressions/simplify_exprs.rs     | 29 ++++++++++++-
 3 files changed, 57 insertions(+), 29 deletions(-)

diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index b6c78b0cf..5c4171484 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -2452,29 +2452,29 @@ async fn both_side_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(12)\", index: 2 }, Column { name: \"t2.t2_id + Int64(1)\", index: 1 })]",
+                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }, Column { name: \"t2.t2_id + UInt32(1)\", index: 1 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(12)\", index: 2 }], 2)",
-                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(12 AS UInt32) as t1.t1_id + Int64(12)]",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }], 2)",
+                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as t1.t1_id + UInt32(12)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id + Int64(1)\", index: 1 }], 2)",
-                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + CAST(1 AS UInt32) as t2.t2_id + Int64(1)]",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id + UInt32(1)\", index: 1 }], 2)",
+                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as t2.t2_id + UInt32(1)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
-           ]
+            ]
         } else {
             vec![
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(12)\", index: 2 }, Column { name: \"t2.t2_id + Int64(1)\", index: 1 })]",
+                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }, Column { name: \"t2.t2_id + UInt32(1)\", index: 1 })]",
                 "        CoalescePartitionsExec",
-                "          ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(12 AS UInt32) as t1.t1_id + Int64(12)]",
+                "          ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as t1.t1_id + UInt32(12)]",
                 "            RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
-                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + CAST(1 AS UInt32) as t2.t2_id + Int64(1)]",
+                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as t2.t2_id + UInt32(1)]",
                 "          RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
             ]
@@ -2524,10 +2524,10 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
+                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(11)\", index: 2 }], 2)",
-                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }], 2)",
+                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
@@ -2541,9 +2541,9 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
                 "    RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "      CoalesceBatchesExec: target_batch_size=4096",
-                "        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
+                "        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
                 "          CoalescePartitionsExec",
-                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]",
+                "            ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
                 "          MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2594,14 +2594,14 @@ async fn right_side_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 1 })]",
+                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 1 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
                 "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
                 "            RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - Int64(11)\", index: 1 }], 2)",
-                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 1 }], 2)",
+                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
            ]
@@ -2610,9 +2610,9 @@ async fn right_side_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 1 })]",
+                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 1 })]",
                 "        MemoryExec: partitions=1, partition_sizes=[1]",
-                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
                 "          RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
             ]
@@ -2662,14 +2662,14 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 3 })]",
+                "      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 3 })]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
                 "          RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
                 "            RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "              MemoryExec: partitions=1, partition_sizes=[1]",
                 "        CoalesceBatchesExec: target_batch_size=4096",
-                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - Int64(11)\", index: 3 }], 2)",
-                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+                "          RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 3 }], 2)",
+                "            ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
                 "              RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "                MemoryExec: partitions=1, partition_sizes=[1]",
            ]
@@ -2678,9 +2678,9 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> {
                 "ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
                 "  ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
                 "    CoalesceBatchesExec: target_batch_size=4096",
-                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 3 })]",
+                "      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 3 })]",
                 "        MemoryExec: partitions=1, partition_sizes=[1]",
-                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+                "        ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
                 "          RepartitionExec: partitioning=RoundRobinBatch(2)",
                 "            MemoryExec: partitions=1, partition_sizes=[1]",
             ]
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index a3b240e65..87c7b2597 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -586,12 +586,13 @@ pub fn from_plan(
             // The preceding part of expr is equi-exprs,
             // and the struct of each equi-expr is like `left-expr = right-expr`.
             let new_on:Vec<(Expr,Expr)> = expr.iter().take(equi_expr_count).map(|equi_expr| {
-                    if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = equi_expr {
-                        assert!(op == &Operator::Eq);
-                        Ok(((**left).clone(), (**right).clone()))
+                    // SimplifyExpression rule may add alias to the equi_expr.
+                    let unalias_expr = equi_expr.clone().unalias();
+                    if let Expr::BinaryExpr(BinaryExpr { left, op:Operator::Eq, right }) = unalias_expr {
+                        Ok((*left, *right))
                     } else {
                         Err(DataFusionError::Internal(format!(
-                            "The front part expressions should be an binary expression, actual:{}",
+                            "The front part expressions should be an binary equiality expression, actual:{}",
                             equi_expr
                         )))
                     }
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 6645284e3..e4fe2e137 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -120,6 +120,7 @@ mod tests {
     use crate::simplify_expressions::utils::for_test::{
         cast_to_int64_expr, now_expr, to_timestamp_expr,
     };
+    use crate::test::test_table_scan_with_name;
 
     use super::*;
     use arrow::datatypes::{DataType, Field, Schema};
@@ -131,7 +132,7 @@ mod tests {
     use datafusion_expr::logical_plan::table_scan;
     use datafusion_expr::{
         and, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Expr,
-        ExprSchemable,
+        ExprSchemable, JoinType,
     };
 
     /// A macro to assert that one string is contained within another with
@@ -789,4 +790,30 @@ mod tests {
 
         assert_optimized_plan_eq(&plan, expected)
     }
+
+    #[test]
+    fn simplify_equijoin_predicate() -> Result<()> {
+        let t1 = test_table_scan_with_name("t1")?;
+        let t2 = test_table_scan_with_name("t2")?;
+
+        let left_key = col("t1.a") + lit(1i64).cast_to(&DataType::UInt32, t1.schema())?;
+        let right_key =
+            col("t2.a") + lit(2i64).cast_to(&DataType::UInt32, t2.schema())?;
+        let plan = LogicalPlanBuilder::from(t1)
+            .join_with_expr_keys(
+                t2,
+                JoinType::Inner,
+                (vec![left_key], vec![right_key]),
+                None,
+            )?
+            .build()?;
+
+        // before simplify: t1.a + CAST(Int64(1), UInt32) = t2.a + CAST(Int64(2), UInt32)
+        // after simplify: t1.a + UInt32(1) = t2.a + UInt32(2) AS t1.a + Int64(1) = t2.a + Int64(2)
+        let expected = "Inner Join: t1.a + UInt32(1) = t2.a + UInt32(2)\
+            \n  TableScan: t1\
+            \n  TableScan: t2";
+
+        assert_optimized_plan_eq(&plan, expected)
+    }
 }