You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@arrow.apache.org by ja...@apache.org on 2023/06/13 15:07:26 UTC
[arrow-datafusion] branch main updated: fix: port unstable subquery to sqllogictest (#6659)
This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6558a8216a fix: port unstable subquery to sqllogictest (#6659)
6558a8216a is described below
commit 6558a8216a45603b0b2d261bd41c121d1a840520
Author: jakevin <ja...@gmail.com>
AuthorDate: Tue Jun 13 23:07:20 2023 +0800
fix: port unstable subquery to sqllogictest (#6659)
* refactor: port subquery to sqllogictest
* add rowsort
---
datafusion/core/tests/sql/subqueries.rs | 187 ---------------------
.../tests/sqllogictests/test_files/subquery.slt | 141 ++++++++++++++++
2 files changed, 141 insertions(+), 187 deletions(-)
diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs
index d96c6eb700..1fd627bc77 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -380,133 +380,6 @@ async fn non_equal_correlated_scalar_subquery() -> Result<()> {
Ok(())
}
-#[tokio::test]
-async fn aggregated_correlated_scalar_subquery() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum [t1_id:UInt32;N, t2_sum:UInt64;N]",
- " Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
- " TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
- " Projection: SUM(t2.t2_int), t2.t2_id [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
- " Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]] [t2_id:UInt32;N, SUM(t2.t2_int):UInt64;N]",
- " TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+--------+",
- "| t1_id | t2_sum |",
- "+-------+--------+",
- "| 11 | 3 |",
- "| 22 | 1 |",
- "| 44 | 3 |",
- "| 33 | |",
- "+-------+--------+",
- ];
- assert_batches_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn aggregated_correlated_scalar_subquery_with_having() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t2_int) < 3) as t2_sum from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum [t1_id:UInt32;N, t2_sum:UInt64;N]",
- " Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
- " TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
- " Projection: SUM(t2.t2_int), t2.t2_id [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
- " Filter: SUM(t2.t2_int) < UInt64(3) [t2_id:UInt32;N, SUM(t2.t2_int):UInt64;N]",
- " Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]] [t2_id:UInt32;N, SUM(t2.t2_int):UInt64;N]",
- " TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+--------+",
- "| t1_id | t2_sum |",
- "+-------+--------+",
- "| 22 | 1 |",
- "| 11 | |",
- "| 33 | |",
- "| 44 | |",
- "+-------+--------+",
- ];
- assert_batches_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn aggregated_correlated_scalar_subquery_with_cast() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int * Float64(1)) + Int64(1) AS t2_sum [t1_id:UInt32;N, t2_sum:Float64;N]",
- " Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, SUM(t2.t2_int * Float64(1)) + Int64(1):Float64;N, t2_id:UInt32;N]",
- " TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [SUM(t2.t2_int * Float64(1)) + Int64(1):Float64;N, t2_id:UInt32;N]",
- " Projection: SUM(t2.t2_int * Float64(1)) + Float64(1) AS SUM(t2.t2_int * Float64(1)) + Int64(1), t2.t2_id [SUM(t2.t2_int * Float64(1)) + Int64(1):Float64;N, t2_id:UInt32;N]",
- " Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(CAST(t2.t2_int AS Float64)) AS SUM(t2.t2_int * Float64(1))]] [t2_id:UInt32;N, SUM(t2.t2_int * Float64(1)):Float64;N]",
- " TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+--------+",
- "| t1_id | t2_sum |",
- "+-------+--------+",
- "| 11 | 4.0 |",
- "| 22 | 2.0 |",
- "| 44 | 4.0 |",
- "| 33 | |",
- "+-------+--------+",
- ];
- assert_batches_eq!(expected, &results);
-
- Ok(())
-}
-
#[tokio::test]
async fn aggregated_correlated_scalar_subquery_with_extra_group_by_columns() -> Result<()>
{
@@ -525,66 +398,6 @@ async fn aggregated_correlated_scalar_subquery_with_extra_group_by_columns() ->
Ok(())
}
-#[tokio::test]
-async fn aggregated_correlated_scalar_subquery_with_extra_group_by_constant() -> Result<()>
-{
- let ctx = create_join_context("t1_id", "t2_id", true)?;
-
- let sql = "SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_id, 'a') as t2_sum from t1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
-
- let expected = vec![
- "Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum [t1_id:UInt32;N, t2_sum:UInt64;N]",
- " Left Join: t1.t1_id = __scalar_sq_1.t2_id [t1_id:UInt32;N, SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
- " TableScan: t1 projection=[t1_id] [t1_id:UInt32;N]",
- " SubqueryAlias: __scalar_sq_1 [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
- " Projection: SUM(t2.t2_int), t2.t2_id [SUM(t2.t2_int):UInt64;N, t2_id:UInt32;N]",
- " Aggregate: groupBy=[[t2.t2_id, Utf8(\"a\")]], aggr=[[SUM(t2.t2_int)]] [t2_id:UInt32;N, Utf8(\"a\"):Utf8, SUM(t2.t2_int):UInt64;N]",
- " TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // assert data
- let results = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-------+--------+",
- "| t1_id | t2_sum |",
- "+-------+--------+",
- "| 11 | 3 |",
- "| 22 | 1 |",
- "| 44 | 3 |",
- "| 33 | |",
- "+-------+--------+",
- ];
- assert_batches_eq!(expected, &results);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn group_by_correlated_scalar_subquery() -> Result<()> {
- let ctx = create_join_context("t1_id", "t2_id", true)?;
- let sql = "SELECT sum(t1_int) from t1 GROUP BY (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id)";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let err = dataframe.into_optimized_plan().err().unwrap();
-
- assert_eq!(
- r#"Context("check_analyzed_plan", Plan("Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions"))"#,
- &format!("{err:?}")
- );
-
- Ok(())
-}
-
#[tokio::test]
async fn support_agg_correlated_columns() -> Result<()> {
let ctx = create_join_context("t1_id", "t2_id", true)?;
diff --git a/datafusion/core/tests/sqllogictests/test_files/subquery.slt b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
index 780b24be63..b47901b277 100644
--- a/datafusion/core/tests/sqllogictests/test_files/subquery.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/subquery.slt
@@ -107,3 +107,144 @@ from t1
where t1_int = (select max(i) from (values (1)) as s(i));
----
11
+
+# aggregated_correlated_scalar_subquery
+query TT
+explain SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_2.SUM(t2.t2_int) AS t2_sum
+--Left Join: t1.t1_id = __scalar_sq_2.t2_id
+----TableScan: t1 projection=[t1_id]
+----SubqueryAlias: __scalar_sq_2
+------Projection: SUM(t2.t2_int), t2.t2_id
+--------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]]
+----------TableScan: t2 projection=[t2_id, t2_int]
+physical_plan
+ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
+--CoalesceBatchesExec: target_batch_size=8192
+----HashJoinExec: mode=Partitioned, join_type=Left, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 1 })]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 4), input_partitions=4
+----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
+--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 4), input_partitions=4
+--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
+----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+query II rowsort
+SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
+----
+11 3
+22 1
+33 NULL
+44 3
+
+# aggregated_correlated_scalar_subquery_with_cast
+query TT
+explain SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_4.SUM(t2.t2_int * Float64(1)) + Int64(1) AS t2_sum
+--Left Join: t1.t1_id = __scalar_sq_4.t2_id
+----TableScan: t1 projection=[t1_id]
+----SubqueryAlias: __scalar_sq_4
+------Projection: SUM(t2.t2_int * Float64(1)) + Float64(1) AS SUM(t2.t2_int * Float64(1)) + Int64(1), t2.t2_id
+--------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(CAST(t2.t2_int AS Float64)) AS SUM(t2.t2_int * Float64(1))]]
+----------TableScan: t2 projection=[t2_id, t2_int]
+physical_plan
+ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int * Float64(1)) + Int64(1)@1 as t2_sum]
+--CoalesceBatchesExec: target_batch_size=8192
+----HashJoinExec: mode=Partitioned, join_type=Left, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 1 })]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 4), input_partitions=4
+----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
+--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 4), input_partitions=4
+--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
+----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+query IR rowsort
+SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
+----
+11 4
+22 2
+33 NULL
+44 4
+
+# aggregated_correlated_scalar_subquery_with_extra_group_by_constant
+query TT
+explain SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_id, 'a') as t2_sum from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_6.SUM(t2.t2_int) AS t2_sum
+--Left Join: t1.t1_id = __scalar_sq_6.t2_id
+----TableScan: t1 projection=[t1_id]
+----SubqueryAlias: __scalar_sq_6
+------Projection: SUM(t2.t2_int), t2.t2_id
+--------Aggregate: groupBy=[[t2.t2_id, Utf8("a")]], aggr=[[SUM(t2.t2_int)]]
+----------TableScan: t2 projection=[t2_id, t2_int]
+physical_plan
+ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
+--CoalesceBatchesExec: target_batch_size=8192
+----HashJoinExec: mode=Partitioned, join_type=Left, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 1 })]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 4), input_partitions=4
+----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 1 }], 4), input_partitions=4
+----------ProjectionExec: expr=[SUM(t2.t2_int)@2 as SUM(t2.t2_int), t2_id@0 as t2_id]
+------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id, Utf8("a")@1 as Utf8("a")], aggr=[SUM(t2.t2_int)]
+--------------CoalesceBatchesExec: target_batch_size=8192
+----------------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }, Column { name: "Utf8(\"a\")", index: 1 }], 4), input_partitions=4
+------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id, a as Utf8("a")], aggr=[SUM(t2.t2_int)]
+--------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+query II rowsort
+SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by t2_id, 'a') as t2_sum from t1
+----
+11 3
+22 1
+33 NULL
+44 3
+
+# aggregated_correlated_scalar_subquery_with_having
+query TT
+explain SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t2_int) < 3) as t2_sum from t1
+----
+logical_plan
+Projection: t1.t1_id, __scalar_sq_8.SUM(t2.t2_int) AS t2_sum
+--Left Join: t1.t1_id = __scalar_sq_8.t2_id
+----TableScan: t1 projection=[t1_id]
+----SubqueryAlias: __scalar_sq_8
+------Projection: SUM(t2.t2_int), t2.t2_id
+--------Filter: SUM(t2.t2_int) < Int64(3)
+----------Aggregate: groupBy=[[t2.t2_id]], aggr=[[SUM(t2.t2_int)]]
+------------TableScan: t2 projection=[t2_id, t2_int]
+physical_plan
+ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
+--CoalesceBatchesExec: target_batch_size=8192
+----HashJoinExec: mode=Partitioned, join_type=Left, on=[(Column { name: "t1_id", index: 0 }, Column { name: "t2_id", index: 1 })]
+------CoalesceBatchesExec: target_batch_size=8192
+--------RepartitionExec: partitioning=Hash([Column { name: "t1_id", index: 0 }], 4), input_partitions=4
+----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------FilterExec: SUM(t2.t2_int)@1 < 3
+------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
+--------------CoalesceBatchesExec: target_batch_size=8192
+----------------RepartitionExec: partitioning=Hash([Column { name: "t2_id", index: 0 }], 4), input_partitions=4
+------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
+--------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+query II rowsort
+SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having sum(t2_int) < 3) as t2_sum from t1
+----
+11 NULL
+22 1
+33 NULL
+44 NULL