Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/05 12:49:07 UTC

[arrow-datafusion] branch main updated: Port test in union.rs to sqllogic (#6224)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d11eaf1ce Port test in union.rs to sqllogic (#6224)
7d11eaf1ce is described below

commit 7d11eaf1ce905c562296d8a4ac9cd5b048f995fc
Author: parkma99 <84...@users.noreply.github.com>
AuthorDate: Fri May 5 20:49:02 2023 +0800

    Port test in union.rs to sqllogic (#6224)
    
    * Port test in union.rs to sqllogic
    
    * port the remaining tests in union.rs
    
    * fix clippy and slt errors
---
 datafusion/core/tests/sql/mod.rs                   |  24 ---
 datafusion/core/tests/sql/union.rs                 | 182 ---------------------
 .../core/tests/sqllogictests/test_files/union.slt  | 170 +++++++++++++++++++
 3 files changed, 170 insertions(+), 206 deletions(-)
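
For readers unfamiliar with the target format: a sqllogictest (.slt) file is a
sequence of blocks in which a `query` line declares the expected result column
types (I = integer, T = text, as in the `query TT` and `query IT` blocks added
below), the SQL statement follows, and the expected output appears after a
`----` marker. A minimal sketch of the format, using a hypothetical query that
is not part of this commit:

    # hypothetical example: one integer column (I) and one text column (T)
    query IT
    SELECT 1, 'a'
    ----
    1 a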

diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 8c7f50c9a6..ba2329839f 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -99,7 +99,6 @@ pub mod references;
 pub mod select;
 pub mod timestamp;
 pub mod udf;
-pub mod union;
 pub mod wildcard;
 pub mod window;
 
@@ -750,29 +749,6 @@ fn create_sort_merge_join_datatype_context() -> Result<SessionContext> {
     Ok(ctx)
 }
 
-fn create_union_context() -> Result<SessionContext> {
-    let ctx = SessionContext::with_config(
-        SessionConfig::new()
-            .with_target_partitions(4)
-            .with_batch_size(4096),
-    );
-    let t1_schema = Arc::new(Schema::new(vec![
-        Field::new("id", DataType::Int32, true),
-        Field::new("name", DataType::UInt8, true),
-    ]));
-    let t1_data = RecordBatch::new_empty(t1_schema);
-    ctx.register_batch("t1", t1_data)?;
-
-    let t2_schema = Arc::new(Schema::new(vec![
-        Field::new("id", DataType::UInt8, true),
-        Field::new("name", DataType::UInt8, true),
-    ]));
-    let t2_data = RecordBatch::new_empty(t2_schema);
-    ctx.register_batch("t2", t2_data)?;
-
-    Ok(ctx)
-}
-
 fn create_nested_loop_join_context() -> Result<SessionContext> {
     let ctx = SessionContext::with_config(
         SessionConfig::new()
diff --git a/datafusion/core/tests/sql/union.rs b/datafusion/core/tests/sql/union.rs
deleted file mode 100644
index cca9c23e67..0000000000
--- a/datafusion/core/tests/sql/union.rs
+++ /dev/null
@@ -1,182 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::*;
-
-#[tokio::test]
-async fn union_with_except_input() -> Result<()> {
-    let ctx = create_union_context()?;
-    let sql = "(
-        SELECT name FROM t1
-        EXCEPT
-        SELECT name FROM t2
-    )
-    UNION ALL
-    (
-        SELECT name FROM t2
-        EXCEPT
-        SELECT name FROM t1
-    )";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Union [name:UInt8;N]",
-        "    LeftAnti Join: t1.name = t2.name [name:UInt8;N]",
-        "      Aggregate: groupBy=[[t1.name]], aggr=[[]] [name:UInt8;N]",
-        "        TableScan: t1 projection=[name] [name:UInt8;N]",
-        "      TableScan: t2 projection=[name] [name:UInt8;N]",
-        "    LeftAnti Join: t2.name = t1.name [name:UInt8;N]",
-        "      Aggregate: groupBy=[[t2.name]], aggr=[[]] [name:UInt8;N]",
-        "        TableScan: t2 projection=[name] [name:UInt8;N]",
-        "      TableScan: t1 projection=[name] [name:UInt8;N]",
-    ];
-
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn union_with_type_coercion() -> Result<()> {
-    let ctx = create_union_context()?;
-    let sql = "(
-        SELECT id, name FROM t1
-        EXCEPT
-        SELECT id, name FROM t2
-    )
-    UNION ALL
-    (
-        SELECT id, name FROM t2
-        EXCEPT
-        SELECT id, name FROM t1
-    )";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan()?;
-    let expected = vec![
-        "Explain [plan_type:Utf8, plan:Utf8]",
-        "  Union [id:Int32;N, name:UInt8;N]",
-        "    LeftAnti Join: t1.id = CAST(t2.id AS Int32), t1.name = t2.name [id:Int32;N, name:UInt8;N]",
-        "      Aggregate: groupBy=[[t1.id, t1.name]], aggr=[[]] [id:Int32;N, name:UInt8;N]",
-        "        TableScan: t1 projection=[id, name] [id:Int32;N, name:UInt8;N]",
-        "      TableScan: t2 projection=[id, name] [id:UInt8;N, name:UInt8;N]",
-        "    Projection: CAST(t2.id AS Int32) AS id, t2.name [id:Int32;N, name:UInt8;N]",
-        "      LeftAnti Join: CAST(t2.id AS Int32) = t1.id, t2.name = t1.name [id:UInt8;N, name:UInt8;N]",
-        "        Aggregate: groupBy=[[t2.id, t2.name]], aggr=[[]] [id:UInt8;N, name:UInt8;N]",
-        "          TableScan: t2 projection=[id, name] [id:UInt8;N, name:UInt8;N]",
-        "        TableScan: t1 projection=[id, name] [id:Int32;N, name:UInt8;N]",
-    ];
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_union_upcast_types() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(1);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c1, c9 FROM aggregate_test_100 
-                     UNION ALL 
-                     SELECT c1, c3 FROM aggregate_test_100 
-                     ORDER BY c9 DESC LIMIT 5";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-
-    let expected_logical_plan = vec![
-        "Limit: skip=0, fetch=5 [c1:Utf8, c9:Int64]",
-        "  Sort: aggregate_test_100.c9 DESC NULLS FIRST [c1:Utf8, c9:Int64]",
-        "    Union [c1:Utf8, c9:Int64]",
-        "      Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c9 AS Int64) AS c9 [c1:Utf8, c9:Int64]",
-        "        TableScan: aggregate_test_100 [c1:Utf8, c2:UInt32, c3:Int8, c4:Int16, c5:Int32, c6:Int64, c7:UInt8, c8:UInt16, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
-        "      Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c3 AS Int64) AS c9 [c1:Utf8, c9:Int64]",
-        "        TableScan: aggregate_test_100 [c1:Utf8, c2:UInt32, c3:Int8, c4:Int16, c5:Int32, c6:Int64, c7:UInt8, c8:UInt16, c9:UInt32, c10:UInt64, c11:Float32, c12:Float64, c13:Utf8]",
-    ];
-    let formatted_logical_plan =
-        dataframe.logical_plan().display_indent_schema().to_string();
-    let actual_logical_plan: Vec<&str> = formatted_logical_plan.trim().lines().collect();
-    assert_eq!(expected_logical_plan, actual_logical_plan, "\n\nexpected:\n\n{expected_logical_plan:#?}\nactual:\n\n{actual_logical_plan:#?}\n\n");
-
-    let actual = execute_to_batches(&ctx, sql).await;
-
-    let expected = vec![
-        "+----+------------+",
-        "| c1 | c9         |",
-        "+----+------------+",
-        "| c  | 4268716378 |",
-        "| e  | 4229654142 |",
-        "| d  | 4216440507 |",
-        "| e  | 4144173353 |",
-        "| b  | 4076864659 |",
-        "+----+------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn union_with_hash_aggregate() -> Result<()> {
-    let ctx = create_union_context()?;
-    let sql = "select count(*) from (
-        select distinct name from t1
-        union all
-        select distinct name from t2
-        ) group by name";
-
-    let dataframe = ctx.sql(sql).await.unwrap();
-    let plan = dataframe.into_optimized_plan().unwrap();
-    let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
-    let formatted = displayable(plan.as_ref()).indent().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-
-    let expected = vec![
-        "ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]",
-        "  AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]",
-        "    InterleaveExec",
-        "      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
-        "        CoalesceBatchesExec: target_batch_size=4096",
-        "          RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 4), input_partitions=4",
-        "            RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1",
-        "              AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
-        "                MemoryExec: partitions=1, partition_sizes=[1]",
-        "      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]",
-        "        CoalesceBatchesExec: target_batch_size=4096",
-        "          RepartitionExec: partitioning=Hash([Column { name: \"name\", index: 0 }], 4), input_partitions=4",
-        "            RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1",
-        "              AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]",
-        "                MemoryExec: partitions=1, partition_sizes=[1]",
-    ];
-
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-    Ok(())
-}
diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt b/datafusion/core/tests/sqllogictests/test_files/union.slt
index 8a13dfe36f..4860627cff 100644
--- a/datafusion/core/tests/sqllogictests/test_files/union.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/union.slt
@@ -175,6 +175,61 @@ Alice
 John
 
 # union_with_type_coercion
+query TT
+explain
+(
+    SELECT id, name FROM t1
+    EXCEPT
+    SELECT id, name FROM t2
+)
+UNION ALL
+(
+    SELECT id, name FROM t2
+    EXCEPT
+    SELECT id, name FROM t1
+)
+----
+logical_plan
+Union
+  LeftAnti Join: t1.id = CAST(t2.id AS Int32), t1.name = t2.name
+    Aggregate: groupBy=[[t1.id, t1.name]], aggr=[[]]
+      TableScan: t1 projection=[id, name]
+    TableScan: t2 projection=[id, name]
+  Projection: CAST(t2.id AS Int32) AS id, t2.name
+    LeftAnti Join: CAST(t2.id AS Int32) = t1.id, t2.name = t1.name
+      Aggregate: groupBy=[[t2.id, t2.name]], aggr=[[]]
+        TableScan: t2 projection=[id, name]
+      TableScan: t1 projection=[id, name]
+physical_plan
+UnionExec
+  ProjectionExec: expr=[id@0 as id, name@1 as name]
+    CoalesceBatchesExec: target_batch_size=8192
+      HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "id", index: 0 }, Column { name: "CAST(t2.id AS Int32)", index: 2 }), (Column { name: "name", index: 1 }, Column { name: "name", index: 1 })]
+        AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+          CoalesceBatchesExec: target_batch_size=8192
+            RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+              AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+                MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+        CoalesceBatchesExec: target_batch_size=8192
+          RepartitionExec: partitioning=Hash([Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+            ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
+              MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+  ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
+    ProjectionExec: expr=[id@0 as id, name@1 as name]
+      CoalesceBatchesExec: target_batch_size=8192
+        HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "id", index: 0 }), (Column { name: "name", index: 1 }, Column { name: "name", index: 1 })]
+          CoalesceBatchesExec: target_batch_size=8192
+            RepartitionExec: partitioning=Hash([Column { name: "CAST(t2.id AS Int32)", index: 2 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+              ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
+                AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+                  CoalesceBatchesExec: target_batch_size=8192
+                    RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+                      AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+                        MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+          CoalesceBatchesExec: target_batch_size=8192
+            RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }, Column { name: "name", index: 1 }], 4), input_partitions=4
+              MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
 query IT rowsort
 (
     SELECT id, name FROM t1
@@ -191,6 +246,121 @@ UNION ALL
 3 Alice
 3 John
 
+# union_with_except_input
+query TT
+explain
+(
+    SELECT name FROM t1
+    EXCEPT
+    SELECT name FROM t2
+)
+UNION ALL
+(
+    SELECT name FROM t2
+    EXCEPT
+    SELECT name FROM t1
+)
+----
+logical_plan
+Union
+  LeftAnti Join: t1.name = t2.name
+    Aggregate: groupBy=[[t1.name]], aggr=[[]]
+      TableScan: t1 projection=[name]
+    TableScan: t2 projection=[name]
+  LeftAnti Join: t2.name = t1.name
+    Aggregate: groupBy=[[t2.name]], aggr=[[]]
+      TableScan: t2 projection=[name]
+    TableScan: t1 projection=[name]
+physical_plan
+InterleaveExec
+  CoalesceBatchesExec: target_batch_size=8192
+    HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "name", index: 0 }, Column { name: "name", index: 0 })]
+      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+        CoalesceBatchesExec: target_batch_size=8192
+          RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+            AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+              MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+      CoalesceBatchesExec: target_batch_size=8192
+        RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+          MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+  CoalesceBatchesExec: target_batch_size=8192
+    HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(Column { name: "name", index: 0 }, Column { name: "name", index: 0 })]
+      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+        CoalesceBatchesExec: target_batch_size=8192
+          RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+            AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+              MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+      CoalesceBatchesExec: target_batch_size=8192
+        RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+          MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+# union_upcast_types
+query TT
+explain SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 DESC NULLS FIRST, fetch=5
+    Union
+      Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c9 AS Int64) AS c9
+        TableScan: aggregate_test_100 projection=[c1, c9]
+      Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c3 AS Int64) AS c9
+        TableScan: aggregate_test_100 projection=[c1, c3]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortPreservingMergeExec: [c9@1 DESC]
+    UnionExec
+      SortExec: expr=[c9@1 DESC]
+        ProjectionExec: expr=[c1@0 as c1, CAST(c9@1 AS Int64) as c9]
+          RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+            CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true
+      SortExec: expr=[c9@1 DESC]
+        ProjectionExec: expr=[c1@0 as c1, CAST(c3@1 AS Int64) as c9]
+          RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+            CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
+
+query TI
+SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5
+----
+c 4268716378
+e 4229654142
+d 4216440507
+e 4144173353
+b 4076864659
+
+# union_with_hash_aggregate
+query TT
+explain
+SELECT count(*) FROM (
+    SELECT distinct name FROM t1
+    UNION ALL
+    SELECT distinct name FROM t2
+) GROUP BY name
+----
+logical_plan
+Projection: COUNT(UInt8(1))
+  Aggregate: groupBy=[[t1.name]], aggr=[[COUNT(UInt8(1))]]
+    Union
+      Aggregate: groupBy=[[t1.name]], aggr=[[]]
+        TableScan: t1 projection=[name]
+      Aggregate: groupBy=[[t2.name]], aggr=[[]]
+        TableScan: t2 projection=[name]
+physical_plan
+ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1))]
+  AggregateExec: mode=Single, gby=[name@0 as name], aggr=[COUNT(UInt8(1))]
+    InterleaveExec
+      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+        CoalesceBatchesExec: target_batch_size=8192
+          RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+            AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+              MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+      AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+        CoalesceBatchesExec: target_batch_size=8192
+          RepartitionExec: partitioning=Hash([Column { name: "name", index: 0 }], 4), input_partitions=4
+            AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+              MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+
+
 ########
 # Clean up after the test
 ########
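
Assuming the harness layout this commit targets (a test binary defined under
datafusion/core/tests/sqllogictests), the ported file can be exercised on its
own with something like the following; the target name is an assumption based
on that directory, not stated in this commit:

    cargo test -p datafusion --test sqllogictests -- union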