You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/11/17 19:38:12 UTC

(arrow-datafusion) branch main updated: Port tests in projection.rs to sqllogictest (#8240)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 91eec3f925 Port tests in projection.rs to sqllogictest (#8240)
91eec3f925 is described below

commit 91eec3f92567f979e6b104793669dfe5bf35390b
Author: Chojan Shang <ps...@apache.org>
AuthorDate: Sat Nov 18 03:38:06 2023 +0800

    Port tests in projection.rs to sqllogictest (#8240)
    
    * Port tests in projection.rs to sqllogictest
    
    Signed-off-by: Chojan Shang <ps...@apache.org>
    
    * Minor update
    
    Signed-off-by: Chojan Shang <ps...@apache.org>
    
    * Make test happy
    
    Signed-off-by: Chojan Shang <ps...@apache.org>
    
    * Minor update
    
    Signed-off-by: Chojan Shang <ps...@apache.org>
    
    * Refine tests
    
    Signed-off-by: Chojan Shang <ps...@apache.org>
    
    * chore: remove unused code
    
    Signed-off-by: Chojan Shang <ps...@apache.org>
    
    ---------
    
    Signed-off-by: Chojan Shang <ps...@apache.org>
---
 datafusion/core/tests/sql/mod.rs                  |  18 --
 datafusion/core/tests/sql/partitioned_csv.rs      |  20 +-
 datafusion/core/tests/sql/projection.rs           | 373 ----------------------
 datafusion/sqllogictest/test_files/projection.slt | 235 ++++++++++++++
 4 files changed, 236 insertions(+), 410 deletions(-)

diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 1d58bba876..b04ba573af 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -86,7 +86,6 @@ pub mod parquet;
 pub mod parquet_schema;
 pub mod partitioned_csv;
 pub mod predicates;
-pub mod projection;
 pub mod references;
 pub mod repartition;
 pub mod select;
@@ -455,23 +454,6 @@ async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
     );
 }
 
