You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/10 16:05:58 UTC
[arrow-datafusion] branch main updated: Port remainder of `window.rs` to sqllogictest (#6234)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8a112484ac Port remainder of `window.rs` to sqllogictest (#6234)
8a112484ac is described below
commit 8a112484ac7ae89afc7006d56c65fba2dab106ce
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed May 10 12:05:52 2023 -0400
Port remainder of `window.rs` to sqllogictest (#6234)
* Port remainder of window.rs to sqllogictest
* fix groupby.slt to use unbounded inputs
---
datafusion/core/src/test_util/mod.rs | 180 +-----
datafusion/core/tests/data/window_1.csv | 101 ++++
datafusion/core/tests/data/window_2.csv | 101 ++++
datafusion/core/tests/sql/group_by.rs | 97 ----
datafusion/core/tests/sql/mod.rs | 1 -
datafusion/core/tests/sql/projection.rs | 30 +-
datafusion/core/tests/sql/window.rs | 525 -----------------
.../tests/sqllogictests/test_files/groupby.slt | 89 +++
.../core/tests/sqllogictests/test_files/select.slt | 48 ++
.../core/tests/sqllogictests/test_files/window.slt | 622 +++++++++++++++++++++
10 files changed, 963 insertions(+), 831 deletions(-)
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 9035b6d893..d42379b82a 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -19,10 +19,8 @@
pub mod parquet;
-use arrow::array::Int32Array;
use std::any::Any;
use std::collections::HashMap;
-use std::fs::File;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -42,13 +40,10 @@ use crate::prelude::{CsvReadOptions, SessionContext};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
-use datafusion_common::from_slice::FromSlice;
use datafusion_common::{Statistics, TableReference};
-use datafusion_execution::config::SessionConfig;
-use datafusion_expr::{col, CreateExternalTable, Expr, TableType};
+use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;
use futures::Stream;
-use tempfile::TempDir;
/// Compares formatted output of a record batch with an expected
/// vector of strings, with the result of pretty formatting record
@@ -293,179 +288,6 @@ pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
Arc::new(schema)
}
-// Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by ts
-fn get_test_data() -> Result<(RecordBatch, Vec<Expr>)> {
- let ts_field = Field::new("ts", DataType::Int32, false);
- let inc_field = Field::new("inc_col", DataType::Int32, false);
- let desc_field = Field::new("desc_col", DataType::Int32, false);
-
- let schema = Arc::new(Schema::new(vec![ts_field, inc_field, desc_field]));
-
- let batch = RecordBatch::try_new(
- schema,
- vec![
- Arc::new(Int32Array::from_slice([
- 1, 1, 5, 9, 10, 11, 16, 21, 22, 26, 26, 28, 31, 33, 38, 42, 47, 51, 53,
- 53, 58, 63, 67, 68, 70, 72, 72, 76, 81, 85, 86, 88, 91, 96, 97, 98, 100,
- 101, 102, 104, 104, 108, 112, 113, 113, 114, 114, 117, 122, 126, 131,
- 131, 136, 136, 136, 139, 141, 146, 147, 147, 152, 154, 159, 161, 163,
- 164, 167, 172, 173, 177, 180, 185, 186, 191, 195, 195, 199, 203, 207,
- 210, 213, 218, 221, 224, 226, 230, 232, 235, 238, 238, 239, 244, 245,
- 247, 250, 254, 258, 262, 264, 264,
- ])),
- Arc::new(Int32Array::from_slice([
- 1, 5, 10, 15, 20, 21, 26, 29, 30, 33, 37, 40, 43, 44, 45, 49, 51, 53, 58,
- 61, 65, 70, 75, 78, 83, 88, 90, 91, 95, 97, 100, 105, 109, 111, 115, 119,
- 120, 124, 126, 129, 131, 135, 140, 143, 144, 147, 148, 149, 151, 155,
- 156, 159, 160, 163, 165, 170, 172, 177, 181, 182, 186, 187, 192, 196,
- 197, 199, 203, 207, 209, 213, 214, 216, 219, 221, 222, 225, 226, 231,
- 236, 237, 242, 245, 247, 248, 253, 254, 259, 261, 266, 269, 272, 275,
- 278, 283, 286, 289, 291, 296, 301, 305,
- ])),
- Arc::new(Int32Array::from_slice([
- 100, 98, 93, 91, 86, 84, 81, 77, 75, 71, 70, 69, 64, 62, 59, 55, 50, 45,
- 41, 40, 39, 36, 31, 28, 23, 22, 17, 13, 10, 6, 5, 2, 1, -1, -4, -5, -6,
- -8, -12, -16, -17, -19, -24, -25, -29, -34, -37, -42, -47, -48, -49, -53,
- -57, -58, -61, -65, -67, -68, -71, -73, -75, -76, -78, -83, -87, -91,
- -95, -98, -101, -105, -106, -111, -114, -116, -120, -125, -128, -129,
- -134, -139, -142, -143, -146, -150, -154, -158, -163, -168, -172, -176,
- -181, -184, -189, -193, -196, -201, -203, -208, -210, -213,
- ])),
- ],
- )?;
- let file_sort_order = vec![col("ts").sort(true, false)];
- Ok((batch, file_sort_order))
-}
-
-// Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by a, b, c
-fn get_test_data2() -> Result<(RecordBatch, Vec<Expr>)> {
- let a0 = Field::new("a0", DataType::Int32, false);
- let a = Field::new("a", DataType::Int32, false);
- let b = Field::new("b", DataType::Int32, false);
- let c = Field::new("c", DataType::Int32, false);
- let d = Field::new("d", DataType::Int32, false);
-
- let schema = Arc::new(Schema::new(vec![a0, a, b, c, d]));
-
- let batch = RecordBatch::try_new(
- schema,
- vec![
- Arc::new(Int32Array::from_slice([
- 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
- 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
- 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0,
- ])),
- Arc::new(Int32Array::from_slice([
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
- 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
- 1, 1, 1, 1,
- ])),
- Arc::new(Int32Array::from_slice([
- 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
- 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
- 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
- 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
- 3, 3, 3, 3,
- ])),
- Arc::new(Int32Array::from_slice([
- 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
- 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38,
- 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56,
- 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74,
- 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92,
- 93, 94, 95, 96, 97, 98, 99,
- ])),
- Arc::new(Int32Array::from_slice([
- 0, 2, 0, 0, 1, 1, 0, 2, 1, 4, 4, 2, 2, 1, 2, 3, 3, 2, 1, 4, 0, 3, 0, 0,
- 4, 0, 2, 0, 1, 1, 3, 4, 2, 2, 4, 0, 1, 4, 0, 1, 1, 3, 3, 2, 3, 0, 0, 1,
- 1, 3, 0, 3, 1, 1, 4, 2, 1, 1, 1, 2, 4, 3, 1, 4, 4, 0, 2, 4, 1, 1, 0, 2,
- 1, 1, 4, 2, 0, 2, 1, 4, 2, 0, 4, 2, 1, 1, 1, 4, 3, 4, 1, 2, 0, 0, 2, 0,
- 4, 2, 4, 3,
- ])),
- ],
- )?;
- let file_sort_order = vec![
- col("a").sort(true, false),
- col("b").sort(true, false),
- col("c").sort(true, false),
- ];
- Ok((batch, file_sort_order))
-}
-
-/// Creates a test_context with table name `annotated_data` which has 100 rows.
-// Columns in the table are ts, inc_col, desc_col. Source is CsvExec which is ordered by
-// ts column.
-pub async fn get_test_context(
- tmpdir: &TempDir,
- infinite_source: bool,
- session_config: SessionConfig,
-) -> Result<SessionContext> {
- get_test_context_helper(tmpdir, infinite_source, session_config, get_test_data).await
-}
-
-/// Creates a test_context with table name `annotated_data`, which has 100 rows.
-// Columns in the table are a0, a, b, c, d. Source is CsvExec which is ordered by
-// the a, b, c columns. Column a has cardinality 2, column b has cardinality 4.
-// Column c has cardinality 100 (unique entries). Column d has cardinality 5.
-pub async fn get_test_context2(
- tmpdir: &TempDir,
- infinite_source: bool,
- session_config: SessionConfig,
-) -> Result<SessionContext> {
- get_test_context_helper(tmpdir, infinite_source, session_config, get_test_data2).await
-}
-
-async fn get_test_context_helper(
- tmpdir: &TempDir,
- infinite_source: bool,
- session_config: SessionConfig,
- data_receiver: fn() -> Result<(RecordBatch, Vec<Expr>)>,
-) -> Result<SessionContext> {
- let ctx = SessionContext::with_config(session_config);
-
- let csv_read_options = CsvReadOptions::default();
- let (batch, file_sort_order) = data_receiver()?;
-
- let options_sort = csv_read_options
- .to_listing_options(&ctx.copied_config())
- .with_file_sort_order(Some(file_sort_order))
- .with_infinite_source(infinite_source);
-
- write_test_data_to_csv(tmpdir, 1, &batch)?;
- let sql_definition = None;
- ctx.register_listing_table(
- "annotated_data",
- tmpdir.path().to_string_lossy(),
- options_sort.clone(),
- Some(batch.schema()),
- sql_definition,
- )
- .await
- .unwrap();
- Ok(ctx)
-}
-
-fn write_test_data_to_csv(
- tmpdir: &TempDir,
- n_file: usize,
- batch: &RecordBatch,
-) -> Result<()> {
- let n_chunk = batch.num_rows() / n_file;
- for i in 0..n_file {
- let target_file = tmpdir.path().join(format!("{i}.csv"));
- let file = File::create(target_file)?;
- let chunks_start = i * n_chunk;
- let cur_batch = batch.slice(chunks_start, n_chunk);
- let mut writer = arrow::csv::Writer::new(file);
- writer.write(&cur_batch)?;
- }
- Ok(())
-}
-
/// TableFactory for tests
pub struct TestTableFactory {}
diff --git a/datafusion/core/tests/data/window_1.csv b/datafusion/core/tests/data/window_1.csv
new file mode 100644
index 0000000000..588af16d06
--- /dev/null
+++ b/datafusion/core/tests/data/window_1.csv
@@ -0,0 +1,101 @@
+ts,inc_col,desc_col
+1,1,100
+1,5,98
+5,10,93
+9,15,91
+10,20,86
+11,21,84
+16,26,81
+21,29,77
+22,30,75
+26,33,71
+26,37,70
+28,40,69
+31,43,64
+33,44,62
+38,45,59
+42,49,55
+47,51,50
+51,53,45
+53,58,41
+53,61,40
+58,65,39
+63,70,36
+67,75,31
+68,78,28
+70,83,23
+72,88,22
+72,90,17
+76,91,13
+81,95,10
+85,97,6
+86,100,5
+88,105,2
+91,109,1
+96,111,-1
+97,115,-4
+98,119,-5
+100,120,-6
+101,124,-8
+102,126,-12
+104,129,-16
+104,131,-17
+108,135,-19
+112,140,-24
+113,143,-25
+113,144,-29
+114,147,-34
+114,148,-37
+117,149,-42
+122,151,-47
+126,155,-48
+131,156,-49
+131,159,-53
+136,160,-57
+136,163,-58
+136,165,-61
+139,170,-65
+141,172,-67
+146,177,-68
+147,181,-71
+147,182,-73
+152,186,-75
+154,187,-76
+159,192,-78
+161,196,-83
+163,197,-87
+164,199,-91
+167,203,-95
+172,207,-98
+173,209,-101
+177,213,-105
+180,214,-106
+185,216,-111
+186,219,-114
+191,221,-116
+195,222,-120
+195,225,-125
+199,226,-128
+203,231,-129
+207,236,-134
+210,237,-139
+213,242,-142
+218,245,-143
+221,247,-146
+224,248,-150
+226,253,-154
+230,254,-158
+232,259,-163
+235,261,-168
+238,266,-172
+238,269,-176
+239,272,-181
+244,275,-184
+245,278,-189
+247,283,-193
+250,286,-196
+254,289,-201
+258,291,-203
+262,296,-208
+264,301,-210
+264,305,-213
diff --git a/datafusion/core/tests/data/window_2.csv b/datafusion/core/tests/data/window_2.csv
new file mode 100644
index 0000000000..9a2d7c1940
--- /dev/null
+++ b/datafusion/core/tests/data/window_2.csv
@@ -0,0 +1,101 @@
+a0,a,b,c,d
+1,0,0,0,0
+1,0,0,1,2
+1,0,0,2,0
+1,0,0,3,0
+1,0,0,4,1
+1,0,0,5,1
+1,0,0,6,0
+1,0,0,7,2
+1,0,0,8,1
+1,0,0,9,4
+1,0,0,10,4
+1,0,0,11,2
+1,0,0,12,2
+1,0,0,13,1
+1,0,0,14,2
+1,0,0,15,3
+1,0,0,16,3
+1,0,0,17,2
+1,0,0,18,1
+1,0,0,19,4
+1,0,0,20,0
+1,0,0,21,3
+1,0,0,22,0
+1,0,0,23,0
+1,0,0,24,4
+1,0,1,25,0
+1,0,1,26,2
+1,0,1,27,0
+1,0,1,28,1
+1,0,1,29,1
+1,0,1,30,3
+1,0,1,31,4
+1,0,1,32,2
+1,0,1,33,2
+1,0,1,34,4
+1,0,1,35,0
+1,0,1,36,1
+1,0,1,37,4
+1,0,1,38,0
+1,0,1,39,1
+1,0,1,40,1
+1,0,1,41,3
+1,0,1,42,3
+1,0,1,43,2
+1,0,1,44,3
+1,0,1,45,0
+1,0,1,46,0
+1,0,1,47,1
+1,0,1,48,1
+1,0,1,49,3
+0,1,2,50,0
+0,1,2,51,3
+0,1,2,52,1
+0,1,2,53,1
+0,1,2,54,4
+0,1,2,55,2
+0,1,2,56,1
+0,1,2,57,1
+0,1,2,58,1
+0,1,2,59,2
+0,1,2,60,4
+0,1,2,61,3
+0,1,2,62,1
+0,1,2,63,4
+0,1,2,64,4
+0,1,2,65,0
+0,1,2,66,2
+0,1,2,67,4
+0,1,2,68,1
+0,1,2,69,1
+0,1,2,70,0
+0,1,2,71,2
+0,1,2,72,1
+0,1,2,73,1
+0,1,2,74,4
+0,1,3,75,2
+0,1,3,76,0
+0,1,3,77,2
+0,1,3,78,1
+0,1,3,79,4
+0,1,3,80,2
+0,1,3,81,0
+0,1,3,82,4
+0,1,3,83,2
+0,1,3,84,1
+0,1,3,85,1
+0,1,3,86,1
+0,1,3,87,4
+0,1,3,88,3
+0,1,3,89,4
+0,1,3,90,1
+0,1,3,91,2
+0,1,3,92,0
+0,1,3,93,0
+0,1,3,94,2
+0,1,3,95,0
+0,1,3,96,4
+0,1,3,97,2
+0,1,3,98,4
+0,1,3,99,3
diff --git a/datafusion/core/tests/sql/group_by.rs b/datafusion/core/tests/sql/group_by.rs
index 7a4d4d2897..b4a92db3fc 100644
--- a/datafusion/core/tests/sql/group_by.rs
+++ b/datafusion/core/tests/sql/group_by.rs
@@ -16,7 +16,6 @@
// under the License.
use super::*;
-use datafusion::test_util::get_test_context2;
#[tokio::test]
async fn group_by_date_trunc() -> Result<()> {
@@ -161,99 +160,3 @@ async fn group_by_dictionary() {
run_test_case::<UInt32Type>().await;
run_test_case::<UInt64Type>().await;
}
-
-#[tokio::test]
-async fn test_source_sorted_groupby() -> Result<()> {
- let tmpdir = TempDir::new().unwrap();
- let session_config = SessionConfig::new().with_target_partitions(1);
- let ctx = get_test_context2(&tmpdir, true, session_config).await?;
-
- let sql = "SELECT a, b,
- SUM(c) as summation1
- FROM annotated_data
- GROUP BY b, a";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data.c)@2 as summation1]",
- " AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data.c)], ordering_mode=FullyOrdered",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+---+---+------------+",
- "| a | b | summation1 |",
- "+---+---+------------+",
- "| 0 | 0 | 300 |",
- "| 0 | 1 | 925 |",
- "| 1 | 2 | 1550 |",
- "| 1 | 3 | 2175 |",
- "+---+---+------------+",
- ];
- assert_batches_eq!(expected, &actual);
- Ok(())
-}
-
-#[tokio::test]
-async fn test_source_sorted_groupby2() -> Result<()> {
- let tmpdir = TempDir::new().unwrap();
- let session_config = SessionConfig::new().with_target_partitions(1);
- let ctx = get_test_context2(&tmpdir, true, session_config).await?;
-
- let sql = "SELECT a, d,
- SUM(c) as summation1
- FROM annotated_data
- GROUP BY d, a";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data.c)@2 as summation1]",
- " AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data.c)], ordering_mode=PartiallyOrdered",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+---+---+------------+",
- "| a | d | summation1 |",
- "+---+---+------------+",
- "| 0 | 0 | 292 |",
- "| 0 | 2 | 196 |",
- "| 0 | 1 | 315 |",
- "| 0 | 4 | 164 |",
- "| 0 | 3 | 258 |",
- "| 1 | 0 | 622 |",
- "| 1 | 3 | 299 |",
- "| 1 | 1 | 1043 |",
- "| 1 | 4 | 913 |",
- "| 1 | 2 | 848 |",
- "+---+---+------------+",
- ];
- assert_batches_eq!(expected, &actual);
- Ok(())
-}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 8b33a888a5..bd9e213dfe 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -97,7 +97,6 @@ pub mod references;
pub mod select;
pub mod timestamp;
pub mod udf;
-pub mod window;
pub mod explain;
pub mod information_schema;
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
index 5f480d46c6..ac697b1176 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -16,7 +16,7 @@
// under the License.
use datafusion::datasource::provider_as_source;
-use datafusion::test_util::{get_test_context2, scan_empty};
+use datafusion::test_util::scan_empty;
use datafusion_expr::{when, LogicalPlanBuilder, UNNAMED_TABLE};
use tempfile::TempDir;
@@ -375,31 +375,3 @@ async fn project_columns_in_memory_without_propagation() -> Result<()> {
Ok(())
}
-
-#[tokio::test]
-async fn test_source_projection() -> Result<()> {
- let session_config = SessionConfig::new().with_target_partitions(1);
- // Source is ordered by a, b, c
- // Source is finite.
- let tmpdir1 = TempDir::new()?;
- let ctx = get_test_context2(&tmpdir1, false, session_config).await?;
- let sql = "SELECT a FROM annotated_data
- ORDER BY a
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // Final plan shouldn't include SortExec.
- let expected: Vec<&str> = { vec!["GlobalLimitExec: skip=0, fetch=5"] };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
-}
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
deleted file mode 100644
index f63395e455..0000000000
--- a/datafusion/core/tests/sql/window.rs
+++ /dev/null
@@ -1,525 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::*;
-
-#[tokio::test]
-async fn window_frame_creation_type_checking() -> Result<()> {
- // The following query has a type error. We should test that the error can be detected
- // from either the logical plan (when `skip_failed_rules` is set to `false`) or
- // the physical plan (when `skip_failed_rules` is set to `true`).
-
- // We should remove the type checking in the physical plan once we no longer skip
- // failed optimizer rules by default (see https://github.com/apache/arrow-datafusion/issues/4615).
- async fn check_query(skip_failed_rules: bool, err_msg: &str) -> Result<()> {
- use datafusion_common::ScalarValue::Boolean;
- let config = SessionConfig::new().set(
- "datafusion.optimizer.skip_failed_rules",
- Boolean(Some(skip_failed_rules)),
- );
- let ctx = SessionContext::with_config(config);
- register_aggregate_csv(&ctx).await?;
- let df = ctx
- .sql(
- "SELECT
- COUNT(c1) OVER (ORDER BY c2 RANGE BETWEEN '1 DAY' PRECEDING AND '2 DAY' FOLLOWING)
- FROM aggregate_test_100;",
- )
- .await?;
- let results = df.collect().await;
- assert_contains!(results.err().unwrap().to_string(), err_msg);
- Ok(())
- }
-
- // Error is returned from the physical plan.
- check_query(
- true,
- r#"Execution error: Cannot cast Utf8("1 DAY") to UInt32"#,
- )
- .await?;
-
- // Error is returned from the logical plan.
- check_query(
- false,
- r#"Execution error: Cannot cast Utf8("1 DAY") to UInt32"#,
- )
- .await
-}
-
-mod tests {
- use super::*;
- use datafusion::test_util::{get_test_context, get_test_context2};
-
- #[tokio::test]
- async fn test_source_sorted_aggregate() -> Result<()> {
- let tmpdir = TempDir::new()?;
- let session_config = SessionConfig::new().with_target_partitions(1);
- let ctx = get_test_context(&tmpdir, false, session_config).await?;
-
- let sql = "SELECT
- SUM(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as sum1,
- SUM(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as sum2,
- SUM(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as sum3,
- MIN(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as min1,
- MIN(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as min2,
- MIN(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as min3,
- MAX(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as max1,
- MAX(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as max2,
- MAX(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as max3,
- COUNT(*) OVER(ORDER BY ts RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING) as cnt1,
- COUNT(*) OVER(ORDER BY ts ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt2,
- SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING) as sumr1,
- SUM(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING) as sumr2,
- SUM(desc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sumr3,
- MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as minr1,
- MIN(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as minr2,
- MIN(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as minr3,
- MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as maxr1,
- MAX(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as maxr2,
- MAX(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as maxr3,
- COUNT(*) OVER(ORDER BY ts DESC RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING) as cntr1,
- COUNT(*) OVER(ORDER BY ts DESC ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cntr2,
- SUM(desc_col) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as sum4,
- COUNT(*) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt3
- FROM annotated_data
- ORDER BY inc_col DESC
- LIMIT 5
- ";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]",
- " GlobalLimitExec: skip=0, fetch=5",
- " SortExec: fetch=5, expr=[inc_col@24 DESC]",
- " ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as sum1, SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@14 as sum2, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@15 as sum3, MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS L [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units [...]
- " ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as SUM(annotated_data.inc_col), SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotated_data.desc_col), SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECE [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } [...]
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+",
- "| sum1 | sum2 | sum3 | min1 | min2 | min3 | max1 | max2 | max3 | cnt1 | cnt2 | sumr1 | sumr2 | sumr3 | minr1 | minr2 | minr3 | maxr1 | maxr2 | maxr3 | cntr1 | cntr2 | sum4 | cnt3 |",
- "+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+",
- "| 1482 | -631 | 606 | 289 | -213 | 301 | 305 | -208 | 305 | 3 | 9 | 902 | -834 | -1231 | 301 | -213 | 269 | 305 | -210 | 305 | 3 | 2 | -1797 | 9 |",
- "| 1482 | -631 | 902 | 289 | -213 | 296 | 305 | -208 | 305 | 3 | 10 | 902 | -834 | -1424 | 301 | -213 | 266 | 305 | -210 | 305 | 3 | 3 | -1978 | 10 |",
- "| 876 | -411 | 1193 | 289 | -208 | 291 | 296 | -203 | 305 | 4 | 10 | 587 | -612 | -1400 | 296 | -213 | 261 | 305 | -208 | 301 | 3 | 4 | -1941 | 10 |",
- "| 866 | -404 | 1482 | 286 | -203 | 289 | 291 | -201 | 305 | 5 | 10 | 580 | -600 | -1374 | 291 | -208 | 259 | 305 | -203 | 296 | 4 | 5 | -1903 | 10 |",
- "| 1411 | -397 | 1768 | 275 | -201 | 286 | 289 | -196 | 305 | 4 | 10 | 575 | -590 | -1347 | 289 | -203 | 254 | 305 | -201 | 291 | 2 | 6 | -1863 | 10 |",
- "+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+",
- ];
- assert_batches_eq!(expected, &actual);
- Ok(())
- }
-
- #[tokio::test]
- async fn test_source_sorted_builtin() -> Result<()> {
- let tmpdir = TempDir::new()?;
- let session_config = SessionConfig::new().with_target_partitions(1);
- let ctx = get_test_context(&tmpdir, false, session_config).await?;
-
- let sql = "SELECT
- FIRST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv1,
- FIRST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv2,
- LAST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv1,
- LAST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv2,
- NTH_VALUE(inc_col, 5) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv1,
- NTH_VALUE(inc_col, 5) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv2,
- ROW_NUMBER() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rn1,
- ROW_NUMBER() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rn2,
- RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rank1,
- RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rank2,
- DENSE_RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS dense_rank1,
- DENSE_RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as dense_rank2,
- LAG(inc_col, 1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lag1,
- LAG(inc_col, 2, 1002) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lag2,
- LEAD(inc_col, -1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lead1,
- LEAD(inc_col, 4, 1004) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lead2,
- FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr1,
- FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr2,
- LAST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr1,
- LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr2,
- LAG(inc_col, 1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lagr1,
- LAG(inc_col, 2, 1002) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lagr2,
- LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS leadr1,
- LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as leadr2
- FROM annotated_data
- ORDER BY ts DESC
- LIMIT 5
- ";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2, rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1, dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1, lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1, leadr2@23 as leadr2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " SortExec: fetch=5, expr=[ts@24 DESC]",
- " ProjectionExec: expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, LAST_VALUE(annotated_data.inc_col) ORDER BY [an [...]
- " BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_order [...]
- " BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ord [...]
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
- "| fv1 | fv2 | lv1 | lv2 | nv1 | nv2 | rn1 | rn2 | rank1 | rank2 | dense_rank1 | dense_rank2 | lag1 | lag2 | lead1 | lead2 | fvr1 | fvr2 | lvr1 | lvr2 | lagr1 | lagr2 | leadr1 | leadr2 |",
- "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
- "| 289 | 269 | 305 | 305 | 305 | 283 | 100 | 100 | 99 | 99 | 86 | 86 | 301 | 296 | 301 | 1004 | 305 | 305 | 301 | 301 | 1001 | 1002 | 1001 | 289 |",
- "| 289 | 266 | 305 | 305 | 305 | 278 | 99 | 99 | 99 | 99 | 86 | 86 | 296 | 291 | 296 | 1004 | 305 | 305 | 301 | 296 | 305 | 1002 | 305 | 286 |",
- "| 289 | 261 | 296 | 301 | | 275 | 98 | 98 | 98 | 98 | 85 | 85 | 291 | 289 | 291 | 1004 | 305 | 305 | 296 | 291 | 301 | 305 | 301 | 283 |",
- "| 286 | 259 | 291 | 296 | | 272 | 97 | 97 | 97 | 97 | 84 | 84 | 289 | 286 | 289 | 1004 | 305 | 305 | 291 | 289 | 296 | 301 | 296 | 278 |",
- "| 275 | 254 | 289 | 291 | 289 | 269 | 96 | 96 | 96 | 96 | 83 | 83 | 286 | 283 | 286 | 305 | 305 | 305 | 289 | 286 | 291 | 296 | 291 | 275 |",
- "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
- ];
- assert_batches_eq!(expected, &actual);
- Ok(())
- }
-
- #[tokio::test]
- async fn test_source_sorted_unbounded_preceding() -> Result<()> {
- let tmpdir = TempDir::new()?;
- let session_config = SessionConfig::new().with_target_partitions(1);
- let ctx = get_test_context(&tmpdir, false, session_config).await?;
-
- let sql = "SELECT
- SUM(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as sum1,
- SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
- MIN(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as min1,
- MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as min2,
- MAX(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as max1,
- MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as max2,
- COUNT(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as count1,
- COUNT(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2,
- AVG(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as avg1,
- AVG(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as avg2
- FROM annotated_data
- ORDER BY inc_col ASC
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]",
- " ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@7 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as min1, MIN(annotated_data.inc_col) ORDER BY [annotate [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)) }, MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), fr [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)) }, MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), [...]
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+------+------+------+------+------+------+--------+--------+-------------------+-------------------+",
- "| sum1 | sum2 | min1 | min2 | max1 | max2 | count1 | count2 | avg1 | avg2 |",
- "+------+------+------+------+------+------+--------+--------+-------------------+-------------------+",
- "| 16 | 6 | 1 | 1 | 10 | 5 | 3 | 2 | 5.333333333333333 | 3.0 |",
- "| 16 | 6 | 1 | 1 | 10 | 5 | 3 | 2 | 5.333333333333333 | 3.0 |",
- "| 51 | 16 | 1 | 1 | 20 | 10 | 5 | 3 | 10.2 | 5.333333333333333 |",
- "| 72 | 72 | 1 | 1 | 21 | 21 | 6 | 6 | 12.0 | 12.0 |",
- "| 72 | 72 | 1 | 1 | 21 | 21 | 6 | 6 | 12.0 | 12.0 |",
- "+------+------+------+------+------+------+--------+--------+-------------------+-------------------+",
- ];
- assert_batches_eq!(expected, &actual);
- Ok(())
- }
-
- #[tokio::test]
- async fn test_source_sorted_unbounded_preceding_builtin() -> Result<()> {
- let tmpdir = TempDir::new().unwrap();
- let session_config = SessionConfig::new().with_target_partitions(1);
- let ctx = get_test_context(&tmpdir, false, session_config).await?;
-
- let sql = "SELECT
- FIRST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as first_value1,
- FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as first_value2,
- LAST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as last_value1,
- LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as last_value2,
- NTH_VALUE(inc_col, 2) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as nth_value1
- FROM annotated_data
- ORDER BY inc_col ASC
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "ProjectionExec: expr=[first_value1@0 as first_value1, first_value2@1 as first_value2, last_value1@2 as last_value1, last_value2@3 as last_value2, nth_value1@4 as nth_value1]",
- " GlobalLimitExec: skip=0, fetch=5",
- " SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]",
- " ProjectionExec: expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1, FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2, LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as last_value1, LAS [...]
- " BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: \"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_orde [...]
- " BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: \"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_or [...]
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+--------------+--------------+-------------+-------------+------------+",
- "| first_value1 | first_value2 | last_value1 | last_value2 | nth_value1 |",
- "+--------------+--------------+-------------+-------------+------------+",
- "| 1 | 15 | 5 | 1 | 5 |",
- "| 1 | 20 | 10 | 1 | 5 |",
- "| 1 | 21 | 15 | 1 | 5 |",
- "| 1 | 26 | 20 | 1 | 5 |",
- "| 1 | 29 | 21 | 1 | 5 |",
- "+--------------+--------------+-------------+-------------+------------+",
- ];
- assert_batches_eq!(expected, &actual);
- Ok(())
- }
-
- #[tokio::test]
- async fn test_source_sorted_unbounded_source() -> Result<()> {
- let tmpdir = TempDir::new().unwrap();
- let session_config = SessionConfig::new().with_target_partitions(1);
- // Use an unbounded source
- let ctx = get_test_context(&tmpdir, true, session_config).await?;
-
- let sql = "SELECT
- SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1,
- SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
- COUNT(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as count1,
- COUNT(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2
- FROM annotated_data
- ORDER BY ts ASC
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]",
- " GlobalLimitExec: skip=0, fetch=5",
- " ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(annotated_data.inc_col) ORDER BY [annotat [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data.inc_col): Ok(Field { name: \"COUNT(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data.inc_col): Ok(Field { name: \"COUNT(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } [...]
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
-
- let actual = execute_to_batches(&ctx, sql).await;
- let expected = vec![
- "+------+------+--------+--------+",
- "| sum1 | sum2 | count1 | count2 |",
- "+------+------+--------+--------+",
- "| 6 | 31 | 2 | 4 |",
- "| 16 | 51 | 3 | 5 |",
- "| 31 | 72 | 4 | 6 |",
- "| 51 | 98 | 5 | 7 |",
- "| 72 | 127 | 6 | 8 |",
- "+------+------+--------+--------+",
- ];
- assert_batches_eq!(expected, &actual);
- Ok(())
- }
-
- #[tokio::test]
- async fn test_infinite_source_partition_by() -> Result<()> {
- let tmpdir = TempDir::new()?;
- let session_config = SessionConfig::new().with_target_partitions(1);
- // Source is ordered by a, b, c
- let ctx = get_test_context2(&tmpdir, true, session_config).await?;
-
- let sql = "SELECT a, b, c,
- SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
- SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
- SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
- SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
- SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
- SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
- SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
- SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
- SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
- SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
- SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
- SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) as sum12
- FROM annotated_data
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data.c) PARTITION BY [annotated_data.a, annotated_data.d] ORDER BY [annotated_data.b ASC NULLS LAST, annotated_data.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data.c) PARTITION BY [annotated_data.a, annotated_data.d] ORDER BY [annotated_data.b ASC NULLS LAST, annotated_data.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING@9 as sum2, SUM(annotated_data.c) P [...]
- " GlobalLimitExec: skip=0, fetch=5",
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Ro [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { uni [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { u [...]
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- let actual = execute_to_batches(&ctx, sql).await;
-
- let expected = vec![
- "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
- "| a | b | c | sum1 | sum2 | sum3 | sum4 | sum5 | sum6 | sum7 | sum8 | sum9 | sum10 | sum11 | sum12 |",
- "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
- "| 0 | 0 | 0 | 2 | 53 | 2 | | 1 | 15 | 1 | 15 | 2 | 0 | 2 | 2 |",
- "| 0 | 0 | 1 | 8 | 61 | 8 | | 3 | 21 | 3 | 21 | 8 | 1 | 8 | 8 |",
- "| 0 | 0 | 2 | 5 | 74 | 5 | 0 | 6 | 28 | 6 | 28 | 5 | 2 | 5 | 5 |",
- "| 0 | 0 | 3 | 11 | 96 | 11 | 2 | 10 | 36 | 10 | 36 | 11 | 5 | 11 | 9 |",
- "| 0 | 0 | 4 | 9 | 72 | 9 | | 14 | 45 | 14 | 45 | 9 | 4 | 9 | 9 |",
- "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
- ];
- assert_batches_eq!(expected, &actual);
- Ok(())
- }
-
- #[tokio::test]
- async fn test_finite_source_partition_by() -> Result<()> {
- let tmpdir = TempDir::new()?;
- let session_config = SessionConfig::new().with_target_partitions(1);
- // Source is ordered by a, b, c
- // Source is finite.
- let ctx = get_test_context2(&tmpdir, false, session_config).await?;
-
- let sql = "SELECT a, b, c,
- SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
- SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
- SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
- SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
- SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
- SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
- SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
- SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
- SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
- SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
- SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
- SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) as sum12
- FROM annotated_data
- ORDER BY c
- LIMIT 5";
-
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- // We should produce `BoundedWindowAggExec`s that only works in Sorted mode, since source is finite.
- // To satisfy, requirements for Sorted mode. We should introduce `SortExec`s through physical plan.
- let expected = {
- vec![
- "GlobalLimitExec: skip=0, fetch=5",
- " SortExec: fetch=5, expr=[c@2 ASC NULLS LAST]",
- " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data.c) PARTITION BY [annotated_data.a, annotated_data.d] ORDER BY [annotated_data.b ASC NULLS LAST, annotated_data.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data.c) PARTITION BY [annotated_data.a, annotated_data.d] ORDER BY [annotated_data.b ASC NULLS LAST, annotated_data.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING@9 as sum2, SUM(annotated_data. [...]
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Ro [...]
- " SortExec: expr=[d@3 ASC NULLS LAST,a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]",
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units [...]
- " SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]",
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { u [...]
- " SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,c@2 ASC NULLS LAST]",
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame [...]
- " SortExec: expr=[a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]",
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowF [...]
- " SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]",
- " BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Win [...]
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- let actual = execute_to_batches(&ctx, sql).await;
-
- let expected = vec![
- "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
- "| a | b | c | sum1 | sum2 | sum3 | sum4 | sum5 | sum6 | sum7 | sum8 | sum9 | sum10 | sum11 | sum12 |",
- "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
- "| 0 | 0 | 0 | 2 | 53 | 2 | | 1 | 15 | 1 | 15 | 2 | 0 | 2 | 2 |",
- "| 0 | 0 | 1 | 8 | 61 | 8 | | 3 | 21 | 3 | 21 | 8 | 1 | 8 | 8 |",
- "| 0 | 0 | 2 | 5 | 74 | 5 | 0 | 6 | 28 | 6 | 28 | 5 | 2 | 5 | 5 |",
- "| 0 | 0 | 3 | 11 | 96 | 11 | 2 | 10 | 36 | 10 | 36 | 11 | 5 | 11 | 9 |",
- "| 0 | 0 | 4 | 9 | 72 | 9 | | 14 | 45 | 14 | 45 | 9 | 4 | 9 | 9 |",
- "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
- ];
- assert_batches_eq!(expected, &actual);
- Ok(())
- }
-}
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index 0ecede8806..b565b0dc9a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1921,3 +1921,92 @@ SELECT DISTINCT + col1 FROM tab2 AS cor0 GROUP BY cor0.col1
41
59
61
+
+
+
+# Columns in the table are a0,a,b,c,d. Source is CsvExec which is ordered by
+# the a,b,c columns. Column a has cardinality 2, column b has cardinality 4.
+# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
+statement ok
+CREATE EXTERNAL TABLE annotated_data_infinite2 (
+ a0 INTEGER,
+ a INTEGER,
+ b INTEGER,
+ c INTEGER,
+ d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC, c ASC)
+OPTIONS('infinite_source' 'true')
+LOCATION 'tests/data/window_2.csv';
+
+# Run the following tests with a single target partition
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+# test_source_sorted_groupby
+query TT
+EXPLAIN SELECT a, b,
+ SUM(c) as summation1
+ FROM annotated_data_infinite2
+ GROUP BY b, a
+----
+logical_plan
+Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, SUM(annotated_data_infinite2.c) AS summation1
+ Aggregate: groupBy=[[annotated_data_infinite2.b, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
+ TableScan: annotated_data_infinite2 projection=[a, b, c]
+physical_plan
+ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data_infinite2.c)@2 as summation1]
+ AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+
+
+query III
+ SELECT a, b,
+ SUM(c) as summation1
+ FROM annotated_data_infinite2
+ GROUP BY b, a
+----
+0 0 300
+0 1 925
+1 2 1550
+1 3 2175
+
+
+# test_source_sorted_groupby2
+
+query TT
+EXPLAIN SELECT a, d,
+ SUM(c) as summation1
+ FROM annotated_data_infinite2
+ GROUP BY d, a
+----
+logical_plan
+Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotated_data_infinite2.c) AS summation1
+ Aggregate: groupBy=[[annotated_data_infinite2.d, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
+ TableScan: annotated_data_infinite2 projection=[a, c, d]
+physical_plan
+ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c)@2 as summation1]
+ AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallyOrdered
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+query III
+SELECT a, d,
+ SUM(c) as summation1
+ FROM annotated_data_infinite2
+ GROUP BY d, a
+----
+0 0 292
+0 2 196
+0 1 315
+0 4 164
+0 3 258
+1 0 622
+1 3 299
+1 1 1043
+1 4 913
+1 2 848
+
+statement ok
+drop table annotated_data_infinite2;
diff --git a/datafusion/core/tests/sqllogictests/test_files/select.slt b/datafusion/core/tests/sqllogictests/test_files/select.slt
index 26db6cc059..440e7d8010 100644
--- a/datafusion/core/tests/sqllogictests/test_files/select.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/select.slt
@@ -686,3 +686,51 @@ select "INT32" from case_sensitive_table;
3
4
5
+
+# Columns in the table are a0,a,b,c,d. Source is CsvExec which is ordered by
+# the a,b,c columns. Column a has cardinality 2, column b has cardinality 4.
+# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
+statement ok
+CREATE EXTERNAL TABLE annotated_data_finite2 (
+ a0 INTEGER,
+ a INTEGER,
+ b INTEGER,
+ c INTEGER,
+ d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC, c ASC)
+LOCATION 'tests/data/window_2.csv';
+
+# test_source_projection
+
+
+# Final plan shouldn't include SortExec.
+query TT
+EXPLAIN SELECT a FROM annotated_data_finite2
+ ORDER BY a
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: annotated_data_finite2.a ASC NULLS LAST, fetch=5
+ TableScan: annotated_data_finite2 projection=[a]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+query I
+SELECT a FROM annotated_data_finite2
+ ORDER BY a
+ LIMIT 5
+----
+0
+0
+0
+0
+0
+
+
+statement ok
+drop table annotated_data_finite2;
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 2027358f7c..9790871ff2 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2378,3 +2378,625 @@ select row_number() over (rows between null preceding and current row) from (sel
# invalid window frame. negative as following
statement error DataFusion error: Error during planning: Invalid window frame: frame offsets must be non negative integers
select row_number() over (rows between current row and -1 following) from (select 1 a) x
+
+
+# The following query has a type error. We should test that the error is detected
+# from either the logical plan (when `skip_failed_rules` is set to `false`) or
+# the physical plan (when `skip_failed_rules` is set to `true`).
+
+# We should remove the type checking in the physical plan once failed
+# optimizer rules are no longer skipped by default.
+# (see more in https://github.com/apache/arrow-datafusion/issues/4615)
+
+statement ok
+set datafusion.optimizer.skip_failed_rules = true
+
+# Error is returned from the physical plan.
+query error Cannot cast Utf8\("1 DAY"\) to Int8
+SELECT
+ COUNT(c1) OVER (ORDER BY c2 RANGE BETWEEN '1 DAY' PRECEDING AND '2 DAY' FOLLOWING)
+ FROM aggregate_test_100;
+
+statement ok
+set datafusion.optimizer.skip_failed_rules = false
+
+# Error is returned from the logical plan (failed optimizer rules are not skipped).
+query error Cannot cast Utf8\("1 DAY"\) to Int8
+SELECT
+ COUNT(c1) OVER (ORDER BY c2 RANGE BETWEEN '1 DAY' PRECEDING AND '2 DAY' FOLLOWING)
+ FROM aggregate_test_100;
+
+
+# 100 rows. Columns in the table are ts, inc_col, desc_col.
+# Source is CsvExec which is ordered by ts column.
+# Normal, non infinite source
+statement ok
+CREATE EXTERNAL TABLE annotated_data_finite (
+ ts INTEGER,
+ inc_col INTEGER,
+ desc_col INTEGER,
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (ts ASC)
+LOCATION 'tests/data/window_1.csv'
+;
+
+# 100 rows. Columns in the table are ts, inc_col, desc_col.
+# Source is CsvExec which is ordered by ts column.
+# Infinite source
+statement ok
+CREATE EXTERNAL TABLE annotated_data_infinite (
+ ts INTEGER,
+ inc_col INTEGER,
+ desc_col INTEGER,
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (ts ASC)
+OPTIONS('infinite_source' 'true')
+LOCATION 'tests/data/window_1.csv';
+
+# test_source_sorted_aggregate
+
+query TT
+EXPLAIN SELECT
+ SUM(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as sum2,
+ SUM(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as sum3,
+ MIN(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as min1,
+ MIN(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as min2,
+ MIN(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as min3,
+ MAX(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as max1,
+ MAX(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as max2,
+ MAX(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as max3,
+ COUNT(*) OVER(ORDER BY ts RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING) as cnt1,
+ COUNT(*) OVER(ORDER BY ts ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt2,
+ SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING) as sumr1,
+ SUM(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING) as sumr2,
+ SUM(desc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sumr3,
+ MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as minr1,
+ MIN(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as minr2,
+ MIN(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as minr3,
+ MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as maxr1,
+ MAX(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as maxr2,
+ MAX(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as maxr3,
+ COUNT(*) OVER(ORDER BY ts DESC RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING) as cntr1,
+ COUNT(*) OVER(ORDER BY ts DESC ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cntr2,
+ SUM(desc_col) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as sum4,
+ COUNT(*) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt3
+ FROM annotated_data_finite
+ ORDER BY inc_col DESC
+ LIMIT 5;
+----
+logical_plan
+Projection: sum1, sum2, sum3, min1, min2, min3, max1, max2, max3, cnt1, cnt2, sumr1, sumr2, sumr3, minr1, minr2, minr3, maxr1, maxr2, maxr3, cntr1, cntr2, sum4, cnt3
+ Limit: skip=0, fetch=5
+ Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
+ Projection: SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_f [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+ Projection: annotated_data_finite.inc_col, annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NU [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts [...]
+ TableScan: annotated_data_finite projection=[ts, inc_col, desc_col]
+physical_plan
+ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
+ GlobalLimitExec: skip=0, fetch=5
+ SortExec: fetch=5, expr=[inc_col@24 DESC]
+ ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER B [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, [...]
+ ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as SUM(annotated_data_finite.inc_col), SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotated_data_finite.desc_col), SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), f [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), [...]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+query IIIIIIIIIIIIIIIIIIIIIIII
+SELECT
+ SUM(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as sum2,
+ SUM(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as sum3,
+ MIN(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as min1,
+ MIN(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as min2,
+ MIN(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as min3,
+ MAX(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as max1,
+ MAX(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as max2,
+ MAX(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as max3,
+ COUNT(*) OVER(ORDER BY ts RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING) as cnt1,
+ COUNT(*) OVER(ORDER BY ts ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt2,
+ SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING) as sumr1,
+ SUM(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING) as sumr2,
+ SUM(desc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sumr3,
+ MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as minr1,
+ MIN(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as minr2,
+ MIN(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as minr3,
+ MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as maxr1,
+ MAX(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as maxr2,
+ MAX(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as maxr3,
+ COUNT(*) OVER(ORDER BY ts DESC RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING) as cntr1,
+ COUNT(*) OVER(ORDER BY ts DESC ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cntr2,
+ SUM(desc_col) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as sum4,
+ COUNT(*) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt3
+ FROM annotated_data_finite
+ ORDER BY inc_col DESC
+ LIMIT 5;
+----
+1482 -631 606 289 -213 301 305 -208 305 3 9 902 -834 -1231 301 -213 269 305 -210 305 3 2 -1797 9
+1482 -631 902 289 -213 296 305 -208 305 3 10 902 -834 -1424 301 -213 266 305 -210 305 3 3 -1978 10
+876 -411 1193 289 -208 291 296 -203 305 4 10 587 -612 -1400 296 -213 261 305 -208 301 3 4 -1941 10
+866 -404 1482 286 -203 289 291 -201 305 5 10 580 -600 -1374 291 -208 259 305 -203 296 4 5 -1903 10
+1411 -397 1768 275 -201 286 289 -196 305 4 10 575 -590 -1347 289 -203 254 305 -201 291 2 6 -1863 10
+
+
+
+# test_source_sorted_builtin
+query TT
+EXPLAIN SELECT
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv1,
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv2,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv1,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv2,
+ NTH_VALUE(inc_col, 5) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv1,
+ NTH_VALUE(inc_col, 5) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv2,
+ ROW_NUMBER() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rn1,
+ ROW_NUMBER() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rn2,
+ RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rank1,
+ RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rank2,
+ DENSE_RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS dense_rank1,
+ DENSE_RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as dense_rank2,
+ LAG(inc_col, 1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lag1,
+ LAG(inc_col, 2, 1002) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lag2,
+ LEAD(inc_col, -1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lead1,
+ LEAD(inc_col, 4, 1004) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lead2,
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr1,
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr2,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr1,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr2,
+ LAG(inc_col, 1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lagr1,
+ LAG(inc_col, 2, 1002) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lagr2,
+ LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS leadr1,
+ LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as leadr2
+ FROM annotated_data_finite
+ ORDER BY ts DESC
+ LIMIT 5;
+----
+logical_plan
+Projection: fv1, fv2, lv1, lv2, nv1, nv2, rn1, rn2, rank1, rank2, dense_rank1, dense_rank2, lag1, lag2, lead1, lead2, fvr1, fvr2, lvr1, lvr2, lagr1, lagr2, leadr1, leadr2
+ Limit: skip=0, fetch=5
+ Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5
+ Projection: FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, LAST_VALUE(annotated_data_finite.inc_col) [...]
+ WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER [...]
+ WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_co [...]
+ TableScan: annotated_data_finite projection=[ts, inc_col]
+physical_plan
+ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2, rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1, dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1, lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1, leadr2@23 as leadr2]
+ GlobalLimitExec: skip=0, fetch=5
+ SortExec: fetch=5, expr=[ts@24 DESC]
+ ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, LAST_VALUE(annotated_d [...]
+ BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_i [...]
+ BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict [...]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+query IIIIIIIIIIIIIIIIIIIIIIII
+SELECT
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv1,
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv2,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv1,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv2,
+ NTH_VALUE(inc_col, 5) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv1,
+ NTH_VALUE(inc_col, 5) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv2,
+ ROW_NUMBER() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rn1,
+ ROW_NUMBER() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rn2,
+ RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rank1,
+ RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rank2,
+ DENSE_RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS dense_rank1,
+ DENSE_RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as dense_rank2,
+ LAG(inc_col, 1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lag1,
+ LAG(inc_col, 2, 1002) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lag2,
+ LEAD(inc_col, -1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lead1,
+ LEAD(inc_col, 4, 1004) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lead2,
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr1,
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr2,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr1,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr2,
+ LAG(inc_col, 1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lagr1,
+ LAG(inc_col, 2, 1002) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lagr2,
+ LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS leadr1,
+ LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as leadr2
+ FROM annotated_data_finite
+ ORDER BY ts DESC
+ LIMIT 5;
+----
+289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289
+289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286
+289 261 296 301 NULL 275 98 98 98 98 85 85 291 289 291 1004 305 305 296 291 301 305 301 283
+286 259 291 296 NULL 272 97 97 97 97 84 84 289 286 289 1004 305 305 291 289 296 301 296 278
+275 254 289 291 289 269 96 96 96 96 83 83 286 283 286 305 305 305 289 286 291 296 291 275
+
+
+# test_source_sorted_unbounded_preceding
+
+query TT
+EXPLAIN SELECT
+ SUM(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as sum1,
+ SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+ MIN(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as min1,
+ MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as min2,
+ MAX(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as max1,
+ MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as max2,
+ COUNT(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as count1,
+ COUNT(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2,
+ AVG(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as avg1,
+ AVG(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as avg2
+ FROM annotated_data_finite
+ ORDER BY inc_col ASC
+ LIMIT 5
+----
+logical_plan
+Projection: sum1, sum2, min1, min2, max1, max2, count1, count2, avg1, avg2
+ Limit: skip=0, fetch=5
+ Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
+ Projection: SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS sum1, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) OR [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, COUNT(annotated_data_finite.inc_col) ORDER BY [a [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_finite.inc_col) ORD [...]
+ TableScan: annotated_data_finite projection=[ts, inc_col]
+physical_plan
+ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2]
+ GlobalLimitExec: skip=0, fetch=5
+ SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]
+ ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@7 as sum1, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as min1, MIN(annotated_data_fi [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)) }, MIN(annotated_data_finite.inc_col): Ok(Field { name: "MIN(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)) }, MIN(annotated_data_finite.inc_col): Ok(Field { name: "MIN(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: [...]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+query IIIIIIIIRR
+SELECT
+ SUM(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as sum1,
+ SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+ MIN(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as min1,
+ MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as min2,
+ MAX(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as max1,
+ MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as max2,
+ COUNT(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as count1,
+ COUNT(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2,
+ AVG(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as avg1,
+ AVG(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as avg2
+ FROM annotated_data_finite
+ ORDER BY inc_col ASC
+ LIMIT 5
+----
+16 6 1 1 10 5 3 2 5.333333333333 3
+16 6 1 1 10 5 3 2 5.333333333333 3
+51 16 1 1 20 10 5 3 10.2 5.333333333333
+72 72 1 1 21 21 6 6 12 12
+72 72 1 1 21 21 6 6 12 12
+
+
+# test_source_sorted_unbounded_preceding_builtin
+
+query TT
+EXPLAIN SELECT
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as first_value1,
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as first_value2,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as last_value1,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as last_value2,
+ NTH_VALUE(inc_col, 2) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as nth_value1
+ FROM annotated_data_finite
+ ORDER BY inc_col ASC
+ LIMIT 5
+----
+logical_plan
+Projection: first_value1, first_value2, last_value1, last_value2, nth_value1
+ Limit: skip=0, fetch=5
+ Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
+ Projection: FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS first_value1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS first_value2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS last_va [...]
+ WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+ WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+ TableScan: annotated_data_finite projection=[ts, inc_col]
+physical_plan
+ProjectionExec: expr=[first_value1@0 as first_value1, first_value2@1 as first_value2, last_value1@2 as last_value1, last_value2@3 as last_value2, nth_value1@4 as nth_value1]
+ GlobalLimitExec: skip=0, fetch=5
+ SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]
+ ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOW [...]
+ BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, LAST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_ [...]
+ BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, LAST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dic [...]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+query IIIII
+SELECT
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as first_value1,
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as first_value2,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as last_value1,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as last_value2,
+ NTH_VALUE(inc_col, 2) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as nth_value1
+ FROM annotated_data_finite
+ ORDER BY inc_col ASC
+ LIMIT 5
+----
+1 15 5 1 5
+1 20 10 1 5
+1 21 15 1 5
+1 26 20 1 5
+1 29 21 1 5
+
+
+# test_source_sorted_unbounded_source
+query TT
+EXPLAIN SELECT
+ SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+ COUNT(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as count1,
+ COUNT(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2
+ FROM annotated_data_infinite
+ ORDER BY ts ASC
+ LIMIT 5
+----
+logical_plan
+Projection: sum1, sum2, count1, count2
+ Limit: skip=0, fetch=5
+ Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
+ Projection: SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+ TableScan: annotated_data_infinite projection=[ts, inc_col]
+physical_plan
+ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]
+ GlobalLimitExec: skip=0, fetch=5
+ ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, m [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, [...]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+
+query IIII
+SELECT
+ SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+ COUNT(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as count1,
+ COUNT(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2
+ FROM annotated_data_infinite
+ ORDER BY ts ASC
+ LIMIT 5
+----
+6 31 2 4
+16 51 3 5
+31 72 4 6
+51 98 5 7
+72 127 6 8
+
+
+# test_source_sorted_unbounded_source (repeats the query and expected output of the test above)
+
+query TT
+EXPLAIN SELECT
+ SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+ COUNT(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as count1,
+ COUNT(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2
+ FROM annotated_data_infinite
+ ORDER BY ts ASC
+ LIMIT 5
+----
+logical_plan
+Projection: sum1, sum2, count1, count2
+ Limit: skip=0, fetch=5
+ Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
+ Projection: SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+ TableScan: annotated_data_infinite projection=[ts, inc_col]
+physical_plan
+ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]
+ GlobalLimitExec: skip=0, fetch=5
+ ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, m [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, [...]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+
+query IIII
+SELECT
+ SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+ COUNT(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as count1,
+ COUNT(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2
+ FROM annotated_data_infinite
+ ORDER BY ts ASC
+ LIMIT 5
+----
+6 31 2 4
+16 51 3 5
+31 72 4 6
+51 98 5 7
+72 127 6 8
+
+
+
+
+statement ok
+drop table annotated_data_finite
+
+
+statement ok
+drop table annotated_data_infinite
+
+
+
+# Columns in the table are a0,a,b,c,d. Source is CsvExec which is ordered by
+# columns a,b,c. Column a has cardinality 2, column b has cardinality 4.
+# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
+statement ok
+CREATE EXTERNAL TABLE annotated_data_finite2 (
+ a0 INTEGER,
+ a INTEGER,
+ b INTEGER,
+ c INTEGER,
+ d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC, c ASC)
+LOCATION 'tests/data/window_2.csv';
+
+# Columns in the table are a0,a,b,c,d. Source is CsvExec, marked as an infinite source
+# and ordered by columns a,b,c. Column a has cardinality 2, column b has cardinality 4.
+# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
+statement ok
+CREATE EXTERNAL TABLE annotated_data_infinite2 (
+ a0 INTEGER,
+ a INTEGER,
+ b INTEGER,
+ c INTEGER,
+ d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC, c ASC)
+OPTIONS('infinite_source' 'true')
+LOCATION 'tests/data/window_2.csv';
+
+
+# test_infinite_source_partition_by
+
+query TT
+EXPLAIN SELECT a, b, c,
+ SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
+ SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
+ SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
+ SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
+ SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
+ SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
+ SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
+ SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
+ SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
+ SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
+ SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) as sum12
+ FROM annotated_data_infinite2
+ LIMIT 5
+----
+logical_plan
+Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS [...]
+ Limit: skip=0, fetch=5
+ WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS L [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLL [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
+ WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+ TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+physical_plan
+ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULL [...]
+ GlobalLimitExec: skip=0, fetch=5
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Window [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Wind [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Wi [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFr [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Window [...]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+
+
+query IIIIIIIIIIIIIII
+SELECT a, b, c,
+ SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
+ SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
+ SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
+ SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
+ SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
+ SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
+ SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
+ SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
+ SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
+ SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
+ SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) as sum12
+ FROM annotated_data_infinite2
+ LIMIT 5
+----
+0 0 0 2 53 2 NULL 1 15 1 15 2 0 2 2
+0 0 1 8 61 8 NULL 3 21 3 21 8 1 8 8
+0 0 2 5 74 5 0 6 28 6 28 5 2 5 5
+0 0 3 11 96 11 2 10 36 10 36 11 5 11 9
+0 0 4 9 72 9 NULL 14 45 14 45 9 4 9 9
+
+
+
+# test_finite_source_partition_by
+
+query TT
+EXPLAIN SELECT a, b, c,
+ SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
+ SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
+ SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
+ SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
+ SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
+ SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
+ SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
+ SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
+ SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
+ SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
+ SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) as sum12
+ FROM annotated_data_finite2
+ ORDER BY c
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
+ Projection: annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_dat [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN [...]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
+ WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+ TableScan: annotated_data_finite2 projection=[a, b, c, d]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ SortExec: fetch=5, expr=[c@2 ASC NULLS LAST]
+ ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BET [...]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame [...]
+ SortExec: expr=[d@3 ASC NULLS LAST,a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFr [...]
+ SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { [...]
+ SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,c@2 ASC NULLS LAST]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFram [...]
+ SortExec: expr=[a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { un [...]
+ SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
+ BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame [...]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+
+
+query IIIIIIIIIIIIIII
+SELECT a, b, c,
+ SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
+ SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
+ SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
+ SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
+ SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
+ SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
+ SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
+ SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
+ SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
+ SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
+ SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) as sum12
+ FROM annotated_data_finite2
+ ORDER BY c
+ LIMIT 5
+----
+0 0 0 2 53 2 NULL 1 15 1 15 2 0 2 2
+0 0 1 8 61 8 NULL 3 21 3 21 8 1 8 8
+0 0 2 5 74 5 0 6 28 6 28 5 2 5 5
+0 0 3 11 96 11 2 10 36 10 36 11 5 11 9
+0 0 4 9 72 9 NULL 14 45 14 45 9 4 9 9
+
+
+statement ok
+drop table annotated_data_finite2
+
+statement ok
+drop table annotated_data_infinite2