You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/25 10:12:32 UTC
[arrow-datafusion] branch master updated: move sql tests from `context.rs` to corresponding test files in `tests/sql` (#2329)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 2237c7ed4 move sql tests from `context.rs` to corresponding test files in `tests/sql` (#2329)
2237c7ed4 is described below
commit 2237c7ed43547150219cf7ac6dbf5d07d6c1b75c
Author: DuRipeng <45...@qq.com>
AuthorDate: Mon Apr 25 18:12:25 2022 +0800
move sql tests from `context.rs` to corresponding test files in `tests/sql` (#2329)
* move context.rs ut out
* fix clippy
* fix fmt
---
datafusion/core/src/execution/context.rs | 1288 +-----------------------------
datafusion/core/tests/sql/aggregates.rs | 303 +++++++
datafusion/core/tests/sql/functions.rs | 237 ++++++
datafusion/core/tests/sql/group_by.rs | 196 +++++
datafusion/core/tests/sql/joins.rs | 107 +++
datafusion/core/tests/sql/limit.rs | 74 ++
datafusion/core/tests/sql/mod.rs | 95 +++
datafusion/core/tests/sql/select.rs | 146 ++++
datafusion/core/tests/sql/window.rs | 156 ++++
9 files changed, 1344 insertions(+), 1258 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index eae531970..5378e38da 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1602,7 +1602,6 @@ impl FunctionRegistry for TaskContext {
mod tests {
use super::*;
use crate::execution::context::QueryPlanner;
- use crate::from_slice::FromSlice;
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::functions::make_scalar_function;
use crate::test;
@@ -1611,15 +1610,8 @@ mod tests {
assert_batches_eq, assert_batches_sorted_eq,
logical_plan::{col, create_udf, sum, Expr},
};
- use crate::{
- datasource::MemTable, logical_plan::create_udaf,
- physical_plan::expressions::AvgAccumulator,
- };
- use arrow::array::{
- Array, ArrayRef, DictionaryArray, Float32Array, Float64Array, Int16Array,
- Int32Array, Int64Array, Int8Array, LargeStringArray, UInt16Array, UInt32Array,
- UInt64Array, UInt8Array,
- };
+ use crate::{logical_plan::create_udaf, physical_plan::expressions::AvgAccumulator};
+ use arrow::array::ArrayRef;
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@@ -1663,783 +1655,48 @@ mod tests {
}
#[tokio::test]
- async fn create_variable_expr() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let partition_count = 4;
- let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
-
- let variable_provider = test::variable::SystemVar::new();
- ctx.register_variable(VarType::System, Arc::new(variable_provider));
- let variable_provider = test::variable::UserDefinedVar::new();
- ctx.register_variable(VarType::UserDefined, Arc::new(variable_provider));
-
- let provider = test::create_table_dual();
- ctx.register_table("dual", provider)?;
-
- let results =
- plan_and_collect(&ctx, "SELECT @@version, @name, @integer + 1 FROM dual")
- .await?;
-
- let expected = vec![
- "+----------------------+------------------------+------------------------+",
- "| @@version | @name | @integer Plus Int64(1) |",
- "+----------------------+------------------------+------------------------+",
- "| system-var-@@version | user-defined-var-@name | 42 |",
- "+----------------------+------------------------+------------------------+",
- ];
- assert_batches_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn register_deregister() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let partition_count = 4;
- let ctx = create_ctx(&tmp_dir, partition_count).await?;
-
- let provider = test::create_table_dual();
- ctx.register_table("dual", provider)?;
-
- assert!(ctx.deregister_table("dual")?.is_some());
- assert!(ctx.deregister_table("dual")?.is_none());
-
- Ok(())
- }
-
- #[tokio::test]
- async fn left_join_using() -> Result<()> {
- let results = execute(
- "SELECT t1.c1, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
- 1,
- )
- .await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+----+----+",
- "| c1 | c2 |",
- "+----+----+",
- "| 0 | 1 |",
- "| 0 | 2 |",
- "| 0 | 3 |",
- "| 0 | 4 |",
- "| 0 | 5 |",
- "| 0 | 6 |",
- "| 0 | 7 |",
- "| 0 | 8 |",
- "| 0 | 9 |",
- "| 0 | 10 |",
- "+----+----+",
- ];
-
- assert_batches_eq!(expected, &results);
- Ok(())
- }
-
- #[tokio::test]
- async fn left_join_using_join_key_projection() -> Result<()> {
- let results = execute(
- "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
- 1,
- )
- .await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+----+----+----+",
- "| c1 | c2 | c2 |",
- "+----+----+----+",
- "| 0 | 1 | 1 |",
- "| 0 | 2 | 2 |",
- "| 0 | 3 | 3 |",
- "| 0 | 4 | 4 |",
- "| 0 | 5 | 5 |",
- "| 0 | 6 | 6 |",
- "| 0 | 7 | 7 |",
- "| 0 | 8 | 8 |",
- "| 0 | 9 | 9 |",
- "| 0 | 10 | 10 |",
- "+----+----+----+",
- ];
-
- assert_batches_eq!(expected, &results);
- Ok(())
- }
-
- #[tokio::test]
- async fn left_join() -> Result<()> {
- let results = execute(
- "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 ON t1.c2 = t2.c2 ORDER BY t1.c2",
- 1,
- )
- .await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+----+----+----+",
- "| c1 | c2 | c2 |",
- "+----+----+----+",
- "| 0 | 1 | 1 |",
- "| 0 | 2 | 2 |",
- "| 0 | 3 | 3 |",
- "| 0 | 4 | 4 |",
- "| 0 | 5 | 5 |",
- "| 0 | 6 | 6 |",
- "| 0 | 7 | 7 |",
- "| 0 | 8 | 8 |",
- "| 0 | 9 | 9 |",
- "| 0 | 10 | 10 |",
- "+----+----+----+",
- ];
-
- assert_batches_eq!(expected, &results);
- Ok(())
- }
-
- #[tokio::test]
- async fn window() -> Result<()> {
- let results = execute(
- "SELECT \
- c1, \
- c2, \
- SUM(c2) OVER (), \
- COUNT(c2) OVER (), \
- MAX(c2) OVER (), \
- MIN(c2) OVER (), \
- AVG(c2) OVER () \
- FROM test \
- ORDER BY c1, c2 \
- LIMIT 5",
- 4,
- )
- .await?;
- // result in one batch, although e.g. having 2 batches do not change
- // result semantics, having a len=1 assertion upfront keeps surprises
- // at bay
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+----+----+--------------+----------------+--------------+--------------+--------------+",
- "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
- "+----+----+--------------+----------------+--------------+--------------+--------------+",
- "| 0 | 1 | 220 | 40 | 10 | 1 | 5.5 |",
- "| 0 | 2 | 220 | 40 | 10 | 1 | 5.5 |",
- "| 0 | 3 | 220 | 40 | 10 | 1 | 5.5 |",
- "| 0 | 4 | 220 | 40 | 10 | 1 | 5.5 |",
- "| 0 | 5 | 220 | 40 | 10 | 1 | 5.5 |",
- "+----+----+--------------+----------------+--------------+--------------+--------------+",
- ];
-
- // window function shall respect ordering
- assert_batches_eq!(expected, &results);
- Ok(())
- }
-
- #[tokio::test]
- async fn window_order_by() -> Result<()> {
- let results = execute(
- "SELECT \
- c1, \
- c2, \
- ROW_NUMBER() OVER (ORDER BY c1, c2), \
- FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \
- LAST_VALUE(c2) OVER (ORDER BY c1, c2), \
- NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \
- SUM(c2) OVER (ORDER BY c1, c2), \
- COUNT(c2) OVER (ORDER BY c1, c2), \
- MAX(c2) OVER (ORDER BY c1, c2), \
- MIN(c2) OVER (ORDER BY c1, c2), \
- AVG(c2) OVER (ORDER BY c1, c2) \
- FROM test \
- ORDER BY c1, c2 \
- LIMIT 5",
- 4,
- )
- .await?;
- // result in one batch, although e.g. having 2 batches do not change
- // result semantics, having a len=1 assertion upfront keeps surprises
- // at bay
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
- "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2) | LAST_VALUE(test.c2) | NTH_VALUE(test.c2,Int64(2)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
- "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
- "| 0 | 1 | 1 | 1 | 1 | | 1 | 1 | 1 | 1 | 1 |",
- "| 0 | 2 | 2 | 1 | 2 | 2 | 3 | 2 | 2 | 1 | 1.5 |",
- "| 0 | 3 | 3 | 1 | 3 | 2 | 6 | 3 | 3 | 1 | 2 |",
- "| 0 | 4 | 4 | 1 | 4 | 2 | 10 | 4 | 4 | 1 | 2.5 |",
- "| 0 | 5 | 5 | 1 | 5 | 2 | 15 | 5 | 5 | 1 | 3 |",
- "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
- ];
-
- // window function shall respect ordering
- assert_batches_eq!(expected, &results);
- Ok(())
- }
-
- #[tokio::test]
- async fn window_partition_by() -> Result<()> {
- let results = execute(
- "SELECT \
- c1, \
- c2, \
- SUM(c2) OVER (PARTITION BY c2), \
- COUNT(c2) OVER (PARTITION BY c2), \
- MAX(c2) OVER (PARTITION BY c2), \
- MIN(c2) OVER (PARTITION BY c2), \
- AVG(c2) OVER (PARTITION BY c2) \
- FROM test \
- ORDER BY c1, c2 \
- LIMIT 5",
- 4,
- )
- .await?;
-
- let expected = vec![
- "+----+----+--------------+----------------+--------------+--------------+--------------+",
- "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
- "+----+----+--------------+----------------+--------------+--------------+--------------+",
- "| 0 | 1 | 4 | 4 | 1 | 1 | 1 |",
- "| 0 | 2 | 8 | 4 | 2 | 2 | 2 |",
- "| 0 | 3 | 12 | 4 | 3 | 3 | 3 |",
- "| 0 | 4 | 16 | 4 | 4 | 4 | 4 |",
- "| 0 | 5 | 20 | 4 | 5 | 5 | 5 |",
- "+----+----+--------------+----------------+--------------+--------------+--------------+",
- ];
-
- // window function shall respect ordering
- assert_batches_eq!(expected, &results);
- Ok(())
- }
-
- #[tokio::test]
- async fn window_partition_by_order_by() -> Result<()> {
- let results = execute(
- "SELECT \
- c1, \
- c2, \
- ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1), \
- FIRST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \
- LAST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \
- NTH_VALUE(c2 + c1, 1) OVER (PARTITION BY c2 ORDER BY c1), \
- SUM(c2) OVER (PARTITION BY c2 ORDER BY c1), \
- COUNT(c2) OVER (PARTITION BY c2 ORDER BY c1), \
- MAX(c2) OVER (PARTITION BY c2 ORDER BY c1), \
- MIN(c2) OVER (PARTITION BY c2 ORDER BY c1), \
- AVG(c2) OVER (PARTITION BY c2 ORDER BY c1) \
- FROM test \
- ORDER BY c1, c2 \
- LIMIT 5",
- 4,
- )
- .await?;
-
- let expected = vec![
- "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
- "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2 + test.c1) | LAST_VALUE(test.c2 + test.c1) | NTH_VALUE(test.c2 + test.c1,Int64(1)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
- "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
- "| 0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |",
- "| 0 | 2 | 1 | 2 | 2 | 2 | 2 | 1 | 2 | 2 | 2 |",
- "| 0 | 3 | 1 | 3 | 3 | 3 | 3 | 1 | 3 | 3 | 3 |",
- "| 0 | 4 | 1 | 4 | 4 | 4 | 4 | 1 | 4 | 4 | 4 |",
- "| 0 | 5 | 1 | 5 | 5 | 5 | 5 | 1 | 5 | 5 | 5 |",
- "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
- ];
-
- // window function shall respect ordering
- assert_batches_eq!(expected, &results);
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_decimal_min() -> Result<()> {
- let ctx = SessionContext::new();
- // the data type of c1 is decimal(10,3)
- ctx.register_table("d_table", test::table_with_decimal())
- .unwrap();
- let result = plan_and_collect(&ctx, "select min(c1) from d_table")
- .await
- .unwrap();
- let expected = vec![
- "+-----------------+",
- "| MIN(d_table.c1) |",
- "+-----------------+",
- "| -100.009 |",
- "+-----------------+",
- ];
- assert_eq!(
- &DataType::Decimal(10, 3),
- result[0].schema().field(0).data_type()
- );
- assert_batches_sorted_eq!(expected, &result);
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_decimal_max() -> Result<()> {
- let ctx = SessionContext::new();
- // the data type of c1 is decimal(10,3)
- ctx.register_table("d_table", test::table_with_decimal())
- .unwrap();
-
- let result = plan_and_collect(&ctx, "select max(c1) from d_table")
- .await
- .unwrap();
- let expected = vec![
- "+-----------------+",
- "| MAX(d_table.c1) |",
- "+-----------------+",
- "| 110.009 |",
- "+-----------------+",
- ];
- assert_eq!(
- &DataType::Decimal(10, 3),
- result[0].schema().field(0).data_type()
- );
- assert_batches_sorted_eq!(expected, &result);
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_decimal_sum() -> Result<()> {
- let ctx = SessionContext::new();
- // the data type of c1 is decimal(10,3)
- ctx.register_table("d_table", test::table_with_decimal())
- .unwrap();
- let result = plan_and_collect(&ctx, "select sum(c1) from d_table")
- .await
- .unwrap();
- let expected = vec![
- "+-----------------+",
- "| SUM(d_table.c1) |",
- "+-----------------+",
- "| 100.000 |",
- "+-----------------+",
- ];
- assert_eq!(
- &DataType::Decimal(20, 3),
- result[0].schema().field(0).data_type()
- );
- assert_batches_sorted_eq!(expected, &result);
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_decimal_avg() -> Result<()> {
- let ctx = SessionContext::new();
- // the data type of c1 is decimal(10,3)
- ctx.register_table("d_table", test::table_with_decimal())
- .unwrap();
- let result = plan_and_collect(&ctx, "select avg(c1) from d_table")
- .await
- .unwrap();
- let expected = vec![
- "+-----------------+",
- "| AVG(d_table.c1) |",
- "+-----------------+",
- "| 5.0000000 |",
- "+-----------------+",
- ];
- assert_eq!(
- &DataType::Decimal(14, 7),
- result[0].schema().field(0).data_type()
- );
- assert_batches_sorted_eq!(expected, &result);
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate() -> Result<()> {
- let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+--------------+--------------+",
- "| SUM(test.c1) | SUM(test.c2) |",
- "+--------------+--------------+",
- "| 60 | 220 |",
- "+--------------+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_empty() -> Result<()> {
- // The predicate on this query purposely generates no results
- let results = execute("SELECT SUM(c1), SUM(c2) FROM test where c1 > 100000", 4)
- .await
- .unwrap();
-
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+--------------+--------------+",
- "| SUM(test.c1) | SUM(test.c2) |",
- "+--------------+--------------+",
- "| | |",
- "+--------------+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_avg() -> Result<()> {
- let results = execute("SELECT AVG(c1), AVG(c2) FROM test", 4).await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+--------------+--------------+",
- "| AVG(test.c1) | AVG(test.c2) |",
- "+--------------+--------------+",
- "| 1.5 | 5.5 |",
- "+--------------+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_max() -> Result<()> {
- let results = execute("SELECT MAX(c1), MAX(c2) FROM test", 4).await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+--------------+--------------+",
- "| MAX(test.c1) | MAX(test.c2) |",
- "+--------------+--------------+",
- "| 3 | 10 |",
- "+--------------+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_min() -> Result<()> {
- let results = execute("SELECT MIN(c1), MIN(c2) FROM test", 4).await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+--------------+--------------+",
- "| MIN(test.c1) | MIN(test.c2) |",
- "+--------------+--------------+",
- "| 0 | 1 |",
- "+--------------+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_grouped() -> Result<()> {
- let results = execute("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?;
-
- let expected = vec![
- "+----+--------------+",
- "| c1 | SUM(test.c2) |",
- "+----+--------------+",
- "| 0 | 55 |",
- "| 1 | 55 |",
- "| 2 | 55 |",
- "| 3 | 55 |",
- "+----+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_grouped_avg() -> Result<()> {
- let results = execute("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?;
-
- let expected = vec![
- "+----+--------------+",
- "| c1 | AVG(test.c2) |",
- "+----+--------------+",
- "| 0 | 5.5 |",
- "| 1 | 5.5 |",
- "| 2 | 5.5 |",
- "| 3 | 5.5 |",
- "+----+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn boolean_literal() -> Result<()> {
- let results =
- execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
-
- let expected = vec![
- "+----+------+",
- "| c1 | c3 |",
- "+----+------+",
- "| 3 | true |",
- "| 3 | true |",
- "| 3 | true |",
- "| 3 | true |",
- "| 3 | true |",
- "+----+------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_grouped_empty() -> Result<()> {
- let results =
- execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
-
- let expected = vec![
- "+----+--------------+",
- "| c1 | AVG(test.c2) |",
- "+----+--------------+",
- "+----+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_grouped_max() -> Result<()> {
- let results = execute("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?;
-
- let expected = vec![
- "+----+--------------+",
- "| c1 | MAX(test.c2) |",
- "+----+--------------+",
- "| 0 | 10 |",
- "| 1 | 10 |",
- "| 2 | 10 |",
- "| 3 | 10 |",
- "+----+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_grouped_min() -> Result<()> {
- let results = execute("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?;
-
- let expected = vec![
- "+----+--------------+",
- "| c1 | MIN(test.c2) |",
- "+----+--------------+",
- "| 0 | 1 |",
- "| 1 | 1 |",
- "| 2 | 1 |",
- "| 3 | 1 |",
- "+----+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_avg_add() -> Result<()> {
- let results = execute(
- "SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test",
- 4,
- )
- .await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+--------------+----------------------------+----------------------------+----------------------------+",
- "| AVG(test.c1) | AVG(test.c1) Plus Int64(1) | AVG(test.c1) Plus Int64(2) | Int64(1) Plus AVG(test.c1) |",
- "+--------------+----------------------------+----------------------------+----------------------------+",
- "| 1.5 | 2.5 | 3.5 | 2.5 |",
- "+--------------+----------------------------+----------------------------+----------------------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn join_partitioned() -> Result<()> {
- // self join on partition id (workaround for duplicate column name)
- let results = execute(
- "SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1",
- 4,
- )
- .await?;
-
- assert_eq!(
- results.iter().map(|b| b.num_rows()).sum::<usize>(),
- 4 * 10 * 10
- );
-
- Ok(())
- }
-
- #[tokio::test]
- async fn count_basic() -> Result<()> {
- let results = execute("SELECT COUNT(c1), COUNT(c2) FROM test", 1).await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+----------------+----------------+",
- "| COUNT(test.c1) | COUNT(test.c2) |",
- "+----------------+----------------+",
- "| 10 | 10 |",
- "+----------------+----------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
- Ok(())
- }
-
- #[tokio::test]
- async fn count_partitioned() -> Result<()> {
- let results = execute("SELECT COUNT(c1), COUNT(c2) FROM test", 4).await?;
- assert_eq!(results.len(), 1);
-
- let expected = vec![
- "+----------------+----------------+",
- "| COUNT(test.c1) | COUNT(test.c2) |",
- "+----------------+----------------+",
- "| 40 | 40 |",
- "+----------------+----------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
- Ok(())
- }
-
- #[tokio::test]
- async fn count_aggregated() -> Result<()> {
- let results = execute("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?;
-
- let expected = vec![
- "+----+----------------+",
- "| c1 | COUNT(test.c2) |",
- "+----+----------------+",
- "| 0 | 10 |",
- "| 1 | 10 |",
- "| 2 | 10 |",
- "| 3 | 10 |",
- "+----+----------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
- Ok(())
- }
-
- #[tokio::test]
- async fn group_by_date_trunc() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let ctx = SessionContext::new();
- let schema = Arc::new(Schema::new(vec![
- Field::new("c2", DataType::UInt64, false),
- Field::new(
- "t1",
- DataType::Timestamp(TimeUnit::Microsecond, None),
- false,
- ),
- ]));
-
- // generate a partitioned file
- for partition in 0..4 {
- let filename = format!("partition-{}.{}", partition, "csv");
- let file_path = tmp_dir.path().join(&filename);
- let mut file = File::create(file_path)?;
+ async fn create_variable_expr() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let partition_count = 4;
+ let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
- // generate some data
- for i in 0..10 {
- let data = format!("{},2020-12-{}T00:00:00.000Z\n", i, i + 10);
- file.write_all(data.as_bytes())?;
- }
- }
+ let variable_provider = test::variable::SystemVar::new();
+ ctx.register_variable(VarType::System, Arc::new(variable_provider));
+ let variable_provider = test::variable::UserDefinedVar::new();
+ ctx.register_variable(VarType::UserDefined, Arc::new(variable_provider));
- ctx.register_csv(
- "test",
- tmp_dir.path().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema).has_header(false),
- )
- .await?;
+ let provider = test::create_table_dual();
+ ctx.register_table("dual", provider)?;
- let results = plan_and_collect(
- &ctx,
- "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)",
- ).await?;
+ let results =
+ plan_and_collect(&ctx, "SELECT @@version, @name, @integer + 1 FROM dual")
+ .await?;
let expected = vec![
- "+---------------------+--------------+",
- "| week | SUM(test.c2) |",
- "+---------------------+--------------+",
- "| 2020-12-07 00:00:00 | 24 |",
- "| 2020-12-14 00:00:00 | 156 |",
- "+---------------------+--------------+",
+ "+----------------------+------------------------+------------------------+",
+ "| @@version | @name | @integer Plus Int64(1) |",
+ "+----------------------+------------------------+------------------------+",
+ "| system-var-@@version | user-defined-var-@name | 42 |",
+ "+----------------------+------------------------+------------------------+",
];
- assert_batches_sorted_eq!(expected, &results);
+ assert_batches_eq!(expected, &results);
Ok(())
}
#[tokio::test]
- async fn group_by_largeutf8() {
- {
- let ctx = SessionContext::new();
-
- // input data looks like:
- // A, 1
- // B, 2
- // A, 2
- // A, 4
- // C, 1
- // A, 1
-
- let str_array: LargeStringArray = vec!["A", "B", "A", "A", "C", "A"]
- .into_iter()
- .map(Some)
- .collect();
- let str_array = Arc::new(str_array);
-
- let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into();
- let val_array = Arc::new(val_array);
-
- let schema = Arc::new(Schema::new(vec![
- Field::new("str", str_array.data_type().clone(), false),
- Field::new("val", val_array.data_type().clone(), false),
- ]));
-
- let batch =
- RecordBatch::try_new(schema.clone(), vec![str_array, val_array]).unwrap();
+ async fn register_deregister() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let partition_count = 4;
+ let ctx = create_ctx(&tmp_dir, partition_count).await?;
- let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
- ctx.register_table("t", Arc::new(provider)).unwrap();
+ let provider = test::create_table_dual();
+ ctx.register_table("dual", provider)?;
- let results =
- plan_and_collect(&ctx, "SELECT str, count(val) FROM t GROUP BY str")
- .await
- .expect("ran plan correctly");
+ assert!(ctx.deregister_table("dual")?.is_some());
+ assert!(ctx.deregister_table("dual")?.is_none());
- let expected = vec![
- "+-----+--------------+",
- "| str | COUNT(t.val) |",
- "+-----+--------------+",
- "| A | 4 |",
- "| B | 1 |",
- "| C | 1 |",
- "+-----+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
- }
+ Ok(())
}
#[tokio::test]
@@ -2466,224 +1723,6 @@ mod tests {
assert_batches_sorted_eq!(expected, &results);
}
- #[tokio::test]
- async fn group_by_dictionary() {
- async fn run_test_case<K: ArrowDictionaryKeyType>() {
- let ctx = SessionContext::new();
-
- // input data looks like:
- // A, 1
- // B, 2
- // A, 2
- // A, 4
- // C, 1
- // A, 1
-
- let dict_array: DictionaryArray<K> =
- vec!["A", "B", "A", "A", "C", "A"].into_iter().collect();
- let dict_array = Arc::new(dict_array);
-
- let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into();
- let val_array = Arc::new(val_array);
-
- let schema = Arc::new(Schema::new(vec![
- Field::new("dict", dict_array.data_type().clone(), false),
- Field::new("val", val_array.data_type().clone(), false),
- ]));
-
- let batch = RecordBatch::try_new(schema.clone(), vec![dict_array, val_array])
- .unwrap();
-
- let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
- ctx.register_table("t", Arc::new(provider)).unwrap();
-
- let results =
- plan_and_collect(&ctx, "SELECT dict, count(val) FROM t GROUP BY dict")
- .await
- .expect("ran plan correctly");
-
- let expected = vec![
- "+------+--------------+",
- "| dict | COUNT(t.val) |",
- "+------+--------------+",
- "| A | 4 |",
- "| B | 1 |",
- "| C | 1 |",
- "+------+--------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- // Now, use dict as an aggregate
- let results =
- plan_and_collect(&ctx, "SELECT val, count(dict) FROM t GROUP BY val")
- .await
- .expect("ran plan correctly");
-
- let expected = vec![
- "+-----+---------------+",
- "| val | COUNT(t.dict) |",
- "+-----+---------------+",
- "| 1 | 3 |",
- "| 2 | 2 |",
- "| 4 | 1 |",
- "+-----+---------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- // Now, use dict as an aggregate
- let results = plan_and_collect(
- &ctx,
- "SELECT val, count(distinct dict) FROM t GROUP BY val",
- )
- .await
- .expect("ran plan correctly");
-
- let expected = vec![
- "+-----+------------------------+",
- "| val | COUNT(DISTINCT t.dict) |",
- "+-----+------------------------+",
- "| 1 | 2 |",
- "| 2 | 2 |",
- "| 4 | 1 |",
- "+-----+------------------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
- }
-
- run_test_case::<Int8Type>().await;
- run_test_case::<Int16Type>().await;
- run_test_case::<Int32Type>().await;
- run_test_case::<Int64Type>().await;
- run_test_case::<UInt8Type>().await;
- run_test_case::<UInt16Type>().await;
- run_test_case::<UInt32Type>().await;
- run_test_case::<UInt64Type>().await;
- }
-
- async fn run_count_distinct_integers_aggregated_scenario(
- partitions: Vec<Vec<(&str, u64)>>,
- ) -> Result<Vec<RecordBatch>> {
- let tmp_dir = TempDir::new()?;
- let ctx = SessionContext::new();
- let schema = Arc::new(Schema::new(vec![
- Field::new("c_group", DataType::Utf8, false),
- Field::new("c_int8", DataType::Int8, false),
- Field::new("c_int16", DataType::Int16, false),
- Field::new("c_int32", DataType::Int32, false),
- Field::new("c_int64", DataType::Int64, false),
- Field::new("c_uint8", DataType::UInt8, false),
- Field::new("c_uint16", DataType::UInt16, false),
- Field::new("c_uint32", DataType::UInt32, false),
- Field::new("c_uint64", DataType::UInt64, false),
- ]));
-
- for (i, partition) in partitions.iter().enumerate() {
- let filename = format!("partition-{}.csv", i);
- let file_path = tmp_dir.path().join(&filename);
- let mut file = File::create(file_path)?;
- for row in partition {
- let row_str = format!(
- "{},{}\n",
- row.0,
- // Populate values for each of the integer fields in the
- // schema.
- (0..8)
- .map(|_| { row.1.to_string() })
- .collect::<Vec<_>>()
- .join(","),
- );
- file.write_all(row_str.as_bytes())?;
- }
- }
- ctx.register_csv(
- "test",
- tmp_dir.path().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema).has_header(false),
- )
- .await?;
-
- let results = plan_and_collect(
- &ctx,
- "
- SELECT
- c_group,
- COUNT(c_uint64),
- COUNT(DISTINCT c_int8),
- COUNT(DISTINCT c_int16),
- COUNT(DISTINCT c_int32),
- COUNT(DISTINCT c_int64),
- COUNT(DISTINCT c_uint8),
- COUNT(DISTINCT c_uint16),
- COUNT(DISTINCT c_uint32),
- COUNT(DISTINCT c_uint64)
- FROM test
- GROUP BY c_group
- ",
- )
- .await?;
-
- Ok(results)
- }
-
- #[tokio::test]
- async fn count_distinct_integers_aggregated_single_partition() -> Result<()> {
- let partitions = vec![
- // The first member of each tuple will be the value for the
- // `c_group` column, and the second member will be the value for
- // each of the int/uint fields.
- vec![
- ("a", 1),
- ("a", 1),
- ("a", 2),
- ("b", 9),
- ("c", 9),
- ("c", 10),
- ("c", 9),
- ],
- ];
-
- let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
-
- let expected = vec![
- "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
- "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |",
- "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
- "| a | 3 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |",
- "| b | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |",
- "| c | 3 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |",
- "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()> {
- let partitions = vec![
- // The first member of each tuple will be the value for the
- // `c_group` column, and the second member will be the value for
- // each of the int/uint fields.
- vec![("a", 1), ("a", 1), ("a", 2), ("b", 9), ("c", 9)],
- vec![("a", 1), ("a", 3), ("b", 8), ("b", 9), ("b", 10), ("b", 11)],
- ];
-
- let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
-
- let expected = vec![
- "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
- "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |",
- "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
- "| a | 5 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 |",
- "| b | 5 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 |",
- "| c | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |",
- "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
#[tokio::test]
async fn aggregate_with_alias() -> Result<()> {
let tmp_dir = TempDir::new()?;
@@ -2710,190 +1749,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn limit() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let ctx = create_ctx(&tmp_dir, 1).await?;
- ctx.register_table("t", test::table_with_sequence(1, 1000).unwrap())
- .unwrap();
-
- let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i DESC limit 3")
- .await
- .unwrap();
-
- let expected = vec![
- "+------+", "| i |", "+------+", "| 1000 |", "| 999 |", "| 998 |",
- "+------+",
- ];
-
- assert_batches_eq!(expected, &results);
-
- let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i limit 3")
- .await
- .unwrap();
-
- let expected = vec![
- "+---+", "| i |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+",
- ];
-
- assert_batches_eq!(expected, &results);
-
- let results = plan_and_collect(&ctx, "SELECT i FROM t limit 3")
- .await
- .unwrap();
-
- // the actual rows are not guaranteed, so only check the count (should be 3)
- let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
- assert_eq!(num_rows, 3);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn limit_multi_partitions() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let ctx = create_ctx(&tmp_dir, 1).await?;
-
- let partitions = vec![
- vec![test::make_partition(0)],
- vec![test::make_partition(1)],
- vec![test::make_partition(2)],
- vec![test::make_partition(3)],
- vec![test::make_partition(4)],
- vec![test::make_partition(5)],
- ];
- let schema = partitions[0][0].schema();
- let provider = Arc::new(MemTable::try_new(schema, partitions).unwrap());
-
- ctx.register_table("t", provider).unwrap();
-
- // select all rows
- let results = plan_and_collect(&ctx, "SELECT i FROM t").await.unwrap();
-
- let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
- assert_eq!(num_rows, 15);
-
- for limit in 1..10 {
- let query = format!("SELECT i FROM t limit {}", limit);
- let results = plan_and_collect(&ctx, &query).await.unwrap();
-
- let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
- assert_eq!(num_rows, limit, "mismatch with query {}", query);
- }
-
- Ok(())
- }
-
- #[tokio::test]
- async fn case_sensitive_identifiers_functions() {
- let ctx = SessionContext::new();
- ctx.register_table("t", test::table_with_sequence(1, 1).unwrap())
- .unwrap();
-
- let expected = vec![
- "+-----------+",
- "| sqrt(t.i) |",
- "+-----------+",
- "| 1 |",
- "+-----------+",
- ];
-
- let results = plan_and_collect(&ctx, "SELECT sqrt(i) FROM t")
- .await
- .unwrap();
-
- assert_batches_sorted_eq!(expected, &results);
-
- let results = plan_and_collect(&ctx, "SELECT SQRT(i) FROM t")
- .await
- .unwrap();
- assert_batches_sorted_eq!(expected, &results);
-
- // Using double quotes allows specifying the function name with capitalization
- let err = plan_and_collect(&ctx, "SELECT \"SQRT\"(i) FROM t")
- .await
- .unwrap_err();
- assert_eq!(
- err.to_string(),
- "Error during planning: Invalid function 'SQRT'"
- );
-
- let results = plan_and_collect(&ctx, "SELECT \"sqrt\"(i) FROM t")
- .await
- .unwrap();
- assert_batches_sorted_eq!(expected, &results);
- }
-
- #[tokio::test]
- async fn case_builtin_math_expression() {
- let ctx = SessionContext::new();
-
- let type_values = vec![
- (
- DataType::Int8,
- Arc::new(Int8Array::from_slice(&[1])) as ArrayRef,
- ),
- (
- DataType::Int16,
- Arc::new(Int16Array::from_slice(&[1])) as ArrayRef,
- ),
- (
- DataType::Int32,
- Arc::new(Int32Array::from_slice(&[1])) as ArrayRef,
- ),
- (
- DataType::Int64,
- Arc::new(Int64Array::from_slice(&[1])) as ArrayRef,
- ),
- (
- DataType::UInt8,
- Arc::new(UInt8Array::from_slice(&[1])) as ArrayRef,
- ),
- (
- DataType::UInt16,
- Arc::new(UInt16Array::from_slice(&[1])) as ArrayRef,
- ),
- (
- DataType::UInt32,
- Arc::new(UInt32Array::from_slice(&[1])) as ArrayRef,
- ),
- (
- DataType::UInt64,
- Arc::new(UInt64Array::from_slice(&[1])) as ArrayRef,
- ),
- (
- DataType::Float32,
- Arc::new(Float32Array::from_slice(&[1.0_f32])) as ArrayRef,
- ),
- (
- DataType::Float64,
- Arc::new(Float64Array::from_slice(&[1.0_f64])) as ArrayRef,
- ),
- ];
-
- for (data_type, array) in type_values.iter() {
- let schema =
- Arc::new(Schema::new(vec![Field::new("v", data_type.clone(), false)]));
- let batch =
- RecordBatch::try_new(schema.clone(), vec![array.clone()]).unwrap();
- let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
- ctx.deregister_table("t").unwrap();
- ctx.register_table("t", Arc::new(provider)).unwrap();
- let expected = vec![
- "+-----------+",
- "| sqrt(t.v) |",
- "+-----------+",
- "| 1 |",
- "+-----------+",
- ];
- let results = plan_and_collect(&ctx, "SELECT sqrt(v) FROM t")
- .await
- .unwrap();
-
- assert_batches_sorted_eq!(expected, &results);
- }
- }
-
#[tokio::test]
async fn case_sensitive_identifiers_user_defined_functions() -> Result<()> {
let mut ctx = SessionContext::new();
@@ -2935,46 +1790,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn case_sensitive_identifiers_aggregates() {
- let ctx = SessionContext::new();
- ctx.register_table("t", test::table_with_sequence(1, 1).unwrap())
- .unwrap();
-
- let expected = vec![
- "+----------+",
- "| MAX(t.i) |",
- "+----------+",
- "| 1 |",
- "+----------+",
- ];
-
- let results = plan_and_collect(&ctx, "SELECT max(i) FROM t")
- .await
- .unwrap();
-
- assert_batches_sorted_eq!(expected, &results);
-
- let results = plan_and_collect(&ctx, "SELECT MAX(i) FROM t")
- .await
- .unwrap();
- assert_batches_sorted_eq!(expected, &results);
-
- // Using double quotes allows specifying the function name with capitalization
- let err = plan_and_collect(&ctx, "SELECT \"MAX\"(i) FROM t")
- .await
- .unwrap_err();
- assert_eq!(
- err.to_string(),
- "Error during planning: Invalid function 'MAX'"
- );
-
- let results = plan_and_collect(&ctx, "SELECT \"max\"(i) FROM t")
- .await
- .unwrap();
- assert_batches_sorted_eq!(expected, &results);
- }
-
#[tokio::test]
async fn case_sensitive_identifiers_user_defined_aggregates() -> Result<()> {
let mut ctx = SessionContext::new();
@@ -3097,42 +1912,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn simple_avg() -> Result<()> {
- let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
-
- let batch1 = RecordBatch::try_new(
- Arc::new(schema.clone()),
- vec![Arc::new(Int32Array::from_slice(&[1, 2, 3]))],
- )?;
- let batch2 = RecordBatch::try_new(
- Arc::new(schema.clone()),
- vec![Arc::new(Int32Array::from_slice(&[4, 5]))],
- )?;
-
- let ctx = SessionContext::new();
-
- let provider =
- MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
- ctx.register_table("t", Arc::new(provider))?;
-
- let result = plan_and_collect(&ctx, "SELECT AVG(a) FROM t").await?;
-
- let batch = &result[0];
- assert_eq!(1, batch.num_columns());
- assert_eq!(1, batch.num_rows());
-
- let values = batch
- .column(0)
- .as_any()
- .downcast_ref::<Float64Array>()
- .expect("failed to cast version");
- assert_eq!(values.len(), 1);
- // avg(1,2,3,4,5) = 3.0
- assert_eq!(values.value(0), 3.0_f64);
- Ok(())
- }
-
#[tokio::test]
async fn custom_query_planner() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
@@ -3385,13 +2164,6 @@ mod tests {
ctx.sql(sql).await?.collect().await
}
- /// Execute SQL and return results
- async fn execute(sql: &str, partition_count: usize) -> Result<Vec<RecordBatch>> {
- let tmp_dir = TempDir::new()?;
- let ctx = create_ctx(&tmp_dir, partition_count).await?;
- plan_and_collect(&ctx, sql).await
- }
-
/// Generate CSV partitions within the supplied directory
fn populate_csv_partitions(
tmp_dir: &TempDir,
diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs
index 272bb82d6..f84a7c2b3 100644
--- a/datafusion/core/tests/sql/aggregates.rs
+++ b/datafusion/core/tests/sql/aggregates.rs
@@ -804,3 +804,306 @@ async fn aggregate_timestamps_avg() -> Result<()> {
assert_eq!(results.to_string(), "Error during planning: The function Avg does not support inputs of type Timestamp(Nanosecond, None).");
Ok(())
}
+
+#[tokio::test]
+async fn aggregate_decimal_min() -> Result<()> {
+ let ctx = SessionContext::new();
+ // the data type of c1 is decimal(10,3)
+ ctx.register_table("d_table", table_with_decimal()).unwrap();
+ let result = plan_and_collect(&ctx, "select min(c1) from d_table")
+ .await
+ .unwrap();
+ let expected = vec![
+ "+-----------------+",
+ "| MIN(d_table.c1) |",
+ "+-----------------+",
+ "| -100.009 |",
+ "+-----------------+",
+ ];
+ assert_eq!(
+ &DataType::Decimal(10, 3),
+ result[0].schema().field(0).data_type()
+ );
+ assert_batches_sorted_eq!(expected, &result);
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_decimal_max() -> Result<()> {
+ let ctx = SessionContext::new();
+ // the data type of c1 is decimal(10,3)
+ ctx.register_table("d_table", table_with_decimal()).unwrap();
+
+ let result = plan_and_collect(&ctx, "select max(c1) from d_table")
+ .await
+ .unwrap();
+ let expected = vec![
+ "+-----------------+",
+ "| MAX(d_table.c1) |",
+ "+-----------------+",
+ "| 110.009 |",
+ "+-----------------+",
+ ];
+ assert_eq!(
+ &DataType::Decimal(10, 3),
+ result[0].schema().field(0).data_type()
+ );
+ assert_batches_sorted_eq!(expected, &result);
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_decimal_sum() -> Result<()> {
+ let ctx = SessionContext::new();
+ // the data type of c1 is decimal(10,3)
+ ctx.register_table("d_table", table_with_decimal()).unwrap();
+ let result = plan_and_collect(&ctx, "select sum(c1) from d_table")
+ .await
+ .unwrap();
+ let expected = vec![
+ "+-----------------+",
+ "| SUM(d_table.c1) |",
+ "+-----------------+",
+ "| 100.000 |",
+ "+-----------------+",
+ ];
+ assert_eq!(
+ &DataType::Decimal(20, 3),
+ result[0].schema().field(0).data_type()
+ );
+ assert_batches_sorted_eq!(expected, &result);
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_decimal_avg() -> Result<()> {
+ let ctx = SessionContext::new();
+ // the data type of c1 is decimal(10,3)
+ ctx.register_table("d_table", table_with_decimal()).unwrap();
+ let result = plan_and_collect(&ctx, "select avg(c1) from d_table")
+ .await
+ .unwrap();
+ let expected = vec![
+ "+-----------------+",
+ "| AVG(d_table.c1) |",
+ "+-----------------+",
+ "| 5.0000000 |",
+ "+-----------------+",
+ ];
+ assert_eq!(
+ &DataType::Decimal(14, 7),
+ result[0].schema().field(0).data_type()
+ );
+ assert_batches_sorted_eq!(expected, &result);
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate() -> Result<()> {
+ let results = execute_with_partition("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+--------------+--------------+",
+ "| SUM(test.c1) | SUM(test.c2) |",
+ "+--------------+--------------+",
+ "| 60 | 220 |",
+ "+--------------+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_empty() -> Result<()> {
+ // The predicate on this query purposely generates no results
+ let results =
+ execute_with_partition("SELECT SUM(c1), SUM(c2) FROM test where c1 > 100000", 4)
+ .await
+ .unwrap();
+
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+--------------+--------------+",
+ "| SUM(test.c1) | SUM(test.c2) |",
+ "+--------------+--------------+",
+ "| | |",
+ "+--------------+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_avg() -> Result<()> {
+ let results = execute_with_partition("SELECT AVG(c1), AVG(c2) FROM test", 4).await?;
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+--------------+--------------+",
+ "| AVG(test.c1) | AVG(test.c2) |",
+ "+--------------+--------------+",
+ "| 1.5 | 5.5 |",
+ "+--------------+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_max() -> Result<()> {
+ let results = execute_with_partition("SELECT MAX(c1), MAX(c2) FROM test", 4).await?;
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+--------------+--------------+",
+ "| MAX(test.c1) | MAX(test.c2) |",
+ "+--------------+--------------+",
+ "| 3 | 10 |",
+ "+--------------+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_min() -> Result<()> {
+ let results = execute_with_partition("SELECT MIN(c1), MIN(c2) FROM test", 4).await?;
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+--------------+--------------+",
+ "| MIN(test.c1) | MIN(test.c2) |",
+ "+--------------+--------------+",
+ "| 0 | 1 |",
+ "+--------------+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_grouped() -> Result<()> {
+ let results =
+ execute_with_partition("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?;
+
+ let expected = vec![
+ "+----+--------------+",
+ "| c1 | SUM(test.c2) |",
+ "+----+--------------+",
+ "| 0 | 55 |",
+ "| 1 | 55 |",
+ "| 2 | 55 |",
+ "| 3 | 55 |",
+ "+----+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_grouped_avg() -> Result<()> {
+ let results =
+ execute_with_partition("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?;
+
+ let expected = vec![
+ "+----+--------------+",
+ "| c1 | AVG(test.c2) |",
+ "+----+--------------+",
+ "| 0 | 5.5 |",
+ "| 1 | 5.5 |",
+ "| 2 | 5.5 |",
+ "| 3 | 5.5 |",
+ "+----+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_grouped_empty() -> Result<()> {
+ let results = execute_with_partition(
+ "SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1",
+ 4,
+ )
+ .await?;
+
+ let expected = vec![
+ "+----+--------------+",
+ "| c1 | AVG(test.c2) |",
+ "+----+--------------+",
+ "+----+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_grouped_max() -> Result<()> {
+ let results =
+ execute_with_partition("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?;
+
+ let expected = vec![
+ "+----+--------------+",
+ "| c1 | MAX(test.c2) |",
+ "+----+--------------+",
+ "| 0 | 10 |",
+ "| 1 | 10 |",
+ "| 2 | 10 |",
+ "| 3 | 10 |",
+ "+----+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_grouped_min() -> Result<()> {
+ let results =
+ execute_with_partition("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?;
+
+ let expected = vec![
+ "+----+--------------+",
+ "| c1 | MIN(test.c2) |",
+ "+----+--------------+",
+ "| 0 | 1 |",
+ "| 1 | 1 |",
+ "| 2 | 1 |",
+ "| 3 | 1 |",
+ "+----+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_avg_add() -> Result<()> {
+ let results = execute_with_partition(
+ "SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test",
+ 4,
+ )
+ .await?;
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+--------------+----------------------------+----------------------------+----------------------------+",
+ "| AVG(test.c1) | AVG(test.c1) Plus Int64(1) | AVG(test.c1) Plus Int64(2) | Int64(1) Plus AVG(test.c1) |",
+ "+--------------+----------------------------+----------------------------+----------------------------+",
+ "| 1.5 | 2.5 | 3.5 | 2.5 |",
+ "+--------------+----------------------------+----------------------------+----------------------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
diff --git a/datafusion/core/tests/sql/functions.rs b/datafusion/core/tests/sql/functions.rs
index 226bb8159..06ddea09d 100644
--- a/datafusion/core/tests/sql/functions.rs
+++ b/datafusion/core/tests/sql/functions.rs
@@ -357,3 +357,240 @@ async fn coalesce_mul_with_default_value() -> Result<()> {
assert_batches_eq!(expected, &actual);
Ok(())
}
+
+#[tokio::test]
+async fn count_basic() -> Result<()> {
+ let results =
+ execute_with_partition("SELECT COUNT(c1), COUNT(c2) FROM test", 1).await?;
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+----------------+----------------+",
+ "| COUNT(test.c1) | COUNT(test.c2) |",
+ "+----------------+----------------+",
+ "| 10 | 10 |",
+ "+----------------+----------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn count_partitioned() -> Result<()> {
+ let results =
+ execute_with_partition("SELECT COUNT(c1), COUNT(c2) FROM test", 4).await?;
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+----------------+----------------+",
+ "| COUNT(test.c1) | COUNT(test.c2) |",
+ "+----------------+----------------+",
+ "| 40 | 40 |",
+ "+----------------+----------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn count_aggregated() -> Result<()> {
+ let results =
+ execute_with_partition("SELECT c1, COUNT(c2) FROM test GROUP BY c1", 4).await?;
+
+ let expected = vec![
+ "+----+----------------+",
+ "| c1 | COUNT(test.c2) |",
+ "+----+----------------+",
+ "| 0 | 10 |",
+ "| 1 | 10 |",
+ "| 2 | 10 |",
+ "| 3 | 10 |",
+ "+----+----------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn simple_avg() -> Result<()> {
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+
+ let batch1 = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(Int32Array::from_slice(&[1, 2, 3]))],
+ )?;
+ let batch2 = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(Int32Array::from_slice(&[4, 5]))],
+ )?;
+
+ let ctx = SessionContext::new();
+
+ let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
+ ctx.register_table("t", Arc::new(provider))?;
+
+ let result = plan_and_collect(&ctx, "SELECT AVG(a) FROM t").await?;
+
+ let batch = &result[0];
+ assert_eq!(1, batch.num_columns());
+ assert_eq!(1, batch.num_rows());
+
+ let values = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .expect("failed to cast version");
+ assert_eq!(values.len(), 1);
+ // avg(1,2,3,4,5) = 3.0
+ assert_eq!(values.value(0), 3.0_f64);
+ Ok(())
+}
+
+#[tokio::test]
+async fn case_sensitive_identifiers_functions() {
+ let ctx = SessionContext::new();
+ ctx.register_table("t", table_with_sequence(1, 1).unwrap())
+ .unwrap();
+
+ let expected = vec![
+ "+-----------+",
+ "| sqrt(t.i) |",
+ "+-----------+",
+ "| 1 |",
+ "+-----------+",
+ ];
+
+ let results = plan_and_collect(&ctx, "SELECT sqrt(i) FROM t")
+ .await
+ .unwrap();
+
+ assert_batches_sorted_eq!(expected, &results);
+
+ let results = plan_and_collect(&ctx, "SELECT SQRT(i) FROM t")
+ .await
+ .unwrap();
+ assert_batches_sorted_eq!(expected, &results);
+
+ // Using double quotes allows specifying the function name with capitalization
+ let err = plan_and_collect(&ctx, "SELECT \"SQRT\"(i) FROM t")
+ .await
+ .unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Error during planning: Invalid function 'SQRT'"
+ );
+
+ let results = plan_and_collect(&ctx, "SELECT \"sqrt\"(i) FROM t")
+ .await
+ .unwrap();
+ assert_batches_sorted_eq!(expected, &results);
+}
+
+#[tokio::test]
+async fn case_builtin_math_expression() {
+ let ctx = SessionContext::new();
+
+ let type_values = vec![
+ (
+ DataType::Int8,
+ Arc::new(Int8Array::from_slice(&[1])) as ArrayRef,
+ ),
+ (
+ DataType::Int16,
+ Arc::new(Int16Array::from_slice(&[1])) as ArrayRef,
+ ),
+ (
+ DataType::Int32,
+ Arc::new(Int32Array::from_slice(&[1])) as ArrayRef,
+ ),
+ (
+ DataType::Int64,
+ Arc::new(Int64Array::from_slice(&[1])) as ArrayRef,
+ ),
+ (
+ DataType::UInt8,
+ Arc::new(UInt8Array::from_slice(&[1])) as ArrayRef,
+ ),
+ (
+ DataType::UInt16,
+ Arc::new(UInt16Array::from_slice(&[1])) as ArrayRef,
+ ),
+ (
+ DataType::UInt32,
+ Arc::new(UInt32Array::from_slice(&[1])) as ArrayRef,
+ ),
+ (
+ DataType::UInt64,
+ Arc::new(UInt64Array::from_slice(&[1])) as ArrayRef,
+ ),
+ (
+ DataType::Float32,
+ Arc::new(Float32Array::from_slice(&[1.0_f32])) as ArrayRef,
+ ),
+ (
+ DataType::Float64,
+ Arc::new(Float64Array::from_slice(&[1.0_f64])) as ArrayRef,
+ ),
+ ];
+
+ for (data_type, array) in type_values.iter() {
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("v", data_type.clone(), false)]));
+ let batch = RecordBatch::try_new(schema.clone(), vec![array.clone()]).unwrap();
+ let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
+ ctx.deregister_table("t").unwrap();
+ ctx.register_table("t", Arc::new(provider)).unwrap();
+ let expected = vec![
+ "+-----------+",
+ "| sqrt(t.v) |",
+ "+-----------+",
+ "| 1 |",
+ "+-----------+",
+ ];
+ let results = plan_and_collect(&ctx, "SELECT sqrt(v) FROM t")
+ .await
+ .unwrap();
+
+ assert_batches_sorted_eq!(expected, &results);
+ }
+}
+
+#[tokio::test]
+async fn case_sensitive_identifiers_aggregates() {
+ let ctx = SessionContext::new();
+ ctx.register_table("t", table_with_sequence(1, 1).unwrap())
+ .unwrap();
+
+ let expected = vec![
+ "+----------+",
+ "| MAX(t.i) |",
+ "+----------+",
+ "| 1 |",
+ "+----------+",
+ ];
+
+ let results = plan_and_collect(&ctx, "SELECT max(i) FROM t")
+ .await
+ .unwrap();
+
+ assert_batches_sorted_eq!(expected, &results);
+
+ let results = plan_and_collect(&ctx, "SELECT MAX(i) FROM t")
+ .await
+ .unwrap();
+ assert_batches_sorted_eq!(expected, &results);
+
+ // Using double quotes allows specifying the function name with capitalization
+ let err = plan_and_collect(&ctx, "SELECT \"MAX\"(i) FROM t")
+ .await
+ .unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Error during planning: Invalid function 'MAX'"
+ );
+
+ let results = plan_and_collect(&ctx, "SELECT \"max\"(i) FROM t")
+ .await
+ .unwrap();
+ assert_batches_sorted_eq!(expected, &results);
+}
diff --git a/datafusion/core/tests/sql/group_by.rs b/datafusion/core/tests/sql/group_by.rs
index 78323310a..7f38fbda8 100644
--- a/datafusion/core/tests/sql/group_by.rs
+++ b/datafusion/core/tests/sql/group_by.rs
@@ -442,3 +442,199 @@ async fn csv_group_by_date() -> Result<()> {
assert_batches_sorted_eq!(expected, &actual);
Ok(())
}
+
+#[tokio::test]
+async fn group_by_date_trunc() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let ctx = SessionContext::new();
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c2", DataType::UInt64, false),
+ Field::new(
+ "t1",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ ),
+ ]));
+
+ // generate a partitioned file
+ for partition in 0..4 {
+ let filename = format!("partition-{}.{}", partition, "csv");
+ let file_path = tmp_dir.path().join(&filename);
+ let mut file = File::create(file_path)?;
+
+ // generate some data
+ for i in 0..10 {
+ let data = format!("{},2020-12-{}T00:00:00.000Z\n", i, i + 10);
+ file.write_all(data.as_bytes())?;
+ }
+ }
+
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema).has_header(false),
+ )
+ .await?;
+
+ let results = plan_and_collect(
+ &ctx,
+ "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)",
+ ).await?;
+
+ let expected = vec![
+ "+---------------------+--------------+",
+ "| week | SUM(test.c2) |",
+ "+---------------------+--------------+",
+ "| 2020-12-07 00:00:00 | 24 |",
+ "| 2020-12-14 00:00:00 | 156 |",
+ "+---------------------+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn group_by_largeutf8() {
+ {
+ let ctx = SessionContext::new();
+
+ // input data looks like:
+ // A, 1
+ // B, 2
+ // A, 2
+ // A, 4
+ // C, 1
+ // A, 1
+
+ let str_array: LargeStringArray = vec!["A", "B", "A", "A", "C", "A"]
+ .into_iter()
+ .map(Some)
+ .collect();
+ let str_array = Arc::new(str_array);
+
+ let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into();
+ let val_array = Arc::new(val_array);
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("str", str_array.data_type().clone(), false),
+ Field::new("val", val_array.data_type().clone(), false),
+ ]));
+
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![str_array, val_array]).unwrap();
+
+ let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
+ ctx.register_table("t", Arc::new(provider)).unwrap();
+
+ let results =
+ plan_and_collect(&ctx, "SELECT str, count(val) FROM t GROUP BY str")
+ .await
+ .expect("ran plan correctly");
+
+ let expected = vec![
+ "+-----+--------------+",
+ "| str | COUNT(t.val) |",
+ "+-----+--------------+",
+ "| A | 4 |",
+ "| B | 1 |",
+ "| C | 1 |",
+ "+-----+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+ }
+}
+
+#[tokio::test]
+async fn group_by_dictionary() {
+ async fn run_test_case<K: ArrowDictionaryKeyType>() {
+ let ctx = SessionContext::new();
+
+ // input data looks like:
+ // A, 1
+ // B, 2
+ // A, 2
+ // A, 4
+ // C, 1
+ // A, 1
+
+ let dict_array: DictionaryArray<K> =
+ vec!["A", "B", "A", "A", "C", "A"].into_iter().collect();
+ let dict_array = Arc::new(dict_array);
+
+ let val_array: Int64Array = vec![1, 2, 2, 4, 1, 1].into();
+ let val_array = Arc::new(val_array);
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("dict", dict_array.data_type().clone(), false),
+ Field::new("val", val_array.data_type().clone(), false),
+ ]));
+
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![dict_array, val_array]).unwrap();
+
+ let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
+ ctx.register_table("t", Arc::new(provider)).unwrap();
+
+ let results =
+ plan_and_collect(&ctx, "SELECT dict, count(val) FROM t GROUP BY dict")
+ .await
+ .expect("ran plan correctly");
+
+ let expected = vec![
+ "+------+--------------+",
+ "| dict | COUNT(t.val) |",
+ "+------+--------------+",
+ "| A | 4 |",
+ "| B | 1 |",
+ "| C | 1 |",
+ "+------+--------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ // Now, use dict as an aggregate
+ let results =
+ plan_and_collect(&ctx, "SELECT val, count(dict) FROM t GROUP BY val")
+ .await
+ .expect("ran plan correctly");
+
+ let expected = vec![
+ "+-----+---------------+",
+ "| val | COUNT(t.dict) |",
+ "+-----+---------------+",
+ "| 1 | 3 |",
+ "| 2 | 2 |",
+ "| 4 | 1 |",
+ "+-----+---------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ // Now, use dict as an aggregate
+ let results = plan_and_collect(
+ &ctx,
+ "SELECT val, count(distinct dict) FROM t GROUP BY val",
+ )
+ .await
+ .expect("ran plan correctly");
+
+ let expected = vec![
+ "+-----+------------------------+",
+ "| val | COUNT(DISTINCT t.dict) |",
+ "+-----+------------------------+",
+ "| 1 | 2 |",
+ "| 2 | 2 |",
+ "| 4 | 1 |",
+ "+-----+------------------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+ }
+
+ run_test_case::<Int8Type>().await;
+ run_test_case::<Int16Type>().await;
+ run_test_case::<Int32Type>().await;
+ run_test_case::<Int64Type>().await;
+ run_test_case::<UInt8Type>().await;
+ run_test_case::<UInt16Type>().await;
+ run_test_case::<UInt32Type>().await;
+ run_test_case::<UInt64Type>().await;
+}
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 6bfc3f384..aaa8adac5 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -1020,3 +1020,110 @@ async fn left_join_should_not_panic_with_empty_side() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn left_join_using_2() -> Result<()> {
+ let results = execute_with_partition(
+ "SELECT t1.c1, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
+ 1,
+ )
+ .await?;
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+----+----+",
+ "| c1 | c2 |",
+ "+----+----+",
+ "| 0 | 1 |",
+ "| 0 | 2 |",
+ "| 0 | 3 |",
+ "| 0 | 4 |",
+ "| 0 | 5 |",
+ "| 0 | 6 |",
+ "| 0 | 7 |",
+ "| 0 | 8 |",
+ "| 0 | 9 |",
+ "| 0 | 10 |",
+ "+----+----+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn left_join_using_join_key_projection() -> Result<()> {
+ let results = execute_with_partition(
+ "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 USING (c2) ORDER BY t2.c2",
+ 1,
+ )
+ .await?;
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+----+----+----+",
+ "| c1 | c2 | c2 |",
+ "+----+----+----+",
+ "| 0 | 1 | 1 |",
+ "| 0 | 2 | 2 |",
+ "| 0 | 3 | 3 |",
+ "| 0 | 4 | 4 |",
+ "| 0 | 5 | 5 |",
+ "| 0 | 6 | 6 |",
+ "| 0 | 7 | 7 |",
+ "| 0 | 8 | 8 |",
+ "| 0 | 9 | 9 |",
+ "| 0 | 10 | 10 |",
+ "+----+----+----+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn left_join_2() -> Result<()> {
+ let results = execute_with_partition(
+ "SELECT t1.c1, t1.c2, t2.c2 FROM test t1 JOIN test t2 ON t1.c2 = t2.c2 ORDER BY t1.c2",
+ 1,
+ )
+ .await?;
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+----+----+----+",
+ "| c1 | c2 | c2 |",
+ "+----+----+----+",
+ "| 0 | 1 | 1 |",
+ "| 0 | 2 | 2 |",
+ "| 0 | 3 | 3 |",
+ "| 0 | 4 | 4 |",
+ "| 0 | 5 | 5 |",
+ "| 0 | 6 | 6 |",
+ "| 0 | 7 | 7 |",
+ "| 0 | 8 | 8 |",
+ "| 0 | 9 | 9 |",
+ "| 0 | 10 | 10 |",
+ "+----+----+----+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn join_partitioned() -> Result<()> {
+ // self join on partition id (workaround for duplicate column name)
+ let results = execute_with_partition(
+ "SELECT 1 FROM test JOIN (SELECT c1 AS id1 FROM test) AS a ON c1=id1",
+ 4,
+ )
+ .await?;
+
+ assert_eq!(
+ results.iter().map(|b| b.num_rows()).sum::<usize>(),
+ 4 * 10 * 10
+ );
+
+ Ok(())
+}
diff --git a/datafusion/core/tests/sql/limit.rs b/datafusion/core/tests/sql/limit.rs
index fc2dc4c95..e3b1466bc 100644
--- a/datafusion/core/tests/sql/limit.rs
+++ b/datafusion/core/tests/sql/limit.rs
@@ -89,3 +89,77 @@ async fn csv_query_limit_zero() -> Result<()> {
assert_batches_eq!(expected, &actual);
Ok(())
}
+
+#[tokio::test]
+async fn limit() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let ctx = create_ctx_with_partition(&tmp_dir, 1).await?;
+ ctx.register_table("t", table_with_sequence(1, 1000).unwrap())
+ .unwrap();
+
+ let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i DESC limit 3")
+ .await
+ .unwrap();
+
+ let expected = vec![
+ "+------+", "| i |", "+------+", "| 1000 |", "| 999 |", "| 998 |",
+ "+------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ let results = plan_and_collect(&ctx, "SELECT i FROM t ORDER BY i limit 3")
+ .await
+ .unwrap();
+
+ let expected = vec![
+ "+---+", "| i |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ let results = plan_and_collect(&ctx, "SELECT i FROM t limit 3")
+ .await
+ .unwrap();
+
+ // the actual rows are not guaranteed, so only check the count (should be 3)
+ let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
+ assert_eq!(num_rows, 3);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn limit_multi_partitions() -> Result<()> {
+ let tmp_dir = TempDir::new()?;
+ let ctx = create_ctx_with_partition(&tmp_dir, 1).await?;
+
+ let partitions = vec![
+ vec![make_partition(0)],
+ vec![make_partition(1)],
+ vec![make_partition(2)],
+ vec![make_partition(3)],
+ vec![make_partition(4)],
+ vec![make_partition(5)],
+ ];
+ let schema = partitions[0][0].schema();
+ let provider = Arc::new(MemTable::try_new(schema, partitions).unwrap());
+
+ ctx.register_table("t", provider).unwrap();
+
+ // select all rows
+ let results = plan_and_collect(&ctx, "SELECT i FROM t").await.unwrap();
+
+ let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
+ assert_eq!(num_rows, 15);
+
+ for limit in 1..10 {
+ let query = format!("SELECT i FROM t limit {}", limit);
+ let results = plan_and_collect(&ctx, &query).await.unwrap();
+
+ let num_rows: usize = results.into_iter().map(|b| b.num_rows()).sum();
+ assert_eq!(num_rows, limit, "mismatch with query {}", query);
+ }
+
+ Ok(())
+}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 300ad0d9f..12570a419 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -46,6 +46,9 @@ use datafusion::{
};
use datafusion::{execution::context::SessionContext, physical_plan::displayable};
use datafusion_expr::Volatility;
+use std::fs::File;
+use std::io::Write;
+use tempfile::TempDir;
/// A macro to assert that some particular line contains two substrings
///
@@ -556,6 +559,98 @@ async fn execute(ctx: &SessionContext, sql: &str) -> Vec<Vec<String>> {
result_vec(&execute_to_batches(ctx, sql).await)
}
+/// Execute SQL and return results
+async fn execute_with_partition(
+ sql: &str,
+ partition_count: usize,
+) -> Result<Vec<RecordBatch>> {
+ let tmp_dir = TempDir::new()?;
+ let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;
+ plan_and_collect(&ctx, sql).await
+}
+
+/// Generate partitioned CSV files and register them with an execution context
+async fn create_ctx_with_partition(
+ tmp_dir: &TempDir,
+ partition_count: usize,
+) -> Result<SessionContext> {
+ let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(8));
+
+ let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
+
+ // register csv file with the execution context
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema),
+ )
+ .await?;
+
+ Ok(ctx)
+}
+
+/// Generate CSV partitions within the supplied directory
+fn populate_csv_partitions(
+ tmp_dir: &TempDir,
+ partition_count: usize,
+ file_extension: &str,
+) -> Result<SchemaRef> {
+ // define schema for data source (csv file)
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::UInt32, false),
+ Field::new("c2", DataType::UInt64, false),
+ Field::new("c3", DataType::Boolean, false),
+ ]));
+
+ // generate a partitioned file
+ for partition in 0..partition_count {
+ let filename = format!("partition-{}.{}", partition, file_extension);
+ let file_path = tmp_dir.path().join(&filename);
+ let mut file = File::create(file_path)?;
+
+ // generate some data
+ for i in 0..=10 {
+ let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+ file.write_all(data.as_bytes())?;
+ }
+ }
+
+ Ok(schema)
+}
+
+/// Return a new in-memory table that provides a single decimal column
+pub fn table_with_decimal() -> Arc<dyn TableProvider> {
+ let batch_decimal = make_decimal();
+ let schema = batch_decimal.schema();
+ let partitions = vec![vec![batch_decimal]];
+ Arc::new(MemTable::try_new(schema, partitions).unwrap())
+}
+
+fn make_decimal() -> RecordBatch {
+ let mut decimal_builder = DecimalBuilder::new(20, 10, 3);
+ for i in 110000..110010 {
+ decimal_builder.append_value(i as i128).unwrap();
+ }
+ for i in 100000..100010 {
+ decimal_builder.append_value(-i as i128).unwrap();
+ }
+ let array = decimal_builder.finish();
+ let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
+ RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
+}
+
+/// Return a RecordBatch with a single Int32 array with values (0..sz)
+pub fn make_partition(sz: i32) -> RecordBatch {
+ let seq_start = 0;
+ let seq_end = sz;
+ let values = (seq_start..seq_end).collect::<Vec<_>>();
+ let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
+ let arr = Arc::new(Int32Array::from(values));
+ let arr = arr as ArrayRef;
+
+ RecordBatch::try_new(schema, vec![arr]).unwrap()
+}
+
/// Specialised String representation
fn col_str(column: &ArrayRef, row_index: usize) -> String {
if column.is_null(row_index) {
diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index 8ce74ebf6..f6cf74257 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -1008,3 +1008,149 @@ async fn query_empty_table() {
let expected = vec!["++", "++"];
assert_batches_sorted_eq!(expected, &result);
}
+
+#[tokio::test]
+async fn boolean_literal() -> Result<()> {
+ let results =
+ execute_with_partition("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4)
+ .await?;
+
+ let expected = vec![
+ "+----+------+",
+ "| c1 | c3 |",
+ "+----+------+",
+ "| 3 | true |",
+ "| 3 | true |",
+ "| 3 | true |",
+ "| 3 | true |",
+ "| 3 | true |",
+ "+----+------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+async fn run_count_distinct_integers_aggregated_scenario(
+ partitions: Vec<Vec<(&str, u64)>>,
+) -> Result<Vec<RecordBatch>> {
+ let tmp_dir = TempDir::new()?;
+ let ctx = SessionContext::new();
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c_group", DataType::Utf8, false),
+ Field::new("c_int8", DataType::Int8, false),
+ Field::new("c_int16", DataType::Int16, false),
+ Field::new("c_int32", DataType::Int32, false),
+ Field::new("c_int64", DataType::Int64, false),
+ Field::new("c_uint8", DataType::UInt8, false),
+ Field::new("c_uint16", DataType::UInt16, false),
+ Field::new("c_uint32", DataType::UInt32, false),
+ Field::new("c_uint64", DataType::UInt64, false),
+ ]));
+
+ for (i, partition) in partitions.iter().enumerate() {
+ let filename = format!("partition-{}.csv", i);
+ let file_path = tmp_dir.path().join(&filename);
+ let mut file = File::create(file_path)?;
+ for row in partition {
+ let row_str = format!(
+ "{},{}\n",
+ row.0,
+ // Populate values for each of the integer fields in the
+ // schema.
+ (0..8)
+ .map(|_| { row.1.to_string() })
+ .collect::<Vec<_>>()
+ .join(","),
+ );
+ file.write_all(row_str.as_bytes())?;
+ }
+ }
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema).has_header(false),
+ )
+ .await?;
+
+ let results = plan_and_collect(
+ &ctx,
+ "
+ SELECT
+ c_group,
+ COUNT(c_uint64),
+ COUNT(DISTINCT c_int8),
+ COUNT(DISTINCT c_int16),
+ COUNT(DISTINCT c_int32),
+ COUNT(DISTINCT c_int64),
+ COUNT(DISTINCT c_uint8),
+ COUNT(DISTINCT c_uint16),
+ COUNT(DISTINCT c_uint32),
+ COUNT(DISTINCT c_uint64)
+ FROM test
+ GROUP BY c_group
+ ",
+ )
+ .await?;
+
+ Ok(results)
+}
+
+#[tokio::test]
+async fn count_distinct_integers_aggregated_single_partition() -> Result<()> {
+ let partitions = vec![
+ // The first member of each tuple will be the value for the
+ // `c_group` column, and the second member will be the value for
+ // each of the int/uint fields.
+ vec![
+ ("a", 1),
+ ("a", 1),
+ ("a", 2),
+ ("b", 9),
+ ("c", 9),
+ ("c", 10),
+ ("c", 9),
+ ],
+ ];
+
+ let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
+
+ let expected = vec![
+ "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+ "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |",
+ "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+ "| a | 3 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |",
+ "| b | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |",
+ "| c | 3 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 |",
+ "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()> {
+ let partitions = vec![
+ // The first member of each tuple will be the value for the
+ // `c_group` column, and the second member will be the value for
+ // each of the int/uint fields.
+ vec![("a", 1), ("a", 1), ("a", 2), ("b", 9), ("c", 9)],
+ vec![("a", 1), ("a", 3), ("b", 8), ("b", 9), ("b", 10), ("b", 11)],
+ ];
+
+ let results = run_count_distinct_integers_aggregated_scenario(partitions).await?;
+
+ let expected = vec![
+ "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+ "| c_group | COUNT(test.c_uint64) | COUNT(DISTINCT test.c_int8) | COUNT(DISTINCT test.c_int16) | COUNT(DISTINCT test.c_int32) | COUNT(DISTINCT test.c_int64) | COUNT(DISTINCT test.c_uint8) | COUNT(DISTINCT test.c_uint16) | COUNT(DISTINCT test.c_uint32) | COUNT(DISTINCT test.c_uint64) |",
+ "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+ "| a | 5 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 |",
+ "| b | 5 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 |",
+ "| c | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |",
+ "+---------+----------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index ba493e6fe..7e36177fd 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -142,3 +142,159 @@ async fn csv_query_window_with_partition_by_order_by() -> Result<()> {
assert_batches_eq!(expected, &actual);
Ok(())
}
+
+#[tokio::test]
+async fn window() -> Result<()> {
+ let results = execute_with_partition(
+ "SELECT \
+ c1, \
+ c2, \
+ SUM(c2) OVER (), \
+ COUNT(c2) OVER (), \
+ MAX(c2) OVER (), \
+ MIN(c2) OVER (), \
+ AVG(c2) OVER () \
+ FROM test \
+ ORDER BY c1, c2 \
+ LIMIT 5",
+ 4,
+ )
+ .await?;
+    // The result should arrive in a single batch; while e.g. 2 batches would
+    // not change the result semantics, asserting len == 1 up front keeps
+    // surprises at bay
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+----+----+--------------+----------------+--------------+--------------+--------------+",
+ "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
+ "+----+----+--------------+----------------+--------------+--------------+--------------+",
+ "| 0 | 1 | 220 | 40 | 10 | 1 | 5.5 |",
+ "| 0 | 2 | 220 | 40 | 10 | 1 | 5.5 |",
+ "| 0 | 3 | 220 | 40 | 10 | 1 | 5.5 |",
+ "| 0 | 4 | 220 | 40 | 10 | 1 | 5.5 |",
+ "| 0 | 5 | 220 | 40 | 10 | 1 | 5.5 |",
+ "+----+----+--------------+----------------+--------------+--------------+--------------+",
+ ];
+
+ // window function shall respect ordering
+ assert_batches_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn window_order_by() -> Result<()> {
+ let results = execute_with_partition(
+ "SELECT \
+ c1, \
+ c2, \
+ ROW_NUMBER() OVER (ORDER BY c1, c2), \
+ FIRST_VALUE(c2) OVER (ORDER BY c1, c2), \
+ LAST_VALUE(c2) OVER (ORDER BY c1, c2), \
+ NTH_VALUE(c2, 2) OVER (ORDER BY c1, c2), \
+ SUM(c2) OVER (ORDER BY c1, c2), \
+ COUNT(c2) OVER (ORDER BY c1, c2), \
+ MAX(c2) OVER (ORDER BY c1, c2), \
+ MIN(c2) OVER (ORDER BY c1, c2), \
+ AVG(c2) OVER (ORDER BY c1, c2) \
+ FROM test \
+ ORDER BY c1, c2 \
+ LIMIT 5",
+ 4,
+ )
+ .await?;
+    // The result should arrive in a single batch; while e.g. 2 batches would
+    // not change the result semantics, asserting len == 1 up front keeps
+    // surprises at bay
+ assert_eq!(results.len(), 1);
+
+ let expected = vec![
+ "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
+ "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2) | LAST_VALUE(test.c2) | NTH_VALUE(test.c2,Int64(2)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
+ "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
+ "| 0 | 1 | 1 | 1 | 1 | | 1 | 1 | 1 | 1 | 1 |",
+ "| 0 | 2 | 2 | 1 | 2 | 2 | 3 | 2 | 2 | 1 | 1.5 |",
+ "| 0 | 3 | 3 | 1 | 3 | 2 | 6 | 3 | 3 | 1 | 2 |",
+ "| 0 | 4 | 4 | 1 | 4 | 2 | 10 | 4 | 4 | 1 | 2.5 |",
+ "| 0 | 5 | 5 | 1 | 5 | 2 | 15 | 5 | 5 | 1 | 3 |",
+ "+----+----+--------------+----------------------+---------------------+-----------------------------+--------------+----------------+--------------+--------------+--------------+",
+ ];
+
+ // window function shall respect ordering
+ assert_batches_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn window_partition_by() -> Result<()> {
+ let results = execute_with_partition(
+ "SELECT \
+ c1, \
+ c2, \
+ SUM(c2) OVER (PARTITION BY c2), \
+ COUNT(c2) OVER (PARTITION BY c2), \
+ MAX(c2) OVER (PARTITION BY c2), \
+ MIN(c2) OVER (PARTITION BY c2), \
+ AVG(c2) OVER (PARTITION BY c2) \
+ FROM test \
+ ORDER BY c1, c2 \
+ LIMIT 5",
+ 4,
+ )
+ .await?;
+
+ let expected = vec![
+ "+----+----+--------------+----------------+--------------+--------------+--------------+",
+ "| c1 | c2 | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
+ "+----+----+--------------+----------------+--------------+--------------+--------------+",
+ "| 0 | 1 | 4 | 4 | 1 | 1 | 1 |",
+ "| 0 | 2 | 8 | 4 | 2 | 2 | 2 |",
+ "| 0 | 3 | 12 | 4 | 3 | 3 | 3 |",
+ "| 0 | 4 | 16 | 4 | 4 | 4 | 4 |",
+ "| 0 | 5 | 20 | 4 | 5 | 5 | 5 |",
+ "+----+----+--------------+----------------+--------------+--------------+--------------+",
+ ];
+
+ // window function shall respect ordering
+ assert_batches_eq!(expected, &results);
+ Ok(())
+}
+
+#[tokio::test]
+async fn window_partition_by_order_by() -> Result<()> {
+ let results = execute_with_partition(
+ "SELECT \
+ c1, \
+ c2, \
+ ROW_NUMBER() OVER (PARTITION BY c2 ORDER BY c1), \
+ FIRST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \
+ LAST_VALUE(c2 + c1) OVER (PARTITION BY c2 ORDER BY c1), \
+ NTH_VALUE(c2 + c1, 1) OVER (PARTITION BY c2 ORDER BY c1), \
+ SUM(c2) OVER (PARTITION BY c2 ORDER BY c1), \
+ COUNT(c2) OVER (PARTITION BY c2 ORDER BY c1), \
+ MAX(c2) OVER (PARTITION BY c2 ORDER BY c1), \
+ MIN(c2) OVER (PARTITION BY c2 ORDER BY c1), \
+ AVG(c2) OVER (PARTITION BY c2 ORDER BY c1) \
+ FROM test \
+ ORDER BY c1, c2 \
+ LIMIT 5",
+ 4,
+ )
+ .await?;
+
+ let expected = vec![
+ "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
+ "| c1 | c2 | ROW_NUMBER() | FIRST_VALUE(test.c2 + test.c1) | LAST_VALUE(test.c2 + test.c1) | NTH_VALUE(test.c2 + test.c1,Int64(1)) | SUM(test.c2) | COUNT(test.c2) | MAX(test.c2) | MIN(test.c2) | AVG(test.c2) |",
+ "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
+ "| 0 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |",
+ "| 0 | 2 | 1 | 2 | 2 | 2 | 2 | 1 | 2 | 2 | 2 |",
+ "| 0 | 3 | 1 | 3 | 3 | 3 | 3 | 1 | 3 | 3 | 3 |",
+ "| 0 | 4 | 1 | 4 | 4 | 4 | 4 | 1 | 4 | 4 | 4 |",
+ "| 0 | 5 | 1 | 5 | 5 | 5 | 5 | 1 | 5 | 5 | 5 |",
+ "+----+----+--------------+--------------------------------+-------------------------------+---------------------------------------+--------------+----------------+--------------+--------------+--------------+",
+ ];
+
+ // window function shall respect ordering
+ assert_batches_eq!(expected, &results);
+ Ok(())
+}