Posted to commits@arrow.apache.org by ja...@apache.org on 2023/06/13 15:07:26 UTC

[arrow-datafusion] branch main updated: fix: port unstable subquery tests to sqllogictest (#6659)

This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6558a8216a fix: port unstable subquery tests to sqllogictest (#6659)
6558a8216a is described below

commit 6558a8216a45603b0b2d261bd41c121d1a840520
Author: jakevin <ja...@gmail.com>
AuthorDate: Tue Jun 13 23:07:20 2023 +0800

    fix: port unstable subquery tests to sqllogictest (#6659)
    
    * refactor: port the subquery tests to sqllogictest
    
    * add rowsort so result comparison is order-independent
---
 datafusion/core/tests/sql/subqueries.rs            | 187 ---------------------
 .../tests/sqllogictests/test_files/subquery.slt    | 141 ++++++++++++++++
 2 files changed, 141 insertions(+), 187 deletions(-)
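
Why the port stabilizes these tests: the removed Rust tests (first diff
below) assert results with assert_batches_eq!, which compares the
pretty-printed rows in the exact order they arrive, yet the physical
plans repartition the data across four partitions, so the row order
coming out of the hash join is not deterministic. The ported .slt cases
(second diff below) tag each result block with rowsort, which makes the
sqllogictest runner sort the result rows before comparing them.

A minimal Rust sketch of the same idea, not taken from this commit (the
table values are invented for illustration): DataFusion also ships an
order-insensitive assertion macro, assert_batches_sorted_eq!, which
sorts the printed rows on both sides before comparing.

    use std::sync::Arc;

    use datafusion::arrow::array::UInt32Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::assert_batches_sorted_eq;

    fn main() -> datafusion::error::Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "t1_id",
            DataType::UInt32,
            false,
        )]));
        // Rows arrive as 22 then 11, e.g. after hash repartitioning.
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(UInt32Array::from(vec![22u32, 11]))],
        )?;

        // Both the expected and the actual rows are sorted before the
        // comparison, so this passes regardless of arrival order.
        let expected = vec![
            "+-------+",
            "| t1_id |",
            "+-------+",
            "| 11    |",
            "| 22    |",
            "+-------+",
        ];
        assert_batches_sorted_eq!(expected, &[batch]);
        Ok(())
    }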

diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs
index d96c6eb700..1fd627bc77 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -380,133 +380,6 @@ async fn non_equal_correlated_scalar_subquery() -> Result<()> {
     Ok(())
 }
 
-#[tokio::test]
-async fn aggregated_correlated_scalar_subquery() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum [t1_id:UInt32;N, t2_sum:UInt64;N]",
-        "  Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
-        "    TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_1 [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
-        "      Projection: SUM(t2.t2_int), t2.t2_id [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
-        "        Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]] [t2_id:UInt32;N, SUM(t2.t2_int):UInt64;N]",
-        "          TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+--------+",
-        "| t1_id | t2_sum |",
-        "+-------+--------+",
-        "| 11    | 3      |",
-        "| 22    | 1      |",
-        "| 44    | 3      |",
-        "| 33    |        |",
-        "+-------+--------+",
-    ];
-    assert_batches_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn aggregated_correlated_scalar_subquery_with_having() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t2_int) < 3) as t2_sum from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum [t1_id:UInt32;N, t2_sum:UInt64;N]",
-        "  Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
-        "    TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_1 [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
-        "      Projection: SUM(t2.t2_int), t2.t2_id [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
-        "        Filter: SUM(t2.t2_int) < UInt64(3) [t2_id:UInt32;N, SUM(t2.t2_int):UInt64;N]",
-        "          Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]] [t2_id:UInt32;N, SUM(t2.t2_int):UInt64;N]",
-        "            TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+--------+",
-        "| t1_id | t2_sum |",
-        "+-------+--------+",
-        "| 22    | 1      |",
-        "| 11    |        |",
-        "| 33    |        |",
-        "| 44    |        |",
-        "+-------+--------+",
-    ];
-    assert_batches_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn aggregated_correlated_scalar_subquery_with_cast() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int * Float64(1)) + Int64(1) AS t2_sum [t1_id:UInt32;N, t2_sum:Float64;N]",
-        "  Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, SUM(t2.t2_int * Float64(1)) + Int64(1):Float64;N, t2_id:UInt32;N]",
-        "    TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_1 [SUM(t2.t2_int * Float64(1)) + Int64(1):Float64;N, t2_id:UInt32;N]",
-        "      Projection: SUM(t2.t2_int * Float64(1)) + Float64(1) AS SUM(t2.t2_int * Float64(1)) + Int64(1), t2.t2_id [SUM(t2.t2_int * Float64(1)) + Int64(1):Float64;N, t2_id:UInt32;N]",
-        "        Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(CAST(t2.t2_int AS Float64)) AS SUM(t2.t2_int * Float64(1))]] [t2_id:UInt32;N, SUM(t2.t2_int * Float64(1)):Float64;N]",
-        "          TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+--------+",
-        "| t1_id | t2_sum |",
-        "+-------+--------+",
-        "| 11    | 4.0    |",
-        "| 22    | 2.0    |",
-        "| 44    | 4.0    |",
-        "| 33    |        |",
-        "+-------+--------+",
-    ];
-    assert_batches_eq!(expected, &results);
-
-    Ok(())
-}
-
 #[tokio::test]
 async fn aggregated_correlated_scalar_subquery_with_extra_group_by_columns() -> Result<()>
 {
@@ -525,66 +398,6 @@ async fn aggregated_correlated_scalar_subquery_with_extra_group_by_columns() ->
     Ok(())
 }
 
-#[tokio::test]
-async fn aggregated_correlated_scalar_subquery_with_extra_group_by_constant() -> Result<()>
-{
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-
-    let sql = "SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_id, 'a') as t2_sum from t1";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-
-    let expected = vec![
-        "Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum [t1_id:UInt32;N, t2_sum:UInt64;N]",
-        "  Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
-        "    TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
-        "    SubqueryAlias: __scalar_sq_1 [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
-        "      Projection: SUM(t2.t2_int), t2.t2_id [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
-        "        Aggregate: groupBy=[[t2.t2_id, Utf8(\"a\")]], aggr=[[SUM(t2.t2_int)]] [t2_id:UInt32;N, Utf8(\"a\"):Utf8, SUM(t2.t2_int):UInt64;N]",
-        "          TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // assert data
-    let results = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-------+--------+",
-        "| t1_id | t2_sum |",
-        "+-------+--------+",
-        "| 11    | 3      |",
-        "| 22    | 1      |",
-        "| 44    | 3      |",
-        "| 33    |        |",
-        "+-------+--------+",
-    ];
-    assert_batches_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn group_by_correlated_scalar_subquery() -> Result<()> {
-    let ctx = create_join_context("t1_id", "t2_id", true)?;
-    let sql = "SELECT sum(t1_int) from t1 GROUP BY (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id)";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let err = dataframe.into_optimized_plan().err().unwrap();
-
-    assert_eq!(
-        r#"Context("check_analyzed_plan", Plan("Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions"))"#,
-        &format!("{err:?}")
-    );
-
-    Ok(())
-}
-
 #[tokio::test]
 async fn support_agg_correlated_columns() -> Result<()> {
     let ctx = create_join_context("t1_id", "t2_id", true)?;
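
Four of the five scenarios removed above reappear in subquery.slt below:
each explain case pins the logical and physical plan as plain text, and
each data case is tagged rowsort, with the expected rows written
pre-sorted and NULLs spelled out, so the comparison no longer depends on
partition scheduling. The fifth removed test, the error-path check
group_by_correlated_scalar_subquery, has no counterpart in the added
hunk.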
diff --git a/datafusion/core/tests/sqllogictests/test_files/subquery.slt b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
index 780b24be63..b47901b277 100644
--- a/datafusion/core/tests/sqllogictests/test_files/subquery.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
@@ -107,3 +107,144 @@ from t1
 where t1_int = (select max(i) from (values (1)) as s(i));
 ----
 11
+
+# aggregated_correlated_scalar_subquery
+query TT
+explain SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_2.SUM(t2.t2_int) AS t2_sum
+--Left Join: t1.t1_id = __scalar_sq_2.t2_id
+----TableScan: t1 projection=[t1_id]
+----SubqueryAlias: __scalar_sq_2
+------Projection: SUM(t2.t2_int), t2.t2_id
+--------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]]
+----------TableScan: t2 projection=[t2_id, t2_int]
+physical_plan
+ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
+--CoalesceBatchesExec: target_batch_size=8192
+----HashJoinExec: mode=Partitioned, join_type=Left, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 1 })]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 4), input_partitions=4
+----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
+--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 4), input_partitions=4
+--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
+----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+query II rowsort
+SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
+----
+11 3
+22 1
+33 NULL
+44 3
+
+# aggregated_correlated_scalar_subquery_with_cast
+query TT
+explain SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_4.SUM(t2.t2_int * Float64(1)) + Int64(1) AS t2_sum
+--Left Join: t1.t1_id = __scalar_sq_4.t2_id
+----TableScan: t1 projection=[t1_id]
+----SubqueryAlias: __scalar_sq_4
+------Projection: SUM(t2.t2_int * Float64(1)) + Float64(1) AS SUM(t2.t2_int * Float64(1)) + Int64(1), t2.t2_id
+--------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(CAST(t2.t2_int AS Float64)) AS SUM(t2.t2_int * Float64(1))]]
+----------TableScan: t2 projection=[t2_id, t2_int]
+physical_plan
+ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int * Float64(1)) + Int64(1)@1 as t2_sum]
+--CoalesceBatchesExec: target_batch_size=8192
+----HashJoinExec: mode=Partitioned, join_type=Left, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 1 })]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 4), input_partitions=4
+----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
+--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 4), input_partitions=4
+--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
+----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+query IR rowsort
+SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
+----
+11 4
+22 2
+33 NULL
+44 4
+
+# aggregated_correlated_scalar_subquery_with_extra_group_by_constant
+query TT
+explain SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_id, 'a') as t2_sum from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_6.SUM(t2.t2_int) AS t2_sum
+--Left Join: t1.t1_id = __scalar_sq_6.t2_id
+----TableScan: t1 projection=[t1_id]
+----SubqueryAlias: __scalar_sq_6
+------Projection: SUM(t2.t2_int), t2.t2_id
+--------Aggregate: groupBy=[[t2.t2_id, Utf8("a")]], aggr=[[SUM(t2.t2_int)]]
+----------TableScan: t2 projection=[t2_id, t2_int]
+physical_plan
+ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
+--CoalesceBatchesExec: target_batch_size=8192
+----HashJoinExec: mode=Partitioned, join_type=Left, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 1 })]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 4), input_partitions=4
+----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 1 }], 4), input_partitions=4
+----------ProjectionExec: expr=[SUM(t2.t2_int)@2 as SUM(t2.t2_int), t2_id@0 as t2_id]
+------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id, Utf8("a")@1 as Utf8("a")], aggr=[SUM(t2.t2_int)]
+--------------CoalesceBatchesExec: target_batch_size=8192
+----------------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }, Column { name: "Utf8(\"a\")", index: 1 }], 4), input_partitions=4
+------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id, a as Utf8("a")], aggr=[SUM(t2.t2_int)]
+--------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+query II rowsort
+SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_id, 'a') as t2_sum from t1
+----
+11 3
+22 1
+33 NULL
+44 3
+
+# aggregated_correlated_scalar_subquery_with_having
+query TT
+explain SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t2_int) < 3) as t2_sum from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_8.SUM(t2.t2_int) AS t2_sum
+--Left Join: t1.t1_id = __scalar_sq_8.t2_id
+----TableScan: t1 projection=[t1_id]
+----SubqueryAlias: __scalar_sq_8
+------Projection: SUM(t2.t2_int), t2.t2_id
+--------Filter: SUM(t2.t2_int) < Int64(3)
+----------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]]
+------------TableScan: t2 projection=[t2_id, t2_int]
+physical_plan
+ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
+--CoalesceBatchesExec: target_batch_size=8192
+----HashJoinExec: mode=Partitioned, join_type=Left, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 1 })]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 4), input_partitions=4
+----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------FilterExec: SUM(t2.t2_int)@1 < 3
+------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
+--------------CoalesceBatchesExec: target_batch_size=8192
+----------------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 4), input_partitions=4
+------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
+--------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+query II rowsort
+SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t2_int) < 3) as t2_sum from t1
+----
+11 NULL
+22 1
+33 NULL
+44 NULL
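
To rerun just the ported file, the sqllogictest harness can be invoked
through cargo with the file name as a filter. The exact command below is
an assumption based on the harness living at
datafusion/core/tests/sqllogictests at the time of this commit, not
something recorded in the commit itself:

    cargo test -p datafusion --test sqllogictests -- subquery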