-async fn register_aggregate_simple_csv(ctx: &SessionContext) -> Result<()> {
-    // It's not possible to use aggregate_test_100 as it doesn't have enough similar values to test grouping on floats.
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Float32, false),
-        Field::new("c2", DataType::Float64, false),
-        Field::new("c3", DataType::Boolean, false),
-    ]));
-
-    ctx.register_csv(
-        "aggregate_simple",
-        "tests/data/aggregate_simple.csv",
-        CsvReadOptions::new().schema(&schema),
-    )
-    .await?;
-    Ok(())
-}
-
 async fn register_aggregate_csv(ctx: &SessionContext) -> Result<()> {
     let testdata = datafusion::test_util::arrow_test_data();
     let schema = test_util::aggr_test_schema();
diff --git a/datafusion/core/tests/sql/partitioned_csv.rs b/datafusion/core/tests/sql/partitioned_csv.rs
index d5a1c2f0b4..b77557a66c 100644
--- a/datafusion/core/tests/sql/partitioned_csv.rs
+++ b/datafusion/core/tests/sql/partitioned_csv.rs
@@ -19,31 +19,13 @@
 
 use std::{io::Write, sync::Arc};
 
-use arrow::{
-    datatypes::{DataType, Field, Schema, SchemaRef},
-    record_batch::RecordBatch,
-};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::{
     error::Result,
     prelude::{CsvReadOptions, SessionConfig, SessionContext},
 };
 use tempfile::TempDir;
 
-/// Execute SQL and return results
-async fn plan_and_collect(
-    ctx: &mut SessionContext,
-    sql: &str,
-) -> Result<Vec<RecordBatch>> {
-    ctx.sql(sql).await?.collect().await
-}
-
-/// Execute SQL and return results
-pub async fn execute(sql: &str, partition_count: usize) -> Result<Vec<RecordBatch>> {
-    let tmp_dir = TempDir::new()?;
-    let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
-    plan_and_collect(&mut ctx, sql).await
-}
-
 /// Generate CSV partitions within the supplied directory
 fn populate_csv_partitions(
     tmp_dir: &TempDir,
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
deleted file mode 100644
index b31cb34f52..0000000000
--- a/datafusion/core/tests/sql/projection.rs
+++ /dev/null
@@ -1,373 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use datafusion::datasource::provider_as_source;
-use datafusion::test_util::scan_empty;
-use datafusion_expr::{when, LogicalPlanBuilder, UNNAMED_TABLE};
-use tempfile::TempDir;
-
-use super::*;
-
-#[tokio::test]
-async fn projection_same_fields() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    let sql = "select (1+1) as a from (select 1 as a) as b;";
-    let actual = execute_to_batches(&ctx, sql).await;
-
-    #[rustfmt::skip]
-    let expected = ["+---+",
-        "| a |",
-        "+---+",
-        "| 2 |",
-        "+---+"];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn projection_type_alias() -> Result<()> {
-    let ctx = SessionContext::new();
-    register_aggregate_simple_csv(&ctx).await?;
-
-    // Query that aliases one column to the name of a different column
-    // that also has a different type (c1 == float32, c3 == boolean)
-    let sql = "SELECT c1 as c3 FROM aggregate_simple ORDER BY c3 LIMIT 2";
-    let actual = execute_to_batches(&ctx, sql).await;
-
-    let expected = [
-        "+---------+",
-        "| c3      |",
-        "+---------+",
-        "| 0.00001 |",
-        "| 0.00002 |",
-        "+---------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn csv_query_group_by_avg_with_projection() -> Result<()> {
-    let ctx = SessionContext::new();
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT avg(c12), c1 FROM aggregate_test_100 GROUP BY c1";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = [
-        "+-----------------------------+----+",
-        "| AVG(aggregate_test_100.c12) | c1 |",
-        "+-----------------------------+----+",
-        "| 0.41040709263815384         | b  |",
-        "| 0.48600669271341534         | e  |",
-        "| 0.48754517466109415         | a  |",
-        "| 0.48855379387549824         | d  |",
-        "| 0.6600456536439784          | c  |",
-        "+-----------------------------+----+",
-    ];
-    assert_batches_sorted_eq!(expected, &actual);
-    Ok(())
-}
-
-#[tokio::test]
-async fn parallel_projection() -> Result<()> {
-    let partition_count = 4;
-    let results =
-        partitioned_csv::execute("SELECT c1, c2 FROM test", partition_count).await?;
-
-    let expected = vec![
-        "+----+----+",
-        "| c1 | c2 |",
-        "+----+----+",
-        "| 3  | 1  |",
-        "| 3  | 2  |",
-        "| 3  | 3  |",
-        "| 3  | 4  |",
-        "| 3  | 5  |",
-        "| 3  | 6  |",
-        "| 3  | 7  |",
-        "| 3  | 8  |",
-        "| 3  | 9  |",
-        "| 3  | 10 |",
-        "| 2  | 1  |",
-        "| 2  | 2  |",
-        "| 2  | 3  |",
-        "| 2  | 4  |",
-        "| 2  | 5  |",
-        "| 2  | 6  |",
-        "| 2  | 7  |",
-        "| 2  | 8  |",
-        "| 2  | 9  |",
-        "| 2  | 10 |",
-        "| 1  | 1  |",
-        "| 1  | 2  |",
-        "| 1  | 3  |",
-        "| 1  | 4  |",
-        "| 1  | 5  |",
-        "| 1  | 6  |",
-        "| 1  | 7  |",
-        "| 1  | 8  |",
-        "| 1  | 9  |",
-        "| 1  | 10 |",
-        "| 0  | 1  |",
-        "| 0  | 2  |",
-        "| 0  | 3  |",
-        "| 0  | 4  |",
-        "| 0  | 5  |",
-        "| 0  | 6  |",
-        "| 0  | 7  |",
-        "| 0  | 8  |",
-        "| 0  | 9  |",
-        "| 0  | 10 |",
-        "+----+----+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn subquery_alias_case_insensitive() -> Result<()> {
-    let partition_count = 1;
-    let results =
-        partitioned_csv::execute("SELECT V1.c1, v1.C2 FROM (SELECT test.C1, TEST.c2 FROM test) V1 ORDER BY v1.c1, V1.C2 LIMIT 1", partition_count).await?;
-
-    let expected = [
-        "+----+----+",
-        "| c1 | c2 |",
-        "+----+----+",
-        "| 0  | 1  |",
-        "+----+----+",
-    ];
-    assert_batches_sorted_eq!(expected, &results);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn projection_on_table_scan() -> Result<()> {
-    let tmp_dir = TempDir::new()?;
-    let partition_count = 4;
-    let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
-
-    let table = ctx.table("test").await?;
-    let logical_plan = LogicalPlanBuilder::from(table.into_optimized_plan()?)
-        .project(vec![col("c2")])?
-        .build()?;
-
-    let state = ctx.state();
-    let optimized_plan = state.optimize(&logical_plan)?;
-    match &optimized_plan {
-        LogicalPlan::TableScan(TableScan {
-            source,
-            projected_schema,
-            ..
-        }) => {
-            assert_eq!(source.schema().fields().len(), 3);
-            assert_eq!(projected_schema.fields().len(), 1);
-        }
-        _ => panic!("input to projection should be TableScan"),
-    }
-
-    let expected = "TableScan: test projection=[c2]";
-    assert_eq!(format!("{optimized_plan:?}"), expected);
-
-    let physical_plan = state.create_physical_plan(&optimized_plan).await?;
-
-    assert_eq!(1, physical_plan.schema().fields().len());
-    assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
-    let batches = collect(physical_plan, state.task_ctx()).await?;
-    assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn preserve_nullability_on_projection() -> Result<()> {
-    let tmp_dir = TempDir::new()?;
-    let ctx = partitioned_csv::create_ctx(&tmp_dir, 1).await?;
-
-    let schema: Schema = ctx.table("test").await.unwrap().schema().clone().into();
-    assert!(!schema.field_with_name("c1")?.is_nullable());
-
-    let plan = scan_empty(None, &schema, None)?
-        .project(vec![col("c1")])?
-        .build()?;
-
-    let dataframe = DataFrame::new(ctx.state(), plan);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
-    Ok(())
-}
-
-#[tokio::test]
-async fn project_cast_dictionary() {
-    let ctx = SessionContext::new();
-
-    let host: DictionaryArray<Int32Type> = vec![Some("host1"), None, Some("host2")]
-        .into_iter()
-        .collect();
-
-    let batch = RecordBatch::try_from_iter(vec![("host", Arc::new(host) as _)]).unwrap();
-
-    let t = MemTable::try_new(batch.schema(), vec![vec![batch]]).unwrap();
-
-    // Note that `host` is a dictionary array but `lit("")` is a DataType::Utf8 that needs to be cast
-    let expr = when(col("host").is_null(), lit(""))
-        .otherwise(col("host"))
-        .unwrap();
-
-    let projection = None;
-    let builder = LogicalPlanBuilder::scan(
-        "cpu_load_short",
-        provider_as_source(Arc::new(t)),
-        projection,
-    )
-    .unwrap();
-
-    let logical_plan = builder.project(vec![expr]).unwrap().build().unwrap();
-    let df = DataFrame::new(ctx.state(), logical_plan);
-    let actual = df.collect().await.unwrap();
-
-    let expected = ["+----------------------------------------------------------------------------------+",
-        "| CASE WHEN cpu_load_short.host IS NULL THEN Utf8(\"\") ELSE cpu_load_short.host END |",
-        "+----------------------------------------------------------------------------------+",
-        "| host1                                                                            |",
-        "|                                                                                  |",
-        "| host2                                                                            |",
-        "+----------------------------------------------------------------------------------+"];
-    assert_batches_eq!(expected, &actual);
-}
-
-#[tokio::test]
-async fn projection_on_memory_scan() -> Result<()> {
-    let schema = Schema::new(vec![
-        Field::new("a", DataType::Int32, false),
-        Field::new("b", DataType::Int32, false),
-        Field::new("c", DataType::Int32, false),
-    ]);
-    let schema = SchemaRef::new(schema);
-
-    let partitions = vec![vec![RecordBatch::try_new(
-        schema.clone(),
-        vec![
-            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
-            Arc::new(Int32Array::from(vec![2, 12, 12, 120])),
-            Arc::new(Int32Array::from(vec![3, 12, 12, 120])),
-        ],
-    )?]];
-
-    let provider = Arc::new(MemTable::try_new(schema, partitions)?);
-    let plan =
-        LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(provider), None)?
-            .project(vec![col("b")])?
-            .build()?;
-    assert_fields_eq(&plan, vec!["b"]);
-
-    let ctx = SessionContext::new();
-    let state = ctx.state();
-    let optimized_plan = state.optimize(&plan)?;
-    match &optimized_plan {
-        LogicalPlan::TableScan(TableScan {
-            source,
-            projected_schema,
-            ..
-        }) => {
-            assert_eq!(source.schema().fields().len(), 3);
-            assert_eq!(projected_schema.fields().len(), 1);
-        }
-        _ => panic!("input to projection should be InMemoryScan"),
-    }
-
-    let expected = format!("TableScan: {UNNAMED_TABLE} projection=[b]");
-    assert_eq!(format!("{optimized_plan:?}"), expected);
-
-    let physical_plan = state.create_physical_plan(&optimized_plan).await?;
-
-    assert_eq!(1, physical_plan.schema().fields().len());
-    assert_eq!("b", physical_plan.schema().field(0).name().as_str());
-
-    let batches = collect(physical_plan, state.task_ctx()).await?;
-    assert_eq!(1, batches.len());
-    assert_eq!(1, batches[0].num_columns());
-    assert_eq!(4, batches[0].num_rows());
-
-    Ok(())
-}
-
-fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
-    let actual: Vec<String> = plan
-        .schema()
-        .fields()
-        .iter()
-        .map(|f| f.name().clone())
-        .collect();
-    assert_eq!(actual, expected);
-}
-
-#[tokio::test]
-async fn project_column_with_same_name_as_relation() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    let sql = "select a.a from (select 1 as a) as a;";
-    let actual = execute_to_batches(&ctx, sql).await;
-
-    let expected = ["+---+", "| a |", "+---+", "| 1 |", "+---+"];
-    assert_batches_sorted_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn project_column_with_filters_that_cant_pushed_down_always_false() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    let sql = "select * from (select 1 as a) f where f.a=2;";
-    let actual = execute_to_batches(&ctx, sql).await;
-
-    let expected = ["++", "++"];
-    assert_batches_sorted_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn project_column_with_filters_that_cant_pushed_down_always_true() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    let sql = "select * from (select 1 as a) f where f.a=1;";
-    let actual = execute_to_batches(&ctx, sql).await;
-
-    let expected = ["+---+", "| a |", "+---+", "| 1 |", "+---+"];
-    assert_batches_sorted_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn project_columns_in_memory_without_propagation() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    let sql = "select column1 as a from (values (1), (2)) f where f.column1 = 2;";
-    let actual = execute_to_batches(&ctx, sql).await;
-
-    let expected = ["+---+", "| a |", "+---+", "| 2 |", "+---+"];
-    assert_batches_sorted_eq!(expected, &actual);
-
-    Ok(())
-}
diff --git a/datafusion/sqllogictest/test_files/projection.slt b/datafusion/sqllogictest/test_files/projection.slt
new file mode 100644
index 0000000000..b752f5644b
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/projection.slt
@@ -0,0 +1,235 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## Projection Statement Tests
+##########
+
+# prepare data
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+  c1  VARCHAR NOT NULL,
+  c2  TINYINT NOT NULL,
+  c3  SMALLINT NOT NULL,
+  c4  SMALLINT,
+  c5  INT,
+  c6  BIGINT NOT NULL,
+  c7  SMALLINT NOT NULL,
+  c8  INT NOT NULL,
+  c9  BIGINT UNSIGNED NOT NULL,
+  c10 VARCHAR NOT NULL,
+  c11 FLOAT NOT NULL,
+  c12 DOUBLE NOT NULL,
+  c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_simple (
+  c1 FLOAT NOT NULL,
+  c2 DOUBLE NOT NULL,
+  c3 BOOLEAN NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../core/tests/data/aggregate_simple.csv'
+
+statement ok
+CREATE TABLE memory_table(a INT NOT NULL, b INT NOT NULL, c INT NOT NULL) AS VALUES
+(1, 2, 3),
+(10, 12, 12),
+(10, 12, 12),
+(100, 120, 120);
+
+statement ok
+CREATE TABLE cpu_load_short(host STRING NOT NULL) AS VALUES
+('host1'),
+('host2');
+
+statement ok
+CREATE EXTERNAL TABLE test (c1 int, c2 bigint, c3 boolean)
+STORED AS CSV LOCATION '../core/tests/data/partitioned_csv';
+
+statement ok
+CREATE EXTERNAL TABLE test_simple (c1 int, c2 bigint, c3 boolean)
+STORED AS CSV LOCATION '../core/tests/data/partitioned_csv/partition-0.csv';
+
+# projection same fields
+query I rowsort
+select (1+1) as a from (select 1 as a) as b;
+----
+2
+
+# projection type alias
+query R rowsort
+SELECT c1 as c3 FROM aggregate_simple ORDER BY c3 LIMIT 2;
+----
+0.00001
+0.00002
+
+# csv query group by avg with projection
+query RT rowsort
+SELECT avg(c12), c1 FROM aggregate_test_100 GROUP BY c1;
+----
+0.410407092638 b
+0.486006692713 e
+0.487545174661 a
+0.488553793875 d
+0.660045653644 c
+
+# parallel projection
+query II
+SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC
+----
+3 0
+3 1
+3 2
+3 3
+3 4
+3 5
+3 6
+3 7
+3 8
+3 9
+3 10
+2 0
+2 1
+2 2
+2 3
+2 4
+2 5
+2 6
+2 7
+2 8
+2 9
+2 10
+1 0
+1 1
+1 2
+1 3
+1 4
+1 5
+1 6
+1 7
+1 8
+1 9
+1 10
+0 0
+0 1
+0 2
+0 3
+0 4
+0 5
+0 6
+0 7
+0 8
+0 9
+0 10
+
+# subquery alias case insensitive
+query II
+SELECT V1.c1, v1.C2 FROM (SELECT test_simple.C1, TEST_SIMPLE.c2 FROM test_simple) V1 ORDER BY v1.c1, V1.C2 LIMIT 1;
+----
+0 0
+
+# projection on table scan
+statement ok
+set datafusion.explain.logical_plan_only = true
+
+query TT
+EXPLAIN SELECT c2 FROM test;
+----
+logical_plan TableScan: test projection=[c2]
+
+statement count 44
+select c2 from test;
+
+statement ok
+set datafusion.explain.logical_plan_only = false
+
+# project cast dictionary
+query T
+SELECT 
+    CASE 
+        WHEN cpu_load_short.host IS NULL THEN ''
+        ELSE cpu_load_short.host
+    END AS host
+FROM 
+    cpu_load_short;
+----
+host1
+host2
+
+# projection on memory scan
+query TT
+explain select b from memory_table;
+----
+logical_plan TableScan: memory_table projection=[b]
+physical_plan MemoryExec: partitions=1, partition_sizes=[1]
+
+query I
+select b from memory_table;
+----
+2
+12
+12
+120
+
+# project column with same name as relation
+query I
+select a.a from (select 1 as a) as a;
+----
+1
+
+# project column with filters that can't be pushed down (always false)
+query I
+select * from (select 1 as a) f where f.a=2;
+----
+
+
+# project column with filters that can't be pushed down (always true)
+query I
+select * from (select 1 as a) f where f.a=1;
+----
+1
+
+# project columns in memory without propagation
+query I
+SELECT column1 as a from (values (1), (2)) f where f.column1 = 2;
+----
+2
+
+# clean data
+statement ok
+DROP TABLE aggregate_simple;
+
+statement ok
+DROP TABLE aggregate_test_100;
+
+statement ok
+DROP TABLE memory_table;
+
+statement ok
+DROP TABLE cpu_load_short;
+
+statement ok
+DROP TABLE test;
+
+statement ok
+DROP TABLE test_simple;