Posted to commits@arrow.apache.org by al...@apache.org on 2022/12/29 12:47:40 UTC
[arrow-datafusion] branch master updated: Add alias check for equijoin in from_plan (#4755)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 310e8713c Add alias check for equijoin in from_plan (#4755)
310e8713c is described below
commit 310e8713cf4600b331ea9609a95493226f8657d9
Author: ygf11 <ya...@gmail.com>
AuthorDate: Thu Dec 29 20:47:35 2022 +0800
Add alias check for equijoin in from_plan (#4755)
* Add alias check for equijoin in from_plan
* fix cargo fmt
* fix comment
* fix cargo fmt
---
datafusion/core/tests/sql/joins.rs | 48 +++++++++++-----------
datafusion/expr/src/utils.rs | 9 ++--
.../src/simplify_expressions/simplify_exprs.rs | 29 ++++++++++++-
3 files changed, 57 insertions(+), 29 deletions(-)
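Context for the change: the SimplifyExpressions optimizer rule can rewrite an
equijoin key expression (for example, folding CAST(Int64(12) AS UInt32) into
the literal UInt32(12)) and then wraps the rewritten expression in an alias so
that its column name stays stable. from_plan previously pattern-matched each
equi-expr directly against Expr::BinaryExpr, so it hit the internal-error
branch when it met that alias wrapper; the fix strips the alias first. Below
is a minimal sketch of the failure mode, assuming the datafusion_expr API used
in this commit (the snippet is illustrative, not the committed code):

    use datafusion_expr::{col, lit, Expr};

    fn main() {
        // An equijoin predicate as SimplifyExpressions may leave it: the
        // rewritten expression, aliased back to its pre-rewrite name.
        let eq = (col("t1.a") + lit(1u32)).eq(col("t2.a") + lit(2u32));
        let aliased = eq.alias("t1.a + Int64(1) = t2.a + Int64(2)");

        // Matching the raw expression misses because of the Alias wrapper;
        // stripping it with unalias() exposes the BinaryExpr again.
        assert!(!matches!(aliased, Expr::BinaryExpr(_)));
        assert!(matches!(aliased.unalias(), Expr::BinaryExpr(_)));
    }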
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index b6c78b0cf..5c4171484 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -2452,29 +2452,29 @@ async fn both_side_expr_key_inner_join() -> Result<()> {
"ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
" CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(12)\", index: 2 }, Column { name: \"t2.t2_id + Int64(1)\", index: 1 })]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }, Column { name: \"t2.t2_id + UInt32(1)\", index: 1 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(12)\", index: 2 }], 2)",
- " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(12 AS UInt32) as t1.t1_id + Int64(12)]",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }], 2)",
+ " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as t1.t1_id + UInt32(12)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id + Int64(1)\", index: 1 }], 2)",
- " ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + CAST(1 AS UInt32) as t2.t2_id + Int64(1)]",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id + UInt32(1)\", index: 1 }], 2)",
+ " ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as t2.t2_id + UInt32(1)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
- ]
+ ]
} else {
vec![
"ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
" CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(12)\", index: 2 }, Column { name: \"t2.t2_id + Int64(1)\", index: 1 })]",
+ " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(12)\", index: 2 }, Column { name: \"t2.t2_id + UInt32(1)\", index: 1 })]",
" CoalescePartitionsExec",
- " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(12 AS UInt32) as t1.t1_id + Int64(12)]",
+ " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as t1.t1_id + UInt32(12)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
- " ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + CAST(1 AS UInt32) as t2.t2_id + Int64(1)]",
+ " ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as t2.t2_id + UInt32(1)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
@@ -2524,10 +2524,10 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
"ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
" CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + Int64(11)\", index: 2 }], 2)",
- " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }], 2)",
+ " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
@@ -2541,9 +2541,9 @@ async fn left_side_expr_key_inner_join() -> Result<()> {
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + Int64(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
+ " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1.t1_id + UInt32(11)\", index: 2 }, Column { name: \"t2_id\", index: 0 })]",
" CoalescePartitionsExec",
- " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + CAST(11 AS UInt32) as t1.t1_id + Int64(11)]",
+ " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as t1.t1_id + UInt32(11)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
" MemoryExec: partitions=1, partition_sizes=[1]",
@@ -2594,14 +2594,14 @@ async fn right_side_expr_key_inner_join() -> Result<()> {
"ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]",
" CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 1 })]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 1 })]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - Int64(11)\", index: 1 }], 2)",
- " ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 1 }], 2)",
+ " ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
@@ -2610,9 +2610,9 @@ async fn right_side_expr_key_inner_join() -> Result<()> {
"ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]",
" CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 1 })]",
+ " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 1 })]",
" MemoryExec: partitions=1, partition_sizes=[1]",
- " ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+ " ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
@@ -2662,14 +2662,14 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> {
"ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
" CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 3 })]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 3 })]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"t1_id\", index: 0 }], 2)",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - Int64(11)\", index: 3 }], 2)",
- " ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+ " RepartitionExec: partitioning=Hash([Column { name: \"t2.t2_id - UInt32(11)\", index: 3 }], 2)",
+ " ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
@@ -2678,9 +2678,9 @@ async fn select_wildcard_with_expr_key_inner_join() -> Result<()> {
"ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
" ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]",
" CoalesceBatchesExec: target_batch_size=4096",
- " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - Int64(11)\", index: 3 })]",
+ " HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: \"t1_id\", index: 0 }, Column { name: \"t2.t2_id - UInt32(11)\", index: 3 })]",
" MemoryExec: partitions=1, partition_sizes=[1]",
- " ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - CAST(11 AS UInt32) as t2.t2_id - Int64(11)]",
+ " ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as t2.t2_id - UInt32(11)]",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
" MemoryExec: partitions=1, partition_sizes=[1]",
]
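The expected-plan updates above reflect the simplification now reaching these
join keys: the Int64 literal is folded into a literal of the column's UInt32
type, so the runtime CAST disappears and the printed names switch from
Int64(..) to UInt32(..). A small sketch of the display behaviour assumed here
(illustrative, not part of the commit):

    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, Expr};

    fn main() {
        // After constant folding, the literal already has the column type,
        // so the expression carries no CAST and displays with UInt32.
        let folded = col("t1.t1_id") + Expr::Literal(ScalarValue::UInt32(Some(12)));
        println!("{}", folded); // t1.t1_id + UInt32(12)
    }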
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index a3b240e65..87c7b2597 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -586,12 +586,13 @@ pub fn from_plan(
// The preceding part of expr is equi-exprs,
// and the struct of each equi-expr is like `left-expr = right-expr`.
let new_on:Vec<(Expr,Expr)> = expr.iter().take(equi_expr_count).map(|equi_expr| {
- if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = equi_expr {
- assert!(op == &Operator::Eq);
- Ok(((**left).clone(), (**right).clone()))
+ // The SimplifyExpressions rule may add an alias to the equi_expr.
+ let unalias_expr = equi_expr.clone().unalias();
+ if let Expr::BinaryExpr(BinaryExpr { left, op:Operator::Eq, right }) = unalias_expr {
+ Ok((*left, *right))
} else {
Err(DataFusionError::Internal(format!(
- "The front part expressions should be an binary expression, actual:{}",
+ "The front part expressions should be an binary equiality expression, actual:{}",
equi_expr
)))
}
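Note that Expr::unalias only strips the outermost alias layer, which is
exactly what is needed here: SimplifyExpressions aliases the whole equality,
not its operands. A quick illustration, assuming the Expr API at this commit
(hypothetical snippet, not from the diff):

    use datafusion_expr::{col, Expr};

    fn main() {
        let aliased = col("a").eq(col("b")).alias("a = b");
        // unalias() removes the outer Alias and returns the inner equality.
        assert_eq!(aliased.unalias(), col("a").eq(col("b")));
    }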
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 6645284e3..e4fe2e137 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -120,6 +120,7 @@ mod tests {
use crate::simplify_expressions::utils::for_test::{
cast_to_int64_expr, now_expr, to_timestamp_expr,
};
+ use crate::test::test_table_scan_with_name;
use super::*;
use arrow::datatypes::{DataType, Field, Schema};
@@ -131,7 +132,7 @@ mod tests {
use datafusion_expr::logical_plan::table_scan;
use datafusion_expr::{
and, binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Expr,
- ExprSchemable,
+ ExprSchemable, JoinType,
};
/// A macro to assert that one string is contained within another with
@@ -789,4 +790,30 @@ mod tests {
assert_optimized_plan_eq(&plan, expected)
}
+
+ #[test]
+ fn simplify_equijoin_predicate() -> Result<()> {
+ let t1 = test_table_scan_with_name("t1")?;
+ let t2 = test_table_scan_with_name("t2")?;
+
+ let left_key = col("t1.a") + lit(1i64).cast_to(&DataType::UInt32, t1.schema())?;
+ let right_key =
+ col("t2.a") + lit(2i64).cast_to(&DataType::UInt32, t2.schema())?;
+ let plan = LogicalPlanBuilder::from(t1)
+ .join_with_expr_keys(
+ t2,
+ JoinType::Inner,
+ (vec![left_key], vec![right_key]),
+ None,
+ )?
+ .build()?;
+
+ // before simplify: t1.a + CAST(Int64(1), UInt32) = t2.a + CAST(Int64(2), UInt32)
+ // after simplify: t1.a + UInt32(1) = t2.a + UInt32(2) AS t1.a + Int64(1) = t2.a + Int64(2)
+ let expected = "Inner Join: t1.a + UInt32(1) = t2.a + UInt32(2)\
+ \n TableScan: t1\
+ \n TableScan: t2";
+
+ assert_optimized_plan_eq(&plan, expected)
+ }
}
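To run the new optimizer test locally, something along these lines should
work (package name taken from the repository layout, invocation assumed):

    cargo test -p datafusion-optimizer simplify_equijoin_predicate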