Posted to commits@arrow.apache.org by al...@apache.org on 2024/01/15 15:03:23 UTC

(arrow-datafusion) branch main updated: test: Port `order.rs` tests to sqllogictest (#8857)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new aff709446a test: Port `order.rs` tests to sqllogictest (#8857)
aff709446a is described below

commit aff709446a1a96755d4d0c4a17014ac0e4edd991
Author: Dejan Simic <10...@users.noreply.github.com>
AuthorDate: Mon Jan 15 16:03:17 2024 +0100

    test: Port `order.rs` tests to sqllogictest (#8857)
    
    * Migrate order tests to sqllogictest
    
    * Update unit test for pre-known order
    
    * Revert moving parquet file
---
 datafusion/core/tests/sql/mod.rs                   |   1 -
 datafusion/core/tests/sql/order.rs                 | 252 ---------------------
 .../data/repeat_much.snappy.parquet                | Bin
 datafusion/sqllogictest/test_files/order.slt       | 208 ++++++++++++++++-
 4 files changed, 207 insertions(+), 254 deletions(-)
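
For readers unfamiliar with the target format: a sqllogictest .slt file is a
sequence of directives. `statement ok` asserts that a statement executes
without error; `query <type-chars>` (I = integer column, T = text column) runs
a query and compares its output against the lines following the `----` marker;
and `query error <pattern>` asserts that the statement fails with a matching
message. A minimal sketch, using an illustrative table that is not part of
this commit:

    statement ok
    CREATE TABLE example(x INT) AS VALUES (2), (1);

    query I
    SELECT x FROM example ORDER BY x;
    ----
    1
    2

The diff below removes the Rust tests and adds their .slt equivalents.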

diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index d24f87ba38..80c6c81ef9 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -76,7 +76,6 @@ pub mod csv_files;
 pub mod explain_analyze;
 pub mod expr;
 pub mod joins;
-pub mod order;
 pub mod partitioned_csv;
 pub mod predicates;
 pub mod references;
diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs
deleted file mode 100644
index 6e3f6319e1..0000000000
--- a/datafusion/core/tests/sql/order.rs
+++ /dev/null
@@ -1,252 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::*;
-use datafusion::datasource::listing::ListingTable;
-use datafusion::datasource::listing_table_factory::ListingTableFactory;
-use datafusion::datasource::provider::TableProviderFactory;
-use datafusion_expr::logical_plan::DdlStatement;
-use test_utils::{batches_to_vec, partitions_to_sorted_vec};
-
-#[tokio::test]
-async fn sort_with_lots_of_repetition_values() -> Result<()> {
-    let ctx = SessionContext::new();
-    let filename = "tests/data/repeat_much.snappy.parquet";
-
-    ctx.register_parquet("rep", filename, ParquetReadOptions::default())
-        .await?;
-    let sql = "select a from rep order by a";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let actual = batches_to_vec(&actual);
-
-    let sql1 = "select a from rep";
-    let expected = execute_to_batches(&ctx, sql1).await;
-    let expected = partitions_to_sorted_vec(&[expected]);
-
-    assert_eq!(actual.len(), expected.len());
-    for i in 0..actual.len() {
-        assert_eq!(actual[i], expected[i]);
-    }
-    Ok(())
-}
-
-#[tokio::test]
-async fn create_external_table_with_order() -> Result<()> {
-    let ctx = SessionContext::new();
-    let sql = "CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a_id ASC) LOCATION 'file://path/to/table';";
-    let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) =
-        ctx.state().create_logical_plan(sql).await?
-    else {
-        panic!("Wrong command")
-    };
-
-    let listing_table_factory = Arc::new(ListingTableFactory::new());
-    let table_dyn = listing_table_factory.create(&ctx.state(), &cmd).await?;
-    let table = table_dyn.as_any().downcast_ref::<ListingTable>().unwrap();
-    assert_eq!(cmd.order_exprs.len(), 1);
-    assert_eq!(cmd.order_exprs, table.options().file_sort_order);
-    Ok(())
-}
-
-#[tokio::test]
-async fn create_external_table_with_ddl_ordered_non_cols() -> Result<()> {
-    let ctx = SessionContext::new();
-    let sql = "CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';";
-    match ctx.state().create_logical_plan(sql).await {
-        Ok(_) => panic!("Expecting error."),
-        Err(e) => {
-            assert_eq!(
-                e.strip_backtrace(),
-                "Error during planning: Column a is not in schema"
-            )
-        }
-    }
-    Ok(())
-}
-
-#[tokio::test]
-async fn create_external_table_with_ddl_ordered_without_schema() -> Result<()> {
-    let ctx = SessionContext::new();
-    let sql = "CREATE EXTERNAL TABLE dt STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';";
-    match ctx.state().create_logical_plan(sql).await {
-        Ok(_) => panic!("Expecting error."),
-        Err(e) => {
-            assert_eq!(e.strip_backtrace(), "Error during planning: Provide a schema before specifying the order while creating a table.")
-        }
-    }
-    Ok(())
-}
-
-#[tokio::test]
-async fn sort_with_duplicate_sort_exprs() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    let t1_schema = Arc::new(Schema::new(vec![
-        Field::new("id", DataType::Int32, true),
-        Field::new("name", DataType::Utf8, true),
-    ]));
-
-    let t1_data = RecordBatch::try_new(
-        t1_schema.clone(),
-        vec![
-            Arc::new(Int32Array::from(vec![2, 4, 9, 3, 4])),
-            Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
-        ],
-    )?;
-    ctx.register_batch("t1", t1_data)?;
-
-    let sql = "select * from t1 order by id desc, id, name, id asc";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan().unwrap();
-    let expected = vec![
-        "Sort: t1.id DESC NULLS FIRST, t1.name ASC NULLS LAST [id:Int32;N, name:Utf8;N]",
-        "  TableScan: t1 projection=[id, name] [id:Int32;N, name:Utf8;N]",
-    ];
-
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let expected = [
-        "+----+------+",
-        "| id | name |",
-        "+----+------+",
-        "| 9  | c    |",
-        "| 4  | b    |",
-        "| 4  | e    |",
-        "| 3  | d    |",
-        "| 2  | a    |",
-        "+----+------+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_eq!(expected, &results);
-
-    let sql = "select * from t1 order by id asc, id, name, id desc;";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let plan = dataframe.into_optimized_plan().unwrap();
-    let expected = vec![
-        "Sort: t1.id ASC NULLS LAST, t1.name ASC NULLS LAST [id:Int32;N, name:Utf8;N]",
-        "  TableScan: t1 projection=[id, name] [id:Int32;N, name:Utf8;N]",
-    ];
-
-    let formatted = plan.display_indent_schema().to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let expected = [
-        "+----+------+",
-        "| id | name |",
-        "+----+------+",
-        "| 2  | a    |",
-        "| 3  | d    |",
-        "| 4  | b    |",
-        "| 4  | e    |",
-        "| 9  | c    |",
-        "+----+------+",
-    ];
-
-    let results = execute_to_batches(&ctx, sql).await;
-    assert_batches_eq!(expected, &results);
-
-    Ok(())
-}
-
-/// Minimal test case for https://github.com/apache/arrow-datafusion/issues/5970
-#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-async fn test_issue5970_mini() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_target_partitions(2)
-        .with_repartition_sorts(true);
-    let ctx = SessionContext::new_with_config(config);
-    let sql = "
-WITH
-    m0(t) AS (
-        VALUES (0), (1), (2)),
-    m1(t) AS (
-        VALUES (0), (1)),
-    u AS (
-        SELECT 0 as m, t FROM m0 GROUP BY 1, 2),
-    v AS (
-        SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
-SELECT * FROM u
-UNION ALL
-SELECT * FROM v
-ORDER BY 1, 2;
-    ";
-
-    // check phys. plan
-    let dataframe = ctx.sql(sql).await.unwrap();
-    let plan = dataframe.into_optimized_plan().unwrap();
-    let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
-    let expected = vec![
-        "SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
-        "  SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
-        "    InterleaveExec",
-        "      ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
-        "        AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
-        "          CoalesceBatchesExec: target_batch_size=8192",
-        "            RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=2",
-        "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        "                AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
-        "                  ProjectionExec: expr=[column1@0 as t]",
-        "                    ValuesExec",
-        "      ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
-        "        AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
-        "          CoalesceBatchesExec: target_batch_size=8192",
-        "            RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=2",
-        "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        "                AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
-        "                  ProjectionExec: expr=[column1@0 as t]",
-        "                    ValuesExec",
-    ];
-    let formatted = displayable(plan.as_ref()).indent(true).to_string();
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    assert_eq!(
-        expected, actual,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    // sometimes it "just works"
-    for i in 0..10 {
-        println!("run: {i}");
-        let actual = execute_to_batches(&ctx, sql).await;
-
-        // in https://github.com/apache/arrow-datafusion/issues/5970 the order of the output was sometimes not right
-        let expected = [
-            "+---+---+",
-            "| m | t |",
-            "+---+---+",
-            "| 0 | 0 |",
-            "| 0 | 1 |",
-            "| 0 | 2 |",
-            "| 1 | 0 |",
-            "| 1 | 1 |",
-            "+---+---+",
-        ];
-        assert_batches_eq!(expected, &actual);
-    }
-    Ok(())
-}
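
The ported file (further below) expresses each of these Rust assertions
declaratively: `execute_to_batches` plus `assert_batches_eq!` becomes a
`query` directive with the expected rows listed after `----`, and the
`Err(e) => assert_eq!(e.strip_backtrace(), ...)` arms become `query error`
directives. Note that the expected message in `query error` is matched as a
regular expression, which is why the ported test escapes the trailing period
in "creating a table\.". A sketch of the correspondence, using the
missing-column case from this commit:

    # Rust: match ctx.state().create_logical_plan(sql).await {
    #     Err(e) => assert_eq!(e.strip_backtrace(),
    #         "Error during planning: Column a is not in schema"), ... }
    # becomes the single sqllogictest directive:
    query error DataFusion error: Error during planning: Column a is not in schema
    CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';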
diff --git a/datafusion/core/tests/data/repeat_much.snappy.parquet b/datafusion/sqllogictest/data/repeat_much.snappy.parquet
similarity index 100%
rename from datafusion/core/tests/data/repeat_much.snappy.parquet
rename to datafusion/sqllogictest/data/repeat_much.snappy.parquet
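
The parquet fixture moves along with the test because the new order.slt
resolves its LOCATION relative to the sqllogictest crate (see
'data/repeat_much.snappy.parquet' in the hunk below), whereas the CSV fixtures
that stay in place are reached through '../core/tests/...' relative paths.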
diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt
index 77df9e0bb4..61888eb800 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -429,7 +429,7 @@ WITH ORDER (c ASC)
 LOCATION '../core/tests/data/window_2.csv';
 
 query TT
-EXPLAIN SELECT (b+a+c) AS result 
+EXPLAIN SELECT (b+a+c) AS result
 FROM multiple_ordered_table
 ORDER BY result;
 ----
@@ -578,3 +578,209 @@ SortPreservingMergeExec: [log_c12_base_c11@0 DESC]
 
 statement ok
 drop table aggregate_test_100;
+
+
+# Sort with lots of repetition values
+# Test sorting a parquet file with 2 million records that has lots of values that are repeated
+statement ok
+CREATE EXTERNAL TABLE repeat_much STORED AS PARQUET LOCATION 'data/repeat_much.snappy.parquet';
+
+query I
+SELECT a FROM repeat_much ORDER BY a LIMIT 20;
+----
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+
+
+# Create external table with optional pre-known sort order
+# ORDER BY query should be able to take advantage of the pre-existing order and drop SortExec
+# in the physical plan
+statement ok
+set datafusion.catalog.information_schema = true;
+
+statement ok
+CREATE EXTERNAL TABLE IF NOT EXISTS orders (
+        o_orderkey BIGINT,
+        o_custkey BIGINT,
+        o_orderstatus VARCHAR,
+        o_totalprice DECIMAL(15, 2),
+        o_orderdate DATE,
+        o_orderpriority VARCHAR,
+        o_clerk VARCHAR,
+        o_shippriority INTEGER,
+        o_comment VARCHAR,
+) STORED AS CSV  WITH ORDER (o_orderkey ASC) DELIMITER ',' WITH HEADER ROW LOCATION '../core/tests/tpch-csv/orders.csv';
+
+query TT
+EXPLAIN SELECT o_orderkey, o_orderstatus FROM orders ORDER BY o_orderkey ASC
+----
+logical_plan
+Sort: orders.o_orderkey ASC NULLS LAST
+--TableScan: orders projection=[o_orderkey, o_orderstatus]
+physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderstatus], output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true
+
+
+# Create external table with DDL ordered columns that are missing
+# When columns are missing the query is expected to fail
+query error DataFusion error: Error during planning: Column a is not in schema
+CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';
+
+
+# Create external table with DDL ordered columns without schema
+# When schema is missing the query is expected to fail
+query error DataFusion error: Error during planning: Provide a schema before specifying the order while creating a table\.
+CREATE EXTERNAL TABLE dt STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';
+
+
+# Sort with duplicate sort expressions
+# Table is sorted multiple times on the same column name and should not fail
+statement ok
+CREATE TABLE t1 (
+  id INT,
+  name TEXT
+) AS VALUES
+(2, 'a'),
+(4, 'b'),
+(9, 'c'),
+(3, 'd'),
+(4, 'e');
+
+
+query IT
+SELECT * FROM t1 ORDER BY id DESC, id, name, id ASC;
+----
+9 c
+4 b
+4 e
+3 d
+2 a
+
+query TT
+EXPLAIN SELECT * FROM t1 ORDER BY id DESC, id, name, id ASC;
+----
+logical_plan
+Sort: t1.id DESC NULLS FIRST, t1.name ASC NULLS LAST
+--TableScan: t1 projection=[id, name]
+physical_plan
+SortExec: expr=[id@0 DESC,name@1 ASC NULLS LAST]
+--MemoryExec: partitions=1, partition_sizes=[1]
+
+query IT
+SELECT * FROM t1 ORDER BY id ASC, id, name, id DESC;
+----
+2 a
+3 d
+4 b
+4 e
+9 c
+
+query TT
+EXPLAIN SELECT * FROM t1 ORDER BY id ASC, id, name, id DESC;
+----
+logical_plan
+Sort: t1.id ASC NULLS LAST, t1.name ASC NULLS LAST
+--TableScan: t1 projection=[id, name]
+physical_plan
+SortExec: expr=[id@0 ASC NULLS LAST,name@1 ASC NULLS LAST]
+--MemoryExec: partitions=1, partition_sizes=[1]
+
+
+# Minimal reproduction of issue 5970
+# https://github.com/apache/arrow-datafusion/issues/5970
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.optimizer.repartition_sorts = true;
+
+
+query II
+WITH
+    m0(t) AS (
+        VALUES (0), (1), (2)),
+    m1(t) AS (
+        VALUES (0), (1)),
+    u AS (
+        SELECT 0 as m, t FROM m0 GROUP BY 1, 2),
+    v AS (
+        SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
+SELECT * FROM u
+UNION ALL
+SELECT * FROM v
+ORDER BY 1, 2;
+----
+0 0
+0 1
+0 2
+1 0
+1 1
+
+query TT
+EXPLAIN
+WITH
+    m0(t) AS (
+        VALUES (0), (1), (2)),
+    m1(t) AS (
+        VALUES (0), (1)),
+    u AS (
+        SELECT 0 as m, t FROM m0 GROUP BY 1, 2),
+    v AS (
+        SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
+SELECT * FROM u
+UNION ALL
+SELECT * FROM v
+ORDER BY 1, 2;
+----
+logical_plan
+Sort: u.m ASC NULLS LAST, u.t ASC NULLS LAST
+--Union
+----SubqueryAlias: u
+------Projection: Int64(0) AS m, m0.t
+--------Aggregate: groupBy=[[Int64(0), m0.t]], aggr=[[]]
+----------SubqueryAlias: m0
+------------Projection: column1 AS t
+--------------Values: (Int64(0)), (Int64(1)), (Int64(2))
+----SubqueryAlias: v
+------Projection: Int64(1) AS m, m1.t
+--------Aggregate: groupBy=[[Int64(1), m1.t]], aggr=[[]]
+----------SubqueryAlias: m1
+------------Projection: column1 AS t
+--------------Values: (Int64(0)), (Int64(1))
+physical_plan
+SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
+--SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
+----InterleaveExec
+------ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]
+--------AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=2
+--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------------AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]
+------------------ProjectionExec: expr=[column1@0 as t]
+--------------------ValuesExec
+------ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]
+--------AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=2
+--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------------AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]
+------------------ProjectionExec: expr=[column1@0 as t]
+--------------------ValuesExec
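
Assuming the repository's standard sqllogictest harness at the time of this
commit, the ported coverage can be exercised on its own with
`cargo test --test sqllogictests -- order` from within the
datafusion/sqllogictest crate; the harness also supports a completion mode
(appending `--complete`) for regenerating the expected `----` output blocks
after an intentional plan change.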