You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink was not preserved in this plain-text rendering).
Posted to commits@arrow.apache.org by al...@apache.org on 2024/01/15 15:03:23 UTC
(arrow-datafusion) branch main updated: test: Port `order.rs` tests to sqllogictest (#8857)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new aff709446a test: Port `order.rs` tests to sqllogictest (#8857)
aff709446a is described below
commit aff709446a1a96755d4d0c4a17014ac0e4edd991
Author: Dejan Simic <10...@users.noreply.github.com>
AuthorDate: Mon Jan 15 16:03:17 2024 +0100
test: Port `order.rs` tests to sqllogictest (#8857)
* Migrate order tests to sqllogictest
* Update unit test for pre-known order
* Revert moving parquet file
---
datafusion/core/tests/sql/mod.rs | 1 -
datafusion/core/tests/sql/order.rs | 252 ---------------------
.../data/repeat_much.snappy.parquet | Bin
datafusion/sqllogictest/test_files/order.slt | 208 ++++++++++++++++-
4 files changed, 207 insertions(+), 254 deletions(-)
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index d24f87ba38..80c6c81ef9 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -76,7 +76,6 @@ pub mod csv_files;
pub mod explain_analyze;
pub mod expr;
pub mod joins;
-pub mod order;
pub mod partitioned_csv;
pub mod predicates;
pub mod references;
diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs
deleted file mode 100644
index 6e3f6319e1..0000000000
--- a/datafusion/core/tests/sql/order.rs
+++ /dev/null
@@ -1,252 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::*;
-use datafusion::datasource::listing::ListingTable;
-use datafusion::datasource::listing_table_factory::ListingTableFactory;
-use datafusion::datasource::provider::TableProviderFactory;
-use datafusion_expr::logical_plan::DdlStatement;
-use test_utils::{batches_to_vec, partitions_to_sorted_vec};
-
-#[tokio::test]
-async fn sort_with_lots_of_repetition_values() -> Result<()> {
- let ctx = SessionContext::new();
- let filename = "tests/data/repeat_much.snappy.parquet";
-
- ctx.register_parquet("rep", filename, ParquetReadOptions::default())
- .await?;
- let sql = "select a from rep order by a";
- let actual = execute_to_batches(&ctx, sql).await;
- let actual = batches_to_vec(&actual);
-
- let sql1 = "select a from rep";
- let expected = execute_to_batches(&ctx, sql1).await;
- let expected = partitions_to_sorted_vec(&[expected]);
-
- assert_eq!(actual.len(), expected.len());
- for i in 0..actual.len() {
- assert_eq!(actual[i], expected[i]);
- }
- Ok(())
-}
-
-#[tokio::test]
-async fn create_external_table_with_order() -> Result<()> {
- let ctx = SessionContext::new();
- let sql = "CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a_id ASC) LOCATION 'file://path/to/table';";
- let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) =
- ctx.state().create_logical_plan(sql).await?
- else {
- panic!("Wrong command")
- };
-
- let listing_table_factory = Arc::new(ListingTableFactory::new());
- let table_dyn = listing_table_factory.create(&ctx.state(), &cmd).await?;
- let table = table_dyn.as_any().downcast_ref::<ListingTable>().unwrap();
- assert_eq!(cmd.order_exprs.len(), 1);
- assert_eq!(cmd.order_exprs, table.options().file_sort_order);
- Ok(())
-}
-
-#[tokio::test]
-async fn create_external_table_with_ddl_ordered_non_cols() -> Result<()> {
- let ctx = SessionContext::new();
- let sql = "CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';";
- match ctx.state().create_logical_plan(sql).await {
- Ok(_) => panic!("Expecting error."),
- Err(e) => {
- assert_eq!(
- e.strip_backtrace(),
- "Error during planning: Column a is not in schema"
- )
- }
- }
- Ok(())
-}
-
-#[tokio::test]
-async fn create_external_table_with_ddl_ordered_without_schema() -> Result<()> {
- let ctx = SessionContext::new();
- let sql = "CREATE EXTERNAL TABLE dt STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';";
- match ctx.state().create_logical_plan(sql).await {
- Ok(_) => panic!("Expecting error."),
- Err(e) => {
- assert_eq!(e.strip_backtrace(), "Error during planning: Provide a schema before specifying the order while creating a table.")
- }
- }
- Ok(())
-}
-
-#[tokio::test]
-async fn sort_with_duplicate_sort_exprs() -> Result<()> {
- let ctx = SessionContext::new();
-
- let t1_schema = Arc::new(Schema::new(vec![
- Field::new("id", DataType::Int32, true),
- Field::new("name", DataType::Utf8, true),
- ]));
-
- let t1_data = RecordBatch::try_new(
- t1_schema.clone(),
- vec![
- Arc::new(Int32Array::from(vec![2, 4, 9, 3, 4])),
- Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
- ],
- )?;
- ctx.register_batch("t1", t1_data)?;
-
- let sql = "select * from t1 order by id desc, id, name, id asc";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan().unwrap();
- let expected = vec![
- "Sort: t1.id DESC NULLS FIRST, t1.name ASC NULLS LAST [id:Int32;N, name:Utf8;N]",
- " TableScan: t1 projection=[id, name] [id:Int32;N, name:Utf8;N]",
- ];
-
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let expected = [
- "+----+------+",
- "| id | name |",
- "+----+------+",
- "| 9 | c |",
- "| 4 | b |",
- "| 4 | e |",
- "| 3 | d |",
- "| 2 | a |",
- "+----+------+",
- ];
-
- let results = execute_to_batches(&ctx, sql).await;
- assert_batches_eq!(expected, &results);
-
- let sql = "select * from t1 order by id asc, id, name, id desc;";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let plan = dataframe.into_optimized_plan().unwrap();
- let expected = vec![
- "Sort: t1.id ASC NULLS LAST, t1.name ASC NULLS LAST [id:Int32;N, name:Utf8;N]",
- " TableScan: t1 projection=[id, name] [id:Int32;N, name:Utf8;N]",
- ];
-
- let formatted = plan.display_indent_schema().to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let expected = [
- "+----+------+",
- "| id | name |",
- "+----+------+",
- "| 2 | a |",
- "| 3 | d |",
- "| 4 | b |",
- "| 4 | e |",
- "| 9 | c |",
- "+----+------+",
- ];
-
- let results = execute_to_batches(&ctx, sql).await;
- assert_batches_eq!(expected, &results);
-
- Ok(())
-}
-
-/// Minimal test case for https://github.com/apache/arrow-datafusion/issues/5970
-#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-async fn test_issue5970_mini() -> Result<()> {
- let config = SessionConfig::new()
- .with_target_partitions(2)
- .with_repartition_sorts(true);
- let ctx = SessionContext::new_with_config(config);
- let sql = "
-WITH
- m0(t) AS (
- VALUES (0), (1), (2)),
- m1(t) AS (
- VALUES (0), (1)),
- u AS (
- SELECT 0 as m, t FROM m0 GROUP BY 1, 2),
- v AS (
- SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
-SELECT * FROM u
-UNION ALL
-SELECT * FROM v
-ORDER BY 1, 2;
- ";
-
- // check phys. plan
- let dataframe = ctx.sql(sql).await.unwrap();
- let plan = dataframe.into_optimized_plan().unwrap();
- let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
- let expected = vec![
- "SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
- " SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
- " InterleaveExec",
- " ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
- " AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
- " ProjectionExec: expr=[column1@0 as t]",
- " ValuesExec",
- " ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
- " AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=2",
- " RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- " AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
- " ProjectionExec: expr=[column1@0 as t]",
- " ValuesExec",
- ];
- let formatted = displayable(plan.as_ref()).indent(true).to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
- assert_eq!(
- expected, actual,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- // sometimes it "just works"
- for i in 0..10 {
- println!("run: {i}");
- let actual = execute_to_batches(&ctx, sql).await;
-
- // in https://github.com/apache/arrow-datafusion/issues/5970 the order of the output was sometimes not right
- let expected = [
- "+---+---+",
- "| m | t |",
- "+---+---+",
- "| 0 | 0 |",
- "| 0 | 1 |",
- "| 0 | 2 |",
- "| 1 | 0 |",
- "| 1 | 1 |",
- "+---+---+",
- ];
- assert_batches_eq!(expected, &actual);
- }
- Ok(())
-}
diff --git a/datafusion/core/tests/data/repeat_much.snappy.parquet b/datafusion/sqllogictest/data/repeat_much.snappy.parquet
similarity index 100%
rename from datafusion/core/tests/data/repeat_much.snappy.parquet
rename to datafusion/sqllogictest/data/repeat_much.snappy.parquet
diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt
index 77df9e0bb4..61888eb800 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -429,7 +429,7 @@ WITH ORDER (c ASC)
LOCATION '../core/tests/data/window_2.csv';
query TT
-EXPLAIN SELECT (b+a+c) AS result
+EXPLAIN SELECT (b+a+c) AS result
FROM multiple_ordered_table
ORDER BY result;
----
@@ -578,3 +578,209 @@ SortPreservingMergeExec: [log_c12_base_c11@0 DESC]
statement ok
drop table aggregate_test_100;
+
+
+# Sort with lots of repetition values
+# Test sorting a parquet file with 2 million records, many of which share the same value
+statement ok
+CREATE EXTERNAL TABLE repeat_much STORED AS PARQUET LOCATION 'data/repeat_much.snappy.parquet';
+
+query I
+SELECT a FROM repeat_much ORDER BY a LIMIT 20;
+----
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+2450962
+
+
+# Create external table with optional pre-known sort order
+# ORDER BY query should be able to take advantage of the pre-existing order and drop SortExec
+# in the physical plan
+statement ok
+set datafusion.catalog.information_schema = true;
+
+statement ok
+CREATE EXTERNAL TABLE IF NOT EXISTS orders (
+ o_orderkey BIGINT,
+ o_custkey BIGINT,
+ o_orderstatus VARCHAR,
+ o_totalprice DECIMAL(15, 2),
+ o_orderdate DATE,
+ o_orderpriority VARCHAR,
+ o_clerk VARCHAR,
+ o_shippriority INTEGER,
+ o_comment VARCHAR,
+) STORED AS CSV WITH ORDER (o_orderkey ASC) DELIMITER ',' WITH HEADER ROW LOCATION '../core/tests/tpch-csv/orders.csv';
+
+query TT
+EXPLAIN SELECT o_orderkey, o_orderstatus FROM orders ORDER BY o_orderkey ASC
+----
+logical_plan
+Sort: orders.o_orderkey ASC NULLS LAST
+--TableScan: orders projection=[o_orderkey, o_orderstatus]
+physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderstatus], output_ordering=[o_orderkey@0 ASC NULLS LAST], has_header=true
+
+
+# Create external table with DDL ordered columns that are missing
+# When a WITH ORDER column is not in the declared schema, the statement is expected to fail
+query error DataFusion error: Error during planning: Column a is not in schema
+CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';
+
+
+# Create external table with DDL ordered columns without schema
+# When schema is missing the query is expected to fail
+query error DataFusion error: Error during planning: Provide a schema before specifying the order while creating a table\.
+CREATE EXTERNAL TABLE dt STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';
+
+
+# Sort with duplicate sort expressions
+# The query repeats the same sort column several times; this must not fail, and the planner keeps only the first occurrence of each sort key
+statement ok
+CREATE TABLE t1 (
+ id INT,
+ name TEXT
+) AS VALUES
+(2, 'a'),
+(4, 'b'),
+(9, 'c'),
+(3, 'd'),
+(4, 'e');
+
+
+query IT
+SELECT * FROM t1 ORDER BY id DESC, id, name, id ASC;
+----
+9 c
+4 b
+4 e
+3 d
+2 a
+
+query TT
+EXPLAIN SELECT * FROM t1 ORDER BY id DESC, id, name, id ASC;
+----
+logical_plan
+Sort: t1.id DESC NULLS FIRST, t1.name ASC NULLS LAST
+--TableScan: t1 projection=[id, name]
+physical_plan
+SortExec: expr=[id@0 DESC,name@1 ASC NULLS LAST]
+--MemoryExec: partitions=1, partition_sizes=[1]
+
+query IT
+SELECT * FROM t1 ORDER BY id ASC, id, name, id DESC;
+----
+2 a
+3 d
+4 b
+4 e
+9 c
+
+query TT
+EXPLAIN SELECT * FROM t1 ORDER BY id ASC, id, name, id DESC;
+----
+logical_plan
+Sort: t1.id ASC NULLS LAST, t1.name ASC NULLS LAST
+--TableScan: t1 projection=[id, name]
+physical_plan
+SortExec: expr=[id@0 ASC NULLS LAST,name@1 ASC NULLS LAST]
+--MemoryExec: partitions=1, partition_sizes=[1]
+
+
+# Minimal reproduction of issue 5970
+# https://github.com/apache/arrow-datafusion/issues/5970
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+statement ok
+set datafusion.optimizer.repartition_sorts = true;
+
+
+query II
+WITH
+ m0(t) AS (
+ VALUES (0), (1), (2)),
+ m1(t) AS (
+ VALUES (0), (1)),
+ u AS (
+ SELECT 0 as m, t FROM m0 GROUP BY 1, 2),
+ v AS (
+ SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
+SELECT * FROM u
+UNION ALL
+SELECT * FROM v
+ORDER BY 1, 2;
+----
+0 0
+0 1
+0 2
+1 0
+1 1
+
+query TT
+EXPLAIN
+WITH
+ m0(t) AS (
+ VALUES (0), (1), (2)),
+ m1(t) AS (
+ VALUES (0), (1)),
+ u AS (
+ SELECT 0 as m, t FROM m0 GROUP BY 1, 2),
+ v AS (
+ SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
+SELECT * FROM u
+UNION ALL
+SELECT * FROM v
+ORDER BY 1, 2;
+----
+logical_plan
+Sort: u.m ASC NULLS LAST, u.t ASC NULLS LAST
+--Union
+----SubqueryAlias: u
+------Projection: Int64(0) AS m, m0.t
+--------Aggregate: groupBy=[[Int64(0), m0.t]], aggr=[[]]
+----------SubqueryAlias: m0
+------------Projection: column1 AS t
+--------------Values: (Int64(0)), (Int64(1)), (Int64(2))
+----SubqueryAlias: v
+------Projection: Int64(1) AS m, m1.t
+--------Aggregate: groupBy=[[Int64(1), m1.t]], aggr=[[]]
+----------SubqueryAlias: m1
+------------Projection: column1 AS t
+--------------Values: (Int64(0)), (Int64(1))
+physical_plan
+SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
+--SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
+----InterleaveExec
+------ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]
+--------AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=2
+--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------------AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]
+------------------ProjectionExec: expr=[column1@0 as t]
+--------------------ValuesExec
+------ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]
+--------AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=2
+--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------------AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]
+------------------ProjectionExec: expr=[column1@0 as t]
+--------------------ValuesExec