Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/30 14:20:08 UTC

[GitHub] [arrow-datafusion] ygf11 opened a new pull request, #4443: Add support for non-column key for equijoin when eliminating cross join to inner

ygf11 opened a new pull request, #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443

   # Which issue does this PR close?
   
   Closes #4442.
   Part of #4389.
   
   # Rationale for this change
   * Currently, `EliminateCrossJoin` only eliminates a cross join when the join keys are columns. It should also support non-column join keys.
   * This PR is part of #4389.
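To illustrate what a non-column join key means here, the following is a minimal, self-contained Rust sketch. It does not use DataFusion's types: `Expr`, `col`, `binary`, `tables`, and `equijoin_keys` are hypothetical names invented for this example. The assumption is that an equality predicate `lhs = rhs` over a cross join can serve as an equijoin condition when each side references columns from exactly one of the two inputs, whether or not that side is a bare column.

```rust
use std::collections::HashSet;

/// Toy expression tree (hypothetical; not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// A column qualified by its table name, e.g. `t1.a`.
    Column { table: String, name: String },
    /// An integer literal.
    Literal(i64),
    /// A binary arithmetic expression such as `t1.a + 100`.
    Binary { left: Box<Expr>, op: char, right: Box<Expr> },
}

fn col(table: &str, name: &str) -> Expr {
    Expr::Column { table: table.to_string(), name: name.to_string() }
}

fn binary(left: Expr, op: char, right: Expr) -> Expr {
    Expr::Binary { left: Box::new(left), op, right: Box::new(right) }
}

/// Collect the names of all tables referenced by `expr`.
fn tables(expr: &Expr, out: &mut HashSet<String>) {
    match expr {
        Expr::Column { table, .. } => {
            out.insert(table.clone());
        }
        Expr::Literal(_) => {}
        Expr::Binary { left, right, .. } => {
            tables(left, out);
            tables(right, out);
        }
    }
}

/// Return `(left_key, right_key)` if `lhs = rhs` can be used as an equijoin
/// condition between `left_table` and `right_table`, swapping the sides when
/// the predicate is written the other way around.
fn equijoin_keys(
    lhs: &Expr,
    rhs: &Expr,
    left_table: &str,
    right_table: &str,
) -> Option<(Expr, Expr)> {
    let (mut l, mut r) = (HashSet::new(), HashSet::new());
    tables(lhs, &mut l);
    tables(rhs, &mut r);
    // A side qualifies only if it references exactly one table.
    let only = |set: &HashSet<String>, t: &str| set.len() == 1 && set.contains(t);
    if only(&l, left_table) && only(&r, right_table) {
        Some((lhs.clone(), rhs.clone()))
    } else if only(&l, right_table) && only(&r, left_table) {
        Some((rhs.clone(), lhs.clone()))
    } else {
        None
    }
}
```

In this sketch, `t1.a + 100 = t2.a * 2` yields a key pair, while a predicate such as `t1.a + t2.a = t2.a * 2` does not, because its left side mixes both inputs.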
   
   # What changes are included in this PR?
   
   * Add logic to support non-column join keys in `EliminateCrossJoin`.
   * Add unit tests.
   
   # Are these changes tested?
   Yes, the new unit tests cover it.
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] liukun4515 commented on a diff in pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#discussion_r1042903948


##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -991,6 +992,68 @@ pub fn table_scan(
     LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection)
 }
 
+/// Wrap projection for a plan, if the join keys contains normal expression.
+pub fn wrap_projection_for_join_if_necessary(
+    join_keys: &[Expr],
+    input: LogicalPlan,
+) -> Result<(LogicalPlan, Vec<Column>, bool)> {
+    let input_schema = input.schema();
+    let alias_join_keys: Vec<Expr> = join_keys
+        .iter()
+        .map(|key| {
+            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
+            // If we do not add alias, it will throw same field name error in the schema when adding projection.

Review Comment:
   good comments





[GitHub] [arrow-datafusion] ygf11 commented on pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
ygf11 commented on PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#issuecomment-1344148046

   > Maybe we can add sql test in the follow up pr.
   
   Thanks, I will add the test case in a follow-up PR.




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#discussion_r1041728697


##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1012,6 +1013,68 @@ pub fn table_scan(
     LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection)
 }
 
+/// Wrap projection for a plan, if the join keys contains normal expression.
+pub fn wrap_projection_for_join_if_necessary(
+    join_keys: &[Expr],
+    input: LogicalPlan,
+) -> Result<(LogicalPlan, Vec<Column>, bool)> {
+    let input_schema = input.schema();
+    let alias_join_keys: Vec<Expr> = join_keys
+        .iter()
+        .map(|key| {
+            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
+            // If we do not add alias, it will throw same field name error in the schema when adding projection.
+            // For example:
+            //    input scan : [a, b, c],
+            //    join keys: [cast(a as int)]
+            //
+            //  then a and cast(a as int) will use the same field name - `a` in projection schema.

Review Comment:
   Why do `Cast` expressions ignore the cast info? This does not look like the expected behavior.
   I think the cast info should be displayed whether the cast is explicit or implicit.
   
   https://github.com/apache/arrow-datafusion/blob/cedb05aedf3cea030bfa8774b8575d8f4806a1c8/datafusion/expr/src/expr.rs#L1108-L1115
   
   @andygrove @liukun4515 @jackwener
   What do you think?
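The collision described in the quoted code comment can be reproduced with a toy model. This is only a sketch under stated assumptions: `Expr`, `display_name`, `full_text`, `project`, and `alias_with_full_text` are hypothetical stand-ins, not DataFusion's API, and the alias text chosen here is just one possible way to make the name unique.

```rust
use std::collections::HashSet;

/// Toy expression (hypothetical; not DataFusion's `Expr`).
#[derive(Debug, Clone)]
enum Expr {
    Column(String),
    Cast { expr: Box<Expr>, to: String },
    Alias { expr: Box<Expr>, name: String },
}

/// Name used for the output field. As in the behavior discussed above,
/// a cast is transparent: it reports the name of its inner expression.
fn display_name(e: &Expr) -> String {
    match e {
        Expr::Column(name) => name.clone(),
        Expr::Cast { expr, .. } => display_name(expr),
        Expr::Alias { name, .. } => name.clone(),
    }
}

/// Full textual form of the expression, including the cast target type.
fn full_text(e: &Expr) -> String {
    match e {
        Expr::Column(name) => name.clone(),
        Expr::Cast { expr, to } => format!("CAST({} AS {})", full_text(expr), to),
        Expr::Alias { expr, .. } => full_text(expr),
    }
}

/// Build the field names of a projection, rejecting duplicate names.
fn project(exprs: &[Expr]) -> Result<Vec<String>, String> {
    let mut seen = HashSet::new();
    let mut fields = Vec::new();
    for e in exprs {
        let name = display_name(e);
        if !seen.insert(name.clone()) {
            return Err(format!("duplicate field name: {}", name));
        }
        fields.push(name);
    }
    Ok(fields)
}

/// Wrap an expression in an alias derived from its full text, so that
/// `a` and `CAST(a AS Int32)` no longer share a field name.
fn alias_with_full_text(e: Expr) -> Expr {
    let name = full_text(&e);
    Expr::Alias { expr: Box::new(e), name }
}
```

Projecting `[a, CAST(a AS Int32)]` fails in this model because both entries are named `a`; aliasing the cast first lets the projection succeed.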
   
   





[GitHub] [arrow-datafusion] alamb merged pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
alamb merged PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443




[GitHub] [arrow-datafusion] mingmwang commented on pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
mingmwang commented on PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#issuecomment-1340511357

   The PR itself LGTM. 




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#discussion_r1042021305


##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -1012,6 +1013,68 @@ pub fn table_scan(
     LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection)
 }
 
+/// Wrap projection for a plan, if the join keys contains normal expression.
+pub fn wrap_projection_for_join_if_necessary(
+    join_keys: &[Expr],
+    input: LogicalPlan,
+) -> Result<(LogicalPlan, Vec<Column>, bool)> {
+    let input_schema = input.schema();
+    let alias_join_keys: Vec<Expr> = join_keys
+        .iter()
+        .map(|key| {
+            // The display_name() of cast expression will ignore the cast info, and show the inner expression name.
+            // If we do not add alias, it will throw same field name error in the schema when adding projection.
+            // For example:
+            //    input scan : [a, b, c],
+            //    join keys: [cast(a as int)]
+            //
+            //  then a and cast(a as int) will use the same field name - `a` in projection schema.

Review Comment:
   Yes, I think we need to consider the `Cast` case:
   1. SQL can explicitly include a `cast`.
   2. After `infer cast`, a cast will be generated.
   
   But I need time to think carefully about the details.








[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#discussion_r1041947341


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2304,3 +2304,99 @@ async fn error_cross_join() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn reduce_cross_join_with_expr_join_key_all() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join
+        let sql = "select * from t1 cross join t2 where t1.t1_id + 12 = t2.t2_id + 1";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Inner Join: t1.t1_id + Int64(12) = t2.t2_id + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(12):Int64;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2.t2_id + Int64(1):Int64;N]",
+            "      Projection: t1.t1_id, t1.t1_name, t1.t1_int, CAST(t1.t1_id AS Int64) + Int64(12) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(12):Int64;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "      Projection: t2.t2_id, t2.t2_name, t2.t2_int, CAST(t2.t2_id AS Int64) + Int64(1) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2.t2_id + Int64(1):Int64;N]",
+            "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 11    | a       | 1      | 22    | y       | 1      |",
+            "| 33    | c       | 3      | 44    | x       | 3      |",
+            "| 44    | d       | 4      | 55    | w       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_cross_join_with_cast_expr_join_key() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join, t2.t2_id will insert cast.
+        let sql =
+            "select t1.t1_id, t2.t2_id, t1.t1_name from t1 cross join t2 where t1.t1_id + 11 = cast(t2.t2_id as BIGINT)";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t2.t2_id, t1.t1_name [t1_id:UInt32;N, t2_id:UInt32;N, t1_name:Utf8;N]",
+            "    Projection: t1.t1_id, t1.t1_name, t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t2_id:UInt32;N]",

Review Comment:
   This isn't related to this PR. This projection is generated to keep the same column order.
   It needs more optimization in `PushDownProjection`.
   
   I am currently working on an enhancement to `PushDownProjection`, which should resolve this.
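As a rough illustration of why the two stacked projections in the expected plan are redundant, here is a small Rust sketch. It is not how `PushDownProjection` is implemented; `merge_projections` is a hypothetical helper, and it assumes both projections select plain columns only.

```rust
/// If every column the outer projection selects is produced by the inner
/// projection, the pair can be collapsed into a single projection that
/// keeps the outer column order. Returns `None` when the outer projection
/// needs a column that the inner projection does not provide.
fn merge_projections(outer: &[&str], inner: &[&str]) -> Option<Vec<String>> {
    if outer.iter().all(|c| inner.contains(c)) {
        Some(outer.iter().map(|c| c.to_string()).collect())
    } else {
        None
    }
}
```

For the plan quoted above, the outer list `t1.t1_id, t2.t2_id, t1.t1_name` is a reordering of the inner list `t1.t1_id, t1.t1_name, t2.t2_id`, so under these assumptions a single projection would be enough.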





[GitHub] [arrow-datafusion] alamb commented on pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#issuecomment-1344402435

   Thanks @ygf11  for the contribution and everyone for the reviews!




[GitHub] [arrow-datafusion] ygf11 commented on pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
ygf11 commented on PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#issuecomment-1333282091

   cc @alamb @jackwener @mingmwang 




[GitHub] [arrow-datafusion] liukun4515 commented on pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#issuecomment-1333545952

   @ygf11 thanks for your contribution. Could you please add some integration tests (IT) to the `datafusion/core/tests/sql/joins.rs` file, using SQL like:
   ```
   select * from test0 as t0 cross join test1 as t1 where t0.a + 1 = t1.a * 2;
   select * from test0 as t0 cross join test1 as t1 where t0.a * 2 = t1.a - 1;
   ```
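The first query above can be used to sketch why the rewrite is safe. The following self-contained Rust example is a toy model, not DataFusion code: each table is reduced to a single `i64` column `a`, and the function names are hypothetical. It compares a cross join followed by the filter `t0.a + 1 = t1.a * 2` with an inner hash join whose keys are the computed expressions.

```rust
use std::collections::HashMap;

/// Cross join followed by a filter: the predicate is evaluated on every
/// pair of rows, so the cost grows with |t0| * |t1|.
fn cross_join_then_filter(t0: &[i64], t1: &[i64]) -> Vec<(i64, i64)> {
    let mut out = Vec::new();
    for &a in t0 {
        for &b in t1 {
            if a + 1 == b * 2 {
                out.push((a, b));
            }
        }
    }
    out
}

/// Inner hash join on computed keys: each side first evaluates its own key
/// expression (like the extra projection under the join), then rows are
/// matched through a hash table instead of comparing all pairs.
fn inner_join_on_computed_keys(t0: &[i64], t1: &[i64]) -> Vec<(i64, i64)> {
    // Build side: index t1 rows by the key `t1.a * 2`.
    let mut build: HashMap<i64, Vec<i64>> = HashMap::new();
    for &b in t1 {
        build.entry(b * 2).or_default().push(b);
    }
    // Probe side: look up the key `t0.a + 1` for each t0 row.
    let mut out = Vec::new();
    for &a in t0 {
        if let Some(matches) = build.get(&(a + 1)) {
            for &b in matches {
                out.push((a, b));
            }
        }
    }
    out
}
```

Both functions return the same rows for the same inputs; only the amount of work differs.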




[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
jackwener commented on code in PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#discussion_r1041947341


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2304,3 +2304,99 @@ async fn error_cross_join() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn reduce_cross_join_with_expr_join_key_all() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join
+        let sql = "select * from t1 cross join t2 where t1.t1_id + 12 = t2.t2_id + 1";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Inner Join: t1.t1_id + Int64(12) = t2.t2_id + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(12):Int64;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2.t2_id + Int64(1):Int64;N]",
+            "      Projection: t1.t1_id, t1.t1_name, t1.t1_int, CAST(t1.t1_id AS Int64) + Int64(12) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(12):Int64;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "      Projection: t2.t2_id, t2.t2_name, t2.t2_int, CAST(t2.t2_id AS Int64) + Int64(1) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2.t2_id + Int64(1):Int64;N]",
+            "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 11    | a       | 1      | 22    | y       | 1      |",
+            "| 33    | c       | 3      | 44    | x       | 3      |",
+            "| 44    | d       | 4      | 55    | w       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_cross_join_with_cast_expr_join_key() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join, t2.t2_id will insert cast.
+        let sql =
+            "select t1.t1_id, t2.t2_id, t1.t1_name from t1 cross join t2 where t1.t1_id + 11 = cast(t2.t2_id as BIGINT)";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t2.t2_id, t1.t1_name [t1_id:UInt32;N, t2_id:UInt32;N, t1_name:Utf8;N]",
+            "    Projection: t1.t1_id, t1.t1_name, t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t2_id:UInt32;N]",

Review Comment:
   Looks like we need to rebase and check the new result.





[GitHub] [arrow-datafusion] ygf11 commented on pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
ygf11 commented on PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#issuecomment-1334981985

   > thanks for your contribution, could you please add some IT in the datafusion/core/tests/sql/joins.rs file with your SQL like
   
   @liukun4515 Added.




[GitHub] [arrow-datafusion] liukun4515 commented on a diff in pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#discussion_r1041829969


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2304,3 +2304,99 @@ async fn error_cross_join() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn reduce_cross_join_with_expr_join_key_all() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join
+        let sql = "select * from t1 cross join t2 where t1.t1_id + 12 = t2.t2_id + 1";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Inner Join: t1.t1_id + Int64(12) = t2.t2_id + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(12):Int64;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2.t2_id + Int64(1):Int64;N]",
+            "      Projection: t1.t1_id, t1.t1_name, t1.t1_int, CAST(t1.t1_id AS Int64) + Int64(12) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(12):Int64;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "      Projection: t2.t2_id, t2.t2_name, t2.t2_int, CAST(t2.t2_id AS Int64) + Int64(1) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2.t2_id + Int64(1):Int64;N]",
+            "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 11    | a       | 1      | 22    | y       | 1      |",
+            "| 33    | c       | 3      | 44    | x       | 3      |",
+            "| 44    | d       | 4      | 55    | w       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_cross_join_with_cast_expr_join_key() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join, t2.t2_id will insert cast.
+        let sql =
+            "select t1.t1_id, t2.t2_id, t1.t1_name from t1 cross join t2 where t1.t1_id + 11 = cast(t2.t2_id as BIGINT)";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t2.t2_id, t1.t1_name [t1_id:UInt32;N, t2_id:UInt32;N, t1_name:Utf8;N]",
+            "    Projection: t1.t1_id, t1.t1_name, t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t2_id:UInt32;N]",

Review Comment:
   cc @jackwener 
   maybe @jackwener  has fixed that in https://github.com/apache/arrow-datafusion/pull/4487





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#discussion_r1042114683


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2302,3 +2302,99 @@ async fn error_cross_join() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn reduce_cross_join_with_expr_join_key_all() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join
+        let sql = "select * from t1 cross join t2 where t1.t1_id + 12 = t2.t2_id + 1";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Inner Join: t1.t1_id + Int64(12) = t2.t2_id + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(12):Int64;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2.t2_id + Int64(1):Int64;N]",
+            "      Projection: t1.t1_id, t1.t1_name, t1.t1_int, CAST(t1.t1_id AS Int64) + Int64(12) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(12):Int64;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "      Projection: t2.t2_id, t2.t2_name, t2.t2_int, CAST(t2.t2_id AS Int64) + Int64(1) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2.t2_id + Int64(1):Int64;N]",
+            "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 11    | a       | 1      | 22    | y       | 1      |",
+            "| 33    | c       | 3      | 44    | x       | 3      |",
+            "| 44    | d       | 4      | 55    | w       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_cross_join_with_cast_expr_join_key() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join, t2.t2_id will insert cast.
+        let sql =
+            "select t1.t1_id, t2.t2_id, t1.t1_name from t1 cross join t2 where t1.t1_id + 11 = cast(t2.t2_id as BIGINT)";

Review Comment:
   👍 



##########
datafusion/sql/src/planner.rs:
##########
@@ -3115,32 +3101,6 @@ fn extract_join_keys(
     Ok(())
 }
 
-/// Wrap projection for a plan, if the join keys contains normal expression.
-fn wrap_projection_for_join_if_necessary(

Review Comment:
   it is very nice to not do this in the planner anymore



##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -1055,4 +1092,168 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn eliminate_cross_join_with_expr_and() -> Result<()> {
+        let t1 = test_table_scan_with_name("t1")?;
+        let t2 = test_table_scan_with_name("t2")?;
+
+        // could eliminate to inner join since filter has Join predicates
+        let plan = LogicalPlanBuilder::from(t1)
+            .cross_join(&t2)?
+            .filter(binary_expr(
+                (col("t1.a") + lit(100u32)).eq(col("t2.a") * lit(2u32)),
+                And,
+                col("t2.c").lt(lit(20u32)),
+            ))?
+            .build()?;
+
+        let expected = vec![
+              "Filter: t2.c < UInt32(20) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
+              "  Projection: t1.a, t1.b, t1.c, t2.a, t2.b, t2.c [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
+              "    Inner Join: t1.a + UInt32(100) = t2.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32, t1.a + UInt32(100):UInt32, a:UInt32, b:UInt32, c:UInt32, t2.a * UInt32(2):UInt32]",

Review Comment:
   This is very cool.
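
As a rough illustration of why the rewritten plan is attractive, the sketch below compares the naive "cross join, then filter" evaluation with an equijoin on keys computed by a projection on each side, using the predicate `t1.a + 100 = t2.a * 2` from the test above. This is a toy model over plain `u32` slices, not DataFusion code; the function names `cross_join_then_filter` and `inner_join_on_projected_keys` are hypothetical and exist only for this example.

```rust
use std::collections::HashMap;

/// Naive plan: form every (left, right) pair, then apply the filter
/// `l + 100 = r * 2` to each pair.
fn cross_join_then_filter(left: &[u32], right: &[u32]) -> Vec<(u32, u32)> {
    let mut out = Vec::new();
    for &l in left {
        for &r in right {
            if l + 100 == r * 2 {
                out.push((l, r));
            }
        }
    }
    out
}

/// Rewritten plan (toy version): compute the key expression on each side,
/// as an extra projected column would, then equijoin on the computed keys.
fn inner_join_on_projected_keys(left: &[u32], right: &[u32]) -> Vec<(u32, u32)> {
    // Build side: index the right rows by their projected key `r * 2`.
    let mut build: HashMap<u32, Vec<u32>> = HashMap::new();
    for &r in right {
        build.entry(r * 2).or_default().push(r);
    }

    // Probe side: look up each left row by its projected key `l + 100`.
    let mut out = Vec::new();
    for &l in left {
        if let Some(matches) = build.get(&(l + 100)) {
            for &r in matches {
                out.push((l, r));
            }
        }
    }
    out
}
```

Both functions return the same pairs for the same inputs, but the second one avoids enumerating the full cross product, which is the point of turning the filter into join keys.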





[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #4443: Add support for non-column key for equijoin when eliminating cross join to inner join

Posted by GitBox <gi...@apache.org>.
mingmwang commented on code in PR #4443:
URL: https://github.com/apache/arrow-datafusion/pull/4443#discussion_r1041719436


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2304,3 +2304,99 @@ async fn error_cross_join() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn reduce_cross_join_with_expr_join_key_all() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join
+        let sql = "select * from t1 cross join t2 where t1.t1_id + 12 = t2.t2_id + 1";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t1.t1_name, t1.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+            "    Inner Join: t1.t1_id + Int64(12) = t2.t2_id + Int64(1) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(12):Int64;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2.t2_id + Int64(1):Int64;N]",
+            "      Projection: t1.t1_id, t1.t1_name, t1.t1_int, CAST(t1.t1_id AS Int64) + Int64(12) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t1.t1_id + Int64(12):Int64;N]",
+            "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+            "      Projection: t2.t2_id, t2.t2_name, t2.t2_int, CAST(t2.t2_id AS Int64) + Int64(1) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2.t2_id + Int64(1):Int64;N]",
+            "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        ];
+        let formatted = plan.display_indent_schema().to_string();
+        let actual: Vec<&str> = formatted.trim().lines().collect();
+        assert_eq!(
+            expected, actual,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected, actual
+        );
+        let expected = vec![
+            "+-------+---------+--------+-------+---------+--------+",
+            "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+            "+-------+---------+--------+-------+---------+--------+",
+            "| 11    | a       | 1      | 22    | y       | 1      |",
+            "| 33    | c       | 3      | 44    | x       | 3      |",
+            "| 44    | d       | 4      | 55    | w       | 3      |",
+            "+-------+---------+--------+-------+---------+--------+",
+        ];
+
+        let results = execute_to_batches(&ctx, sql).await;
+        assert_batches_sorted_eq!(expected, &results);
+    }
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_cross_join_with_cast_expr_join_key() -> Result<()> {
+    let test_repartition_joins = vec![true, false];
+    for repartition_joins in test_repartition_joins {
+        let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+        // reduce to inner join, t2.t2_id will insert cast.
+        let sql =
+            "select t1.t1_id, t2.t2_id, t1.t1_name from t1 cross join t2 where t1.t1_id + 11 = cast(t2.t2_id as BIGINT)";
+        let msg = format!("Creating logical plan for '{}'", sql);
+        let plan = ctx
+            .create_logical_plan(&("explain ".to_owned() + sql))
+            .expect(&msg);
+        let state = ctx.state();
+        let plan = state.optimize(&plan)?;
+        let expected = vec![
+            "Explain [plan_type:Utf8, plan:Utf8]",
+            "  Projection: t1.t1_id, t2.t2_id, t1.t1_name [t1_id:UInt32;N, t2_id:UInt32;N, t1_name:Utf8;N]",
+            "    Projection: t1.t1_id, t1.t1_name, t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t2_id:UInt32;N]",

Review Comment:
   It looks like these two projections are not collapsed. This is unrelated to this PR, but we need to take a look at the `PushDownProjection` rule.
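
To make "collapsed" concrete, here is a minimal sketch of merging two adjacent projections when the outer one only selects columns that the inner one already produces. It uses a toy `Plan` enum and a hypothetical `collapse_projections` function invented for this example; it is not the `PushDownProjection` rule and does not use DataFusion's `LogicalPlan` types.

```rust
/// Toy logical plan: just enough structure to show stacked projections.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String, columns: Vec<String> },
    Projection { columns: Vec<String>, input: Box<Plan> },
}

/// Collapse `Projection(outer) -> Projection(inner) -> child` into
/// `Projection(outer) -> child` when every outer column is already
/// produced by the inner projection (plain column references only).
fn collapse_projections(plan: Plan) -> Plan {
    match plan {
        Plan::Projection { columns, input } => {
            // Collapse the subtree first, so longer chains also shrink.
            match collapse_projections(*input) {
                Plan::Projection { columns: inner_cols, input: inner_input }
                    if columns.iter().all(|c| inner_cols.contains(c)) =>
                {
                    Plan::Projection { columns, input: inner_input }
                }
                other => Plan::Projection { columns, input: Box::new(other) },
            }
        }
        scan => scan,
    }
}
```

Applied to a plan shaped like the two lines above (`t1_id, t2_id, t1_name` over `t1_id, t1_name, t2_id`), this sketch keeps only the outer projection, because the outer column list is a reordering of the inner one.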


