You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/05 12:49:07 UTC
[arrow-datafusion] branch main updated: Port test in union.rs to sqllogic (#6224)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7d11eaf1ce Port test in union.rs to sqllogic (#6224)
7d11eaf1ce is described below
commit 7d11eaf1ce905c562296d8a4ac9cd5b048f995fc
Author: parkma99 <84...@users.noreply.github.com>
AuthorDate: Fri May 5 20:49:02 2023 +0800
Port test in union.rs to sqllogic (#6224)
* Port test in union.rs to sqllogic
* port rest tests in union.rs
* fix clippy error and slt error
---
datafusion/core/tests/sql/mod.rs | 24 ---
datafusion/core/tests/sql/union.rs | 182 ---------------------
.../core/tests/sqllogictests/test_files/union.slt | 170 +++++++++++++++++++
3 files changed, 170 insertions(+), 206 deletions(-)
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 8c7f50c9a6..ba2329839f 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -99,7 +99,6 @@ pub mod references;
pub mod select;
pub mod timestamp;
pub mod udf;
-pub mod union;
pub mod wildcard;
pub mod window;
@@ -750,29 +749,6 @@ fn create_sort_merge_join_datatype_context() -> Result<SessionContext> {
Ok(ctx)
}
-fn create_union_context() -> Result<SessionContext> {
- let ctx = SessionContext::with_config(
- SessionConfig::new()
- .with_target_partitions(4)
- .with_batch_size(4096),
- );
- let t1_schema = Arc::new(Schema::new(vec![
- Field::new("id", DataType::Int32, true),
- Field::new("name", DataType::UInt8, true),
- ]));
- let t1_data = RecordBatch::new_empty(t1_schema);
- ctx.register_batch("t1", t1_data)?;
-
- let t2_schema = Arc::new(Schema::new(vec![
- Field::new("id", DataType::UInt8, true),
- Field::new("name", DataType::UInt8, true),
- ]));
- let t2_data = RecordBatch::new_empty(t2_schema);
- ctx.register_batch("t2", t2_data)?;
-
- Ok(ctx)
-}
-
fn create_nested_loop_join_context() -> Result<SessionContext> {
let ctx = SessionContext::with_config(
SessionConfig::new()
diff --git a/datafusion/core/tests/sql/union.rs b/datafusion/core/tests/sql/union.rs
deleted file mode 100644
index cca9c23e67..0000000000
--- a/datafusion/core/tests/sql/union.rs
+++ /dev/null
@@ -1,182 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::*;
-
-#[tokio::test]
-async fn union_with_except_input() -> Result<()> {
- let ctx = create_union_context()?;
- let sql = "(
- SELECT name FROM t1
- EXCEPT
- SELECT name FROM t2
- )
- UNION ALL
- (
- SELECT name FROM t2
- EXCEPT
- SELECT name FROM t1
- )";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
- let expected = vec![
- "Explain [plan_type:Utf8, plan:Utf8]",
- " Union [name:UInt8;N]",
- " LeftAnti Join: t1.name = t2.name [name:UInt8;N]",
- " Aggregate: groupBy=[[t1.name]], aggr=[[]] [name:UInt8;N]",
- " TableScan: t1 projection=[name] [name:UInt8;N]",
- " TableScan: t2 projection=[name] [name:UInt8;N]",
- " LeftAnti Join: t2.name = t1.name [name:UInt8;N]",
- " Aggregate: groupBy=[[t2.name]], aggr=[[]] [name:UInt8;N]",
- " TableScan: t2 projection=[name] [name:UInt8;N]",
- " TableScan: t1 projection=[name] [name:UInt8;N]",
- ];
-
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
-}
-
-#[tokio::test]
-async fn union_with_type_coercion() -> Result<()> {
- let ctx = create_union_context()?;
- let sql = "(
- SELECT id, name FROM t1
- EXCEPT
- SELECT id, name FROM t2
- )
- UNION ALL
- (
- SELECT id, name FROM t2
- EXCEPT
- SELECT id, name FROM t1
- )";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
- let plan = dataframe.into_optimized_plan()?;
- let expected = vec![
- "Explain [plan_type:Utf8, plan:Utf8]",
- " Union [id:Int32;N, name:UInt8;N]",
- " LeftAnti Join: t1.id = CAST(t2.id AS Int32), t1.name = t2.name [id:Int32;N, name:UInt8;N]",
- " Aggregate: groupBy=[[t1.id, t1.name]], aggr=[[]] [id:Int32;N, name:UInt8;N]",
- " TableScan: t1 projection=[id, name] [id:Int32;N, name:UInt8;N]",
- " TableScan: t2 projection=[id, name] [id:UInt8;N, name:UInt8;N]",
- " Projection: CAST(t2.id AS Int32) AS id, t2.name [id:Int32;N, name:UInt8;N]",
- " LeftAnti Join: CAST(t2.id AS Int32) = t1.id, t2.name = t1.name [id:UInt8;N, name:UInt8;N]",
- " Aggregate: groupBy=[[t2.id, t2.name]], aggr=[[]] [id:UInt8;N, name:UInt8;N]",
- " TableScan: t2 projection=[id, name] [id:UInt8;N, name:UInt8;N]",
- " TableScan: t1 projection=[id, name] [id:Int32;N, name:UInt8;N]",
- ];
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
-}
-
-#[tokio::test]
-async fn test_union_upcast_types() -> Result<()> {
- let config = SessionConfig::new()
- .with_repartition_windows(false)
- .with_target_partitions(1);
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let sql = "SELECT c1, c9 FROM aggregate_test_100
- UNION ALL
- SELECT c1, c3 FROM aggregate_test_100
- ORDER BY c9 DESC LIMIT 5";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
-
- let expected_logical_plan = vec![
- "Limit: skip=0, fetch=5 [c1:Utf8, c9:Int64]",
- " Sort: aggregate_test_100.c9 DESC NULLS FIRST [c1:Utf8, c9:Int64]",
- " Union [c1:Utf8, c9:Int64]",
- " Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c9 AS Int64) AS c9 [c1:Utf8, c9:Int64]",
- " TableScan: aggregate_test_100 [c1:Utf8, c2:UInt32, c3:Int8, c4:Int16, c5:Int32, c6:Int64, c7:UInt8, c8:UInt16, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
- " Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c3 AS Int64) AS c9 [c1:Utf8, c9:Int64]",
- " TableScan: aggregate_test_100 [c1:Utf8, c2:UInt32, c3:Int8, c4:Int16, c5:Int32, c6:Int64, c7:UInt8, c8:UInt16, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
- ];
- let formatted_logical_plan =
- dataframe.logical_plan().display_indent_schema().to_string();
- let actual_logical_plan: Vec<&str> = formatted_logical_plan.trim().lines().collect();
- assert_eq!(expected_logical_plan, actual_logical_plan, "\n\nexpected:\n\n{expected_logical_plan:#?}\nactual:\n\n{actual_logical_plan:#?}\n\n");
-
- let actual = execute_to_batches(&ctx, sql).await;
-
- let expected = vec![
- "+----+------------+",
- "| c1 | c9 |",
- "+----+------------+",
- "| c | 4268716378 |",
- "| e | 4229654142 |",
- "| d | 4216440507 |",
- "| e | 4144173353 |",
- "| b | 4076864659 |",
- "+----+------------+",
- ];
- assert_batches_eq!(expected, &actual);
-
- Ok(())
-}
-
-#[tokio::test]
-async fn union_with_hash_aggregate() -> Result<()> {
- let ctx = create_union_context()?;
- let sql = "select count(*) from (
- select distinct name from t1
- union all
- select distinct name from t2
- ) group by name";
-
- let dataframe = ctx.sql(sql).await.unwrap();
- let plan = dataframe.into_optimized_plan().unwrap();
- let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
- let formatted = displayable(plan.as_ref()).indent().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
-
- let expected = vec![
- "ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]",
- " AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]",
- " InterleaveExec",
- " AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 4), input_partitions=4",
- " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1",
- " AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- " AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 4), input_partitions=4",
- " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1",
- " AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ];
-
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
-}
diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt b/datafusion/core/tests/sqllogictests/test_files/union.slt
index 8a13dfe36f..4860627cff 100644
--- a/datafusion/core/tests/sqllogictests/test_files/union.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/union.slt
@@ -175,6 +175,61 @@ Alice
John
# union_with_type_coercion
+query TT
+explain
+(
+ SELECT id, name FROM t1
+ EXCEPT
+ SELECT id, name FROM t2
+)
+UNION ALL
+(
+ SELECT id, name FROM t2
+ EXCEPT
+ SELECT id, name FROM t1
+)
+----
+logical_plan
+Union
+ LeftAnti Join: t1.id = CAST(t2.id AS Int32), t1.name = t2.name
+ Aggregate: groupBy=[[t1.id, t1.name]], aggr=[[]]
+ TableScan: t1 projection=[id, name]
+ TableScan: t2 projection=[id, name]
+ Projection: CAST(t2.id AS Int32) AS id, t2.name
+ LeftAnti Join: CAST(t2.id AS Int32) = t1.id, t2.name = t1.name
+ Aggregate: groupBy=[[t2.id, t2.name]], aggr=[[]]
+ TableScan: t2 projection=[id, name]
+ TableScan: t1 projection=[id, name]
+physical_plan
+UnionExec
+ ProjectionExec: expr=[id@0 as id, name@1 as name]
+ CoalesceBatchesExec: target_batch_size=8192
+ HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "id", index: 0 }, Column { name: "CAST(t2.id AS Int32)", index: 2 }), (Column { name: "name", index: 1 }, Column { name: "name", index: 1 })]
+ AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+ AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+ MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+ ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
+ MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+ ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
+ ProjectionExec: expr=[id@0 as id, name@1 as name]
+ CoalesceBatchesExec: target_batch_size=8192
+ HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "id", index: 0 }), (Column { name: "name", index: 1 }, Column { name: "name", index: 1 })]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+ ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
+ AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+ AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+ MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+ MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
query IT rowsort
(
SELECT id, name FROM t1
@@ -191,6 +246,121 @@ UNION ALL
3 Alice
3 John
+# union_with_except_input
+query TT
+explain
+(
+ SELECT name FROM t1
+ EXCEPT
+ SELECT name FROM t2
+)
+UNION ALL
+(
+ SELECT name FROM t2
+ EXCEPT
+ SELECT name FROM t1
+)
+----
+logical_plan
+Union
+ LeftAnti Join: t1.name = t2.name
+ Aggregate: groupBy=[[t1.name]], aggr=[[]]
+ TableScan: t1 projection=[name]
+ TableScan: t2 projection=[name]
+ LeftAnti Join: t2.name = t1.name
+ Aggregate: groupBy=[[t2.name]], aggr=[[]]
+ TableScan: t2 projection=[name]
+ TableScan: t1 projection=[name]
+physical_plan
+ InterleaveExec
+ CoalesceBatchesExec: target_batch_size=8192
+ HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "name", index: 0 }, Column { name: "name", index: 0 })]
+ AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+ AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+ MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+ MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+ CoalesceBatchesExec: target_batch_size=8192
+ HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "name", index: 0 }, Column { name: "name", index: 0 })]
+ AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+ AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+ MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+ MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+# union_upcast_types
+query TT
+explain SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
+ Union
+ Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c9 AS Int64) AS c9
+ TableScan: aggregate_test_100 projection=[c1, c9]
+ Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c3 AS Int64) AS c9
+ TableScan: aggregate_test_100 projection=[c1, c3]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ SortPreservingMergeExec: [c9@1 DESC]
+ UnionExec
+ SortExec: expr=[c9@1 DESC]
+ ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Int64) as c9]
+ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
+ SortExec: expr=[c9@1 DESC]
+ ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Int64) as c9]
+ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
+
+query TI
+SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5
+----
+c 4268716378
+e 4229654142
+d 4216440507
+e 4144173353
+b 4076864659
+
+# union_with_hash_aggregate
+query TT
+explain
+SELECT count(*) FROM (
+ SELECT distinct name FROM t1
+ UNION ALL
+ SELECT distinct name FROM t2
+) GROUP BY name
+----
+logical_plan
+Projection: COUNT(UInt8(1))
+ Aggregate: groupBy=[[t1.name]], aggr=[[COUNT(UInt8(1))]]
+ Union
+ Aggregate: groupBy=[[t1.name]], aggr=[[]]
+ TableScan: t1 projection=[name]
+ Aggregate: groupBy=[[t2.name]], aggr=[[]]
+ TableScan: t2 projection=[name]
+physical_plan
+ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]
+ AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]
+ InterleaveExec
+ AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+ AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+ MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+ AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+ CoalesceBatchesExec: target_batch_size=8192
+ RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+ AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+ MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+
########
# Clean up after the test
########