You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/10 16:05:58 UTC

[arrow-datafusion] branch main updated: Port remainder of `window.rs` to sqllogictest (#6234)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a112484ac Port remainder of `window.rs` to sqllogictest (#6234)
8a112484ac is described below

commit 8a112484ac7ae89afc7006d56c65fba2dab106ce
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed May 10 12:05:52 2023 -0400

    Port remainder of `window.rs` to sqllogictest (#6234)
    
    * Port remainder of window.rs to sqllogictest
    
    * fix groupby.slt to use unbounded inputs
---
 datafusion/core/src/test_util/mod.rs               | 180 +-----
 datafusion/core/tests/data/window_1.csv            | 101 ++++
 datafusion/core/tests/data/window_2.csv            | 101 ++++
 datafusion/core/tests/sql/group_by.rs              |  97 ----
 datafusion/core/tests/sql/mod.rs                   |   1 -
 datafusion/core/tests/sql/projection.rs            |  30 +-
 datafusion/core/tests/sql/window.rs                | 525 -----------------
 .../tests/sqllogictests/test_files/groupby.slt     |  89 +++
 .../core/tests/sqllogictests/test_files/select.slt |  48 ++
 .../core/tests/sqllogictests/test_files/window.slt | 622 +++++++++++++++++++++
 10 files changed, 963 insertions(+), 831 deletions(-)

diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 9035b6d893..d42379b82a 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -19,10 +19,8 @@
 
 pub mod parquet;
 
-use arrow::array::Int32Array;
 use std::any::Any;
 use std::collections::HashMap;
-use std::fs::File;
 use std::path::Path;
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -42,13 +40,10 @@ use crate::prelude::{CsvReadOptions, SessionContext};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
-use datafusion_common::from_slice::FromSlice;
 use datafusion_common::{Statistics, TableReference};
-use datafusion_execution::config::SessionConfig;
-use datafusion_expr::{col, CreateExternalTable, Expr, TableType};
+use datafusion_expr::{CreateExternalTable, Expr, TableType};
 use datafusion_physical_expr::PhysicalSortExpr;
 use futures::Stream;
-use tempfile::TempDir;
 
 /// Compares formatted output of a record batch with an expected
 /// vector of strings, with the result of pretty formatting record
@@ -293,179 +288,6 @@ pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
     Arc::new(schema)
 }
 
-// Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by ts
-fn get_test_data() -> Result<(RecordBatch, Vec<Expr>)> {
-    let ts_field = Field::new("ts", DataType::Int32, false);
-    let inc_field = Field::new("inc_col", DataType::Int32, false);
-    let desc_field = Field::new("desc_col", DataType::Int32, false);
-
-    let schema = Arc::new(Schema::new(vec![ts_field, inc_field, desc_field]));
-
-    let batch = RecordBatch::try_new(
-        schema,
-        vec![
-            Arc::new(Int32Array::from_slice([
-                1, 1, 5, 9, 10, 11, 16, 21, 22, 26, 26, 28, 31, 33, 38, 42, 47, 51, 53,
-                53, 58, 63, 67, 68, 70, 72, 72, 76, 81, 85, 86, 88, 91, 96, 97, 98, 100,
-                101, 102, 104, 104, 108, 112, 113, 113, 114, 114, 117, 122, 126, 131,
-                131, 136, 136, 136, 139, 141, 146, 147, 147, 152, 154, 159, 161, 163,
-                164, 167, 172, 173, 177, 180, 185, 186, 191, 195, 195, 199, 203, 207,
-                210, 213, 218, 221, 224, 226, 230, 232, 235, 238, 238, 239, 244, 245,
-                247, 250, 254, 258, 262, 264, 264,
-            ])),
-            Arc::new(Int32Array::from_slice([
-                1, 5, 10, 15, 20, 21, 26, 29, 30, 33, 37, 40, 43, 44, 45, 49, 51, 53, 58,
-                61, 65, 70, 75, 78, 83, 88, 90, 91, 95, 97, 100, 105, 109, 111, 115, 119,
-                120, 124, 126, 129, 131, 135, 140, 143, 144, 147, 148, 149, 151, 155,
-                156, 159, 160, 163, 165, 170, 172, 177, 181, 182, 186, 187, 192, 196,
-                197, 199, 203, 207, 209, 213, 214, 216, 219, 221, 222, 225, 226, 231,
-                236, 237, 242, 245, 247, 248, 253, 254, 259, 261, 266, 269, 272, 275,
-                278, 283, 286, 289, 291, 296, 301, 305,
-            ])),
-            Arc::new(Int32Array::from_slice([
-                100, 98, 93, 91, 86, 84, 81, 77, 75, 71, 70, 69, 64, 62, 59, 55, 50, 45,
-                41, 40, 39, 36, 31, 28, 23, 22, 17, 13, 10, 6, 5, 2, 1, -1, -4, -5, -6,
-                -8, -12, -16, -17, -19, -24, -25, -29, -34, -37, -42, -47, -48, -49, -53,
-                -57, -58, -61, -65, -67, -68, -71, -73, -75, -76, -78, -83, -87, -91,
-                -95, -98, -101, -105, -106, -111, -114, -116, -120, -125, -128, -129,
-                -134, -139, -142, -143, -146, -150, -154, -158, -163, -168, -172, -176,
-                -181, -184, -189, -193, -196, -201, -203, -208, -210, -213,
-            ])),
-        ],
-    )?;
-    let file_sort_order = vec![col("ts").sort(true, false)];
-    Ok((batch, file_sort_order))
-}
-
-// Return a static RecordBatch and its ordering for tests. RecordBatch is ordered by a, b, c
-fn get_test_data2() -> Result<(RecordBatch, Vec<Expr>)> {
-    let a0 = Field::new("a0", DataType::Int32, false);
-    let a = Field::new("a", DataType::Int32, false);
-    let b = Field::new("b", DataType::Int32, false);
-    let c = Field::new("c", DataType::Int32, false);
-    let d = Field::new("d", DataType::Int32, false);
-
-    let schema = Arc::new(Schema::new(vec![a0, a, b, c, d]));
-
-    let batch = RecordBatch::try_new(
-        schema,
-        vec![
-            Arc::new(Int32Array::from_slice([
-                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
-                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
-                1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-                0, 0, 0, 0,
-            ])),
-            Arc::new(Int32Array::from_slice([
-                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-                0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
-                1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
-                1, 1, 1, 1,
-            ])),
-            Arc::new(Int32Array::from_slice([
-                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-                0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
-                1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
-                2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
-                3, 3, 3, 3,
-            ])),
-            Arc::new(Int32Array::from_slice([
-                0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
-                21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38,
-                39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56,
-                57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74,
-                75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92,
-                93, 94, 95, 96, 97, 98, 99,
-            ])),
-            Arc::new(Int32Array::from_slice([
-                0, 2, 0, 0, 1, 1, 0, 2, 1, 4, 4, 2, 2, 1, 2, 3, 3, 2, 1, 4, 0, 3, 0, 0,
-                4, 0, 2, 0, 1, 1, 3, 4, 2, 2, 4, 0, 1, 4, 0, 1, 1, 3, 3, 2, 3, 0, 0, 1,
-                1, 3, 0, 3, 1, 1, 4, 2, 1, 1, 1, 2, 4, 3, 1, 4, 4, 0, 2, 4, 1, 1, 0, 2,
-                1, 1, 4, 2, 0, 2, 1, 4, 2, 0, 4, 2, 1, 1, 1, 4, 3, 4, 1, 2, 0, 0, 2, 0,
-                4, 2, 4, 3,
-            ])),
-        ],
-    )?;
-    let file_sort_order = vec![
-        col("a").sort(true, false),
-        col("b").sort(true, false),
-        col("c").sort(true, false),
-    ];
-    Ok((batch, file_sort_order))
-}
-
-/// Creates a test_context with table name `annotated_data` which has 100 rows.
-// Columns in the table are ts, inc_col, desc_col. Source is CsvExec which is ordered by
-// ts column.
-pub async fn get_test_context(
-    tmpdir: &TempDir,
-    infinite_source: bool,
-    session_config: SessionConfig,
-) -> Result<SessionContext> {
-    get_test_context_helper(tmpdir, infinite_source, session_config, get_test_data).await
-}
-
-/// Creates a test_context with table name `annotated_data`, which has 100 rows.
-// Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
-// a,b,c column. Column a has cardinality 2, column b has cardinality 4.
-// Column c has cardinality 100 (unique entries). Column d has cardinality 5.
-pub async fn get_test_context2(
-    tmpdir: &TempDir,
-    infinite_source: bool,
-    session_config: SessionConfig,
-) -> Result<SessionContext> {
-    get_test_context_helper(tmpdir, infinite_source, session_config, get_test_data2).await
-}
-
-async fn get_test_context_helper(
-    tmpdir: &TempDir,
-    infinite_source: bool,
-    session_config: SessionConfig,
-    data_receiver: fn() -> Result<(RecordBatch, Vec<Expr>)>,
-) -> Result<SessionContext> {
-    let ctx = SessionContext::with_config(session_config);
-
-    let csv_read_options = CsvReadOptions::default();
-    let (batch, file_sort_order) = data_receiver()?;
-
-    let options_sort = csv_read_options
-        .to_listing_options(&ctx.copied_config())
-        .with_file_sort_order(Some(file_sort_order))
-        .with_infinite_source(infinite_source);
-
-    write_test_data_to_csv(tmpdir, 1, &batch)?;
-    let sql_definition = None;
-    ctx.register_listing_table(
-        "annotated_data",
-        tmpdir.path().to_string_lossy(),
-        options_sort.clone(),
-        Some(batch.schema()),
-        sql_definition,
-    )
-    .await
-    .unwrap();
-    Ok(ctx)
-}
-
-fn write_test_data_to_csv(
-    tmpdir: &TempDir,
-    n_file: usize,
-    batch: &RecordBatch,
-) -> Result<()> {
-    let n_chunk = batch.num_rows() / n_file;
-    for i in 0..n_file {
-        let target_file = tmpdir.path().join(format!("{i}.csv"));
-        let file = File::create(target_file)?;
-        let chunks_start = i * n_chunk;
-        let cur_batch = batch.slice(chunks_start, n_chunk);
-        let mut writer = arrow::csv::Writer::new(file);
-        writer.write(&cur_batch)?;
-    }
-    Ok(())
-}
-
 /// TableFactory for tests
 pub struct TestTableFactory {}
 
diff --git a/datafusion/core/tests/data/window_1.csv b/datafusion/core/tests/data/window_1.csv
new file mode 100644
index 0000000000..588af16d06
--- /dev/null
+++ b/datafusion/core/tests/data/window_1.csv
@@ -0,0 +1,101 @@
+ts,inc_col,desc_col
+1,1,100
+1,5,98
+5,10,93
+9,15,91
+10,20,86
+11,21,84
+16,26,81
+21,29,77
+22,30,75
+26,33,71
+26,37,70
+28,40,69
+31,43,64
+33,44,62
+38,45,59
+42,49,55
+47,51,50
+51,53,45
+53,58,41
+53,61,40
+58,65,39
+63,70,36
+67,75,31
+68,78,28
+70,83,23
+72,88,22
+72,90,17
+76,91,13
+81,95,10
+85,97,6
+86,100,5
+88,105,2
+91,109,1
+96,111,-1
+97,115,-4
+98,119,-5
+100,120,-6
+101,124,-8
+102,126,-12
+104,129,-16
+104,131,-17
+108,135,-19
+112,140,-24
+113,143,-25
+113,144,-29
+114,147,-34
+114,148,-37
+117,149,-42
+122,151,-47
+126,155,-48
+131,156,-49
+131,159,-53
+136,160,-57
+136,163,-58
+136,165,-61
+139,170,-65
+141,172,-67
+146,177,-68
+147,181,-71
+147,182,-73
+152,186,-75
+154,187,-76
+159,192,-78
+161,196,-83
+163,197,-87
+164,199,-91
+167,203,-95
+172,207,-98
+173,209,-101
+177,213,-105
+180,214,-106
+185,216,-111
+186,219,-114
+191,221,-116
+195,222,-120
+195,225,-125
+199,226,-128
+203,231,-129
+207,236,-134
+210,237,-139
+213,242,-142
+218,245,-143
+221,247,-146
+224,248,-150
+226,253,-154
+230,254,-158
+232,259,-163
+235,261,-168
+238,266,-172
+238,269,-176
+239,272,-181
+244,275,-184
+245,278,-189
+247,283,-193
+250,286,-196
+254,289,-201
+258,291,-203
+262,296,-208
+264,301,-210
+264,305,-213
diff --git a/datafusion/core/tests/data/window_2.csv b/datafusion/core/tests/data/window_2.csv
new file mode 100644
index 0000000000..9a2d7c1940
--- /dev/null
+++ b/datafusion/core/tests/data/window_2.csv
@@ -0,0 +1,101 @@
+a0,a,b,c,d
+1,0,0,0,0
+1,0,0,1,2
+1,0,0,2,0
+1,0,0,3,0
+1,0,0,4,1
+1,0,0,5,1
+1,0,0,6,0
+1,0,0,7,2
+1,0,0,8,1
+1,0,0,9,4
+1,0,0,10,4
+1,0,0,11,2
+1,0,0,12,2
+1,0,0,13,1
+1,0,0,14,2
+1,0,0,15,3
+1,0,0,16,3
+1,0,0,17,2
+1,0,0,18,1
+1,0,0,19,4
+1,0,0,20,0
+1,0,0,21,3
+1,0,0,22,0
+1,0,0,23,0
+1,0,0,24,4
+1,0,1,25,0
+1,0,1,26,2
+1,0,1,27,0
+1,0,1,28,1
+1,0,1,29,1
+1,0,1,30,3
+1,0,1,31,4
+1,0,1,32,2
+1,0,1,33,2
+1,0,1,34,4
+1,0,1,35,0
+1,0,1,36,1
+1,0,1,37,4
+1,0,1,38,0
+1,0,1,39,1
+1,0,1,40,1
+1,0,1,41,3
+1,0,1,42,3
+1,0,1,43,2
+1,0,1,44,3
+1,0,1,45,0
+1,0,1,46,0
+1,0,1,47,1
+1,0,1,48,1
+1,0,1,49,3
+0,1,2,50,0
+0,1,2,51,3
+0,1,2,52,1
+0,1,2,53,1
+0,1,2,54,4
+0,1,2,55,2
+0,1,2,56,1
+0,1,2,57,1
+0,1,2,58,1
+0,1,2,59,2
+0,1,2,60,4
+0,1,2,61,3
+0,1,2,62,1
+0,1,2,63,4
+0,1,2,64,4
+0,1,2,65,0
+0,1,2,66,2
+0,1,2,67,4
+0,1,2,68,1
+0,1,2,69,1
+0,1,2,70,0
+0,1,2,71,2
+0,1,2,72,1
+0,1,2,73,1
+0,1,2,74,4
+0,1,3,75,2
+0,1,3,76,0
+0,1,3,77,2
+0,1,3,78,1
+0,1,3,79,4
+0,1,3,80,2
+0,1,3,81,0
+0,1,3,82,4
+0,1,3,83,2
+0,1,3,84,1
+0,1,3,85,1
+0,1,3,86,1
+0,1,3,87,4
+0,1,3,88,3
+0,1,3,89,4
+0,1,3,90,1
+0,1,3,91,2
+0,1,3,92,0
+0,1,3,93,0
+0,1,3,94,2
+0,1,3,95,0
+0,1,3,96,4
+0,1,3,97,2
+0,1,3,98,4
+0,1,3,99,3
diff --git a/datafusion/core/tests/sql/group_by.rs b/datafusion/core/tests/sql/group_by.rs
index 7a4d4d2897..b4a92db3fc 100644
--- a/datafusion/core/tests/sql/group_by.rs
+++ b/datafusion/core/tests/sql/group_by.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use super::*;
-use datafusion::test_util::get_test_context2;
 
 #[tokio::test]
 async fn group_by_date_trunc() -> Result<()> {
@@ -161,99 +160,3 @@ async fn group_by_dictionary() {
     run_test_case::<UInt32Type>().await;
     run_test_case::<UInt64Type>().await;
 }
-
-#[tokio::test]
-async fn test_source_sorted_groupby() -> Result<()> {
-    let tmpdir = TempDir::new().unwrap();
-    let session_config = SessionConfig::new().with_target_partitions(1);
-    let ctx = get_test_context2(&tmpdir, true, session_config).await?;
-
-    let sql = "SELECT a, b,
-           SUM(c) as summation1
-           FROM annotated_data
-           GROUP BY b, a";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data.c)@2 as summation1]",
-            "  AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data.c)], ordering_mode=FullyOrdered",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+---+---+------------+",
-        "| a | b | summation1 |",
-        "+---+---+------------+",
-        "| 0 | 0 | 300        |",
-        "| 0 | 1 | 925        |",
-        "| 1 | 2 | 1550       |",
-        "| 1 | 3 | 2175       |",
-        "+---+---+------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_source_sorted_groupby2() -> Result<()> {
-    let tmpdir = TempDir::new().unwrap();
-    let session_config = SessionConfig::new().with_target_partitions(1);
-    let ctx = get_test_context2(&tmpdir, true, session_config).await?;
-
-    let sql = "SELECT a, d,
-           SUM(c) as summation1
-           FROM annotated_data
-           GROUP BY d, a";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data.c)@2 as summation1]",
-            "  AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data.c)], ordering_mode=PartiallyOrdered",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+---+---+------------+",
-        "| a | d | summation1 |",
-        "+---+---+------------+",
-        "| 0 | 0 | 292        |",
-        "| 0 | 2 | 196        |",
-        "| 0 | 1 | 315        |",
-        "| 0 | 4 | 164        |",
-        "| 0 | 3 | 258        |",
-        "| 1 | 0 | 622        |",
-        "| 1 | 3 | 299        |",
-        "| 1 | 1 | 1043       |",
-        "| 1 | 4 | 913        |",
-        "| 1 | 2 | 848        |",
-        "+---+---+------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-    Ok(())
-}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 8b33a888a5..bd9e213dfe 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -97,7 +97,6 @@ pub mod references;
 pub mod select;
 pub mod timestamp;
 pub mod udf;
-pub mod window;
 
 pub mod explain;
 pub mod information_schema;
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
index 5f480d46c6..ac697b1176 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use datafusion::datasource::provider_as_source;
-use datafusion::test_util::{get_test_context2, scan_empty};
+use datafusion::test_util::scan_empty;
 use datafusion_expr::{when, LogicalPlanBuilder, UNNAMED_TABLE};
 use tempfile::TempDir;
 
@@ -375,31 +375,3 @@ async fn project_columns_in_memory_without_propagation() -> Result<()> {
 
     Ok(())
 }
-
-#[tokio::test]
-async fn test_source_projection() -> Result<()> {
-    let session_config = SessionConfig::new().with_target_partitions(1);
-    // Source is ordered by a, b, c
-    // Source is finite.
-    let tmpdir1 = TempDir::new()?;
-    let ctx = get_test_context2(&tmpdir1, false, session_config).await?;
-    let sql = "SELECT a FROM annotated_data
-        ORDER BY a
-        LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Final plan shouldn't include SortExec.
-    let expected: Vec<&str> = { vec!["GlobalLimitExec: skip=0, fetch=5"] };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-    Ok(())
-}
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
deleted file mode 100644
index f63395e455..0000000000
--- a/datafusion/core/tests/sql/window.rs
+++ /dev/null
@@ -1,525 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::*;
-
-#[tokio::test]
-async fn window_frame_creation_type_checking() -> Result<()> {
-    // The following query has type error. We should test the error could be detected
-    // from either the logical plan (when `skip_failed_rules` is set to `false`) or
-    // the physical plan (when `skip_failed_rules` is set to `true`).
-
-    // We should remove the type checking in physical plan after we don't skip
-    // the failed optimizing rules by default. (see more in https://github.com/apache/arrow-datafusion/issues/4615)
-    async fn check_query(skip_failed_rules: bool, err_msg: &str) -> Result<()> {
-        use datafusion_common::ScalarValue::Boolean;
-        let config = SessionConfig::new().set(
-            "datafusion.optimizer.skip_failed_rules",
-            Boolean(Some(skip_failed_rules)),
-        );
-        let ctx = SessionContext::with_config(config);
-        register_aggregate_csv(&ctx).await?;
-        let df = ctx
-            .sql(
-                "SELECT
-                    COUNT(c1) OVER (ORDER BY c2 RANGE BETWEEN '1 DAY' PRECEDING AND '2 DAY' FOLLOWING)
-                    FROM aggregate_test_100;",
-            )
-            .await?;
-        let results = df.collect().await;
-        assert_contains!(results.err().unwrap().to_string(), err_msg);
-        Ok(())
-    }
-
-    // Error is returned from the physical plan.
-    check_query(
-        true,
-        r#"Execution error: Cannot cast Utf8("1 DAY") to UInt32"#,
-    )
-    .await?;
-
-    // Error is returned from the logical plan.
-    check_query(
-        false,
-        r#"Execution error: Cannot cast Utf8("1 DAY") to UInt32"#,
-    )
-    .await
-}
-
-mod tests {
-    use super::*;
-    use datafusion::test_util::{get_test_context, get_test_context2};
-
-    #[tokio::test]
-    async fn test_source_sorted_aggregate() -> Result<()> {
-        let tmpdir = TempDir::new()?;
-        let session_config = SessionConfig::new().with_target_partitions(1);
-        let ctx = get_test_context(&tmpdir, false, session_config).await?;
-
-        let sql = "SELECT
-            SUM(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as sum1,
-            SUM(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as sum2,
-            SUM(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as sum3,
-            MIN(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as min1,
-            MIN(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as min2,
-            MIN(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as min3,
-            MAX(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as max1,
-            MAX(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as max2,
-            MAX(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as max3,
-            COUNT(*) OVER(ORDER BY ts RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING) as cnt1,
-            COUNT(*) OVER(ORDER BY ts ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt2,
-            SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING) as sumr1,
-            SUM(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING) as sumr2,
-            SUM(desc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sumr3,
-            MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as minr1,
-            MIN(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as minr2,
-            MIN(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as minr3,
-            MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as maxr1,
-            MAX(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as maxr2,
-            MAX(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as maxr3,
-            COUNT(*) OVER(ORDER BY ts DESC RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING) as cntr1,
-            COUNT(*) OVER(ORDER BY ts DESC ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cntr2,
-            SUM(desc_col) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as sum4,
-            COUNT(*) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt3
-            FROM annotated_data
-            ORDER BY inc_col DESC
-            LIMIT 5
-            ";
-
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]",
-                "  GlobalLimitExec: skip=0, fetch=5",
-                "    SortExec: fetch=5, expr=[inc_col@24 DESC]",
-                "      ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as sum1, SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@14 as sum2, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@15 as sum3, MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS L [...]
-                "        BoundedWindowAggExec: wdw=[SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units [...]
-                "          ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as SUM(annotated_data.inc_col), SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotated_data.desc_col), SUM(annotated_data.desc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECE [...]
-                "            BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) [...]
-                "              BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, SUM(annotated_data.desc_col): Ok(Field { name: \"SUM(annotated_data.desc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } [...]
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+",
-            "| sum1 | sum2 | sum3 | min1 | min2 | min3 | max1 | max2 | max3 | cnt1 | cnt2 | sumr1 | sumr2 | sumr3 | minr1 | minr2 | minr3 | maxr1 | maxr2 | maxr3 | cntr1 | cntr2 | sum4  | cnt3 |",
-            "+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+",
-            "| 1482 | -631 | 606  | 289  | -213 | 301  | 305  | -208 | 305  | 3    | 9    | 902   | -834  | -1231 | 301   | -213  | 269   | 305   | -210  | 305   | 3     | 2     | -1797 | 9    |",
-            "| 1482 | -631 | 902  | 289  | -213 | 296  | 305  | -208 | 305  | 3    | 10   | 902   | -834  | -1424 | 301   | -213  | 266   | 305   | -210  | 305   | 3     | 3     | -1978 | 10   |",
-            "| 876  | -411 | 1193 | 289  | -208 | 291  | 296  | -203 | 305  | 4    | 10   | 587   | -612  | -1400 | 296   | -213  | 261   | 305   | -208  | 301   | 3     | 4     | -1941 | 10   |",
-            "| 866  | -404 | 1482 | 286  | -203 | 289  | 291  | -201 | 305  | 5    | 10   | 580   | -600  | -1374 | 291   | -208  | 259   | 305   | -203  | 296   | 4     | 5     | -1903 | 10   |",
-            "| 1411 | -397 | 1768 | 275  | -201 | 286  | 289  | -196 | 305  | 4    | 10   | 575   | -590  | -1347 | 289   | -203  | 254   | 305   | -201  | 291   | 2     | 6     | -1863 | 10   |",
-            "+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_source_sorted_builtin() -> Result<()> {
-        let tmpdir = TempDir::new()?;
-        let session_config = SessionConfig::new().with_target_partitions(1);
-        let ctx = get_test_context(&tmpdir, false, session_config).await?;
-
-        let sql = "SELECT
-            FIRST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv1,
-            FIRST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv2,
-            LAST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv1,
-            LAST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv2,
-            NTH_VALUE(inc_col, 5) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv1,
-            NTH_VALUE(inc_col, 5) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv2,
-            ROW_NUMBER() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rn1,
-            ROW_NUMBER() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rn2,
-            RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rank1,
-            RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rank2,
-            DENSE_RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS dense_rank1,
-            DENSE_RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as dense_rank2,
-            LAG(inc_col, 1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lag1,
-            LAG(inc_col, 2, 1002) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lag2,
-            LEAD(inc_col, -1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lead1,
-            LEAD(inc_col, 4, 1004) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lead2,
-            FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr1,
-            FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr2,
-            LAST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr1,
-            LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr2,
-            LAG(inc_col, 1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lagr1,
-            LAG(inc_col, 2, 1002) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lagr2,
-            LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS leadr1,
-            LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as leadr2
-            FROM annotated_data
-            ORDER BY ts DESC
-            LIMIT 5
-            ";
-
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2, rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1, dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1, lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1, leadr2@23 as leadr2]",
-                "  GlobalLimitExec: skip=0, fetch=5",
-                "    SortExec: fetch=5, expr=[ts@24 DESC]",
-                "      ProjectionExec: expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, LAST_VALUE(annotated_data.inc_col) ORDER BY [an [...]
-                "        BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_order [...]
-                "          BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ord [...]
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
-            "| fv1 | fv2 | lv1 | lv2 | nv1 | nv2 | rn1 | rn2 | rank1 | rank2 | dense_rank1 | dense_rank2 | lag1 | lag2 | lead1 | lead2 | fvr1 | fvr2 | lvr1 | lvr2 | lagr1 | lagr2 | leadr1 | leadr2 |",
-            "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
-            "| 289 | 269 | 305 | 305 | 305 | 283 | 100 | 100 | 99    | 99    | 86          | 86          | 301  | 296  | 301   | 1004  | 305  | 305  | 301  | 301  | 1001  | 1002  | 1001   | 289    |",
-            "| 289 | 266 | 305 | 305 | 305 | 278 | 99  | 99  | 99    | 99    | 86          | 86          | 296  | 291  | 296   | 1004  | 305  | 305  | 301  | 296  | 305   | 1002  | 305    | 286    |",
-            "| 289 | 261 | 296 | 301 |     | 275 | 98  | 98  | 98    | 98    | 85          | 85          | 291  | 289  | 291   | 1004  | 305  | 305  | 296  | 291  | 301   | 305   | 301    | 283    |",
-            "| 286 | 259 | 291 | 296 |     | 272 | 97  | 97  | 97    | 97    | 84          | 84          | 289  | 286  | 289   | 1004  | 305  | 305  | 291  | 289  | 296   | 301   | 296    | 278    |",
-            "| 275 | 254 | 289 | 291 | 289 | 269 | 96  | 96  | 96    | 96    | 83          | 83          | 286  | 283  | 286   | 305   | 305  | 305  | 289  | 286  | 291   | 296   | 291    | 275    |",
-            "+-----+-----+-----+-----+-----+-----+-----+-----+-------+-------+-------------+-------------+------+------+-------+-------+------+------+------+------+-------+-------+--------+--------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_source_sorted_unbounded_preceding() -> Result<()> {
-        let tmpdir = TempDir::new()?;
-        let session_config = SessionConfig::new().with_target_partitions(1);
-        let ctx = get_test_context(&tmpdir, false, session_config).await?;
-
-        let sql = "SELECT
-            SUM(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as sum1,
-            SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
-            MIN(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as min1,
-            MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as min2,
-            MAX(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as max1,
-            MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as max2,
-            COUNT(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as count1,
-            COUNT(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2,
-            AVG(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as avg1,
-            AVG(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as avg2
-            FROM annotated_data
-            ORDER BY inc_col ASC
-            LIMIT 5";
-
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2]",
-                "  GlobalLimitExec: skip=0, fetch=5",
-                "    SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]",
-                "      ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@7 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, MIN(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as min1, MIN(annotated_data.inc_col) ORDER BY [annotate [...]
-                "        BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)) }, MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), fr [...]
-                "          BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)) }, MIN(annotated_data.inc_col): Ok(Field { name: \"MIN(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),  [...]
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+------+------+------+------+------+------+--------+--------+-------------------+-------------------+",
-            "| sum1 | sum2 | min1 | min2 | max1 | max2 | count1 | count2 | avg1              | avg2              |",
-            "+------+------+------+------+------+------+--------+--------+-------------------+-------------------+",
-            "| 16   | 6    | 1    | 1    | 10   | 5    | 3      | 2      | 5.333333333333333 | 3.0               |",
-            "| 16   | 6    | 1    | 1    | 10   | 5    | 3      | 2      | 5.333333333333333 | 3.0               |",
-            "| 51   | 16   | 1    | 1    | 20   | 10   | 5      | 3      | 10.2              | 5.333333333333333 |",
-            "| 72   | 72   | 1    | 1    | 21   | 21   | 6      | 6      | 12.0              | 12.0              |",
-            "| 72   | 72   | 1    | 1    | 21   | 21   | 6      | 6      | 12.0              | 12.0              |",
-            "+------+------+------+------+------+------+--------+--------+-------------------+-------------------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_source_sorted_unbounded_preceding_builtin() -> Result<()> {
-        let tmpdir = TempDir::new().unwrap();
-        let session_config = SessionConfig::new().with_target_partitions(1);
-        let ctx = get_test_context(&tmpdir, false, session_config).await?;
-
-        let sql = "SELECT
-           FIRST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as first_value1,
-           FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as first_value2,
-           LAST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as last_value1,
-           LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as last_value2,
-           NTH_VALUE(inc_col, 2) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as nth_value1
-           FROM annotated_data
-           ORDER BY inc_col ASC
-           LIMIT 5";
-
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "ProjectionExec: expr=[first_value1@0 as first_value1, first_value2@1 as first_value2, last_value1@2 as last_value1, last_value2@3 as last_value2, nth_value1@4 as nth_value1]",
-                "  GlobalLimitExec: skip=0, fetch=5",
-                "    SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]",
-                "      ProjectionExec: expr=[FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1, FIRST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2, LAST_VALUE(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as last_value1, LAS [...]
-                "        BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: \"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_orde [...]
-                "          BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data.inc_col): Ok(Field { name: \"FIRST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, LAST_VALUE(annotated_data.inc_col): Ok(Field { name: \"LAST_VALUE(annotated_data.inc_col)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_or [...]
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+--------------+--------------+-------------+-------------+------------+",
-            "| first_value1 | first_value2 | last_value1 | last_value2 | nth_value1 |",
-            "+--------------+--------------+-------------+-------------+------------+",
-            "| 1            | 15           | 5           | 1           | 5          |",
-            "| 1            | 20           | 10          | 1           | 5          |",
-            "| 1            | 21           | 15          | 1           | 5          |",
-            "| 1            | 26           | 20          | 1           | 5          |",
-            "| 1            | 29           | 21          | 1           | 5          |",
-            "+--------------+--------------+-------------+-------------+------------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_source_sorted_unbounded_source() -> Result<()> {
-        let tmpdir = TempDir::new().unwrap();
-        let session_config = SessionConfig::new().with_target_partitions(1);
-        // Use an unbounded source
-        let ctx = get_test_context(&tmpdir, true, session_config).await?;
-
-        let sql = "SELECT
-           SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1,
-           SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
-           COUNT(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as count1,
-           COUNT(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2
-           FROM annotated_data
-           ORDER BY ts ASC
-           LIMIT 5";
-
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]",
-                "  GlobalLimitExec: skip=0, fetch=5",
-                "    ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data.inc_col) ORDER BY [annotated_data.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data.inc_col) ORDER BY [annotated_data.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(annotated_data.inc_col) ORDER BY [annotat [...]
-                "      BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data.inc_col): Ok(Field { name: \"COUNT(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), [...]
-                "        BoundedWindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: \"SUM(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data.inc_col): Ok(Field { name: \"COUNT(annotated_data.inc_col)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } [...]
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-
-        let actual = execute_to_batches(&ctx, sql).await;
-        let expected = vec![
-            "+------+------+--------+--------+",
-            "| sum1 | sum2 | count1 | count2 |",
-            "+------+------+--------+--------+",
-            "| 6    | 31   | 2      | 4      |",
-            "| 16   | 51   | 3      | 5      |",
-            "| 31   | 72   | 4      | 6      |",
-            "| 51   | 98   | 5      | 7      |",
-            "| 72   | 127  | 6      | 8      |",
-            "+------+------+--------+--------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_infinite_source_partition_by() -> Result<()> {
-        let tmpdir = TempDir::new()?;
-        let session_config = SessionConfig::new().with_target_partitions(1);
-        // Source is ordered by a, b, c
-        let ctx = get_test_context2(&tmpdir, true, session_config).await?;
-
-        let sql = "SELECT a, b, c,
-        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
-        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
-        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
-        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
-        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
-        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
-        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
-        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
-        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
-        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
-        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
-        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW  AND 1 FOLLOWING) as sum12
-           FROM annotated_data
-           LIMIT 5";
-
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data.c) PARTITION BY [annotated_data.a, annotated_data.d] ORDER BY [annotated_data.b ASC NULLS LAST, annotated_data.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data.c) PARTITION BY [annotated_data.a, annotated_data.d] ORDER BY [annotated_data.b ASC NULLS LAST, annotated_data.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING@9 as sum2, SUM(annotated_data.c) P [...]
-                "  GlobalLimitExec: skip=0, fetch=5",
-                "    BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows [...]
-                "      BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Ro [...]
-                "        BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units:  [...]
-                "          BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units [...]
-                "            BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { uni [...]
-                "              BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { u [...]
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        let actual = execute_to_batches(&ctx, sql).await;
-
-        let expected = vec![
-            "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
-            "| a | b | c | sum1 | sum2 | sum3 | sum4 | sum5 | sum6 | sum7 | sum8 | sum9 | sum10 | sum11 | sum12 |",
-            "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
-            "| 0 | 0 | 0 | 2    | 53   | 2    |      | 1    | 15   | 1    | 15   | 2    | 0     | 2     | 2     |",
-            "| 0 | 0 | 1 | 8    | 61   | 8    |      | 3    | 21   | 3    | 21   | 8    | 1     | 8     | 8     |",
-            "| 0 | 0 | 2 | 5    | 74   | 5    | 0    | 6    | 28   | 6    | 28   | 5    | 2     | 5     | 5     |",
-            "| 0 | 0 | 3 | 11   | 96   | 11   | 2    | 10   | 36   | 10   | 36   | 11   | 5     | 11    | 9     |",
-            "| 0 | 0 | 4 | 9    | 72   | 9    |      | 14   | 45   | 14   | 45   | 9    | 4     | 9     | 9     |",
-            "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_finite_source_partition_by() -> Result<()> {
-        let tmpdir = TempDir::new()?;
-        let session_config = SessionConfig::new().with_target_partitions(1);
-        // Source is ordered by a, b, c
-        // Source is finite.
-        let ctx = get_test_context2(&tmpdir, false, session_config).await?;
-
-        let sql = "SELECT a, b, c,
-        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
-        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
-        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
-        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
-        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
-        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
-        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
-        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
-        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
-        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
-        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
-        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW  AND 1 FOLLOWING) as sum12
-           FROM annotated_data
-           ORDER BY c
-           LIMIT 5";
-
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        // We should produce `BoundedWindowAggExec`s that only works in Sorted mode, since source is finite.
-        // To satisfy, requirements for Sorted mode. We should introduce `SortExec`s through physical plan.
-        let expected = {
-            vec![
-                "GlobalLimitExec: skip=0, fetch=5",
-                "  SortExec: fetch=5, expr=[c@2 ASC NULLS LAST]",
-                "    ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data.c) PARTITION BY [annotated_data.a, annotated_data.d] ORDER BY [annotated_data.b ASC NULLS LAST, annotated_data.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data.c) PARTITION BY [annotated_data.a, annotated_data.d] ORDER BY [annotated_data.b ASC NULLS LAST, annotated_data.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING@9 as sum2, SUM(annotated_data. [...]
-                "      BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Ro [...]
-                "        SortExec: expr=[d@3 ASC NULLS LAST,a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]",
-                "          BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units [...]
-                "            SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]",
-                "              BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { u [...]
-                "                SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,c@2 ASC NULLS LAST]",
-                "                  BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame [...]
-                "                    SortExec: expr=[a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]",
-                "                      BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowF [...]
-                "                        SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]",
-                "                          BoundedWindowAggExec: wdw=[SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data.c): Ok(Field { name: \"SUM(annotated_data.c)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Win [...]
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        let actual = execute_to_batches(&ctx, sql).await;
-
-        let expected = vec![
-            "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
-            "| a | b | c | sum1 | sum2 | sum3 | sum4 | sum5 | sum6 | sum7 | sum8 | sum9 | sum10 | sum11 | sum12 |",
-            "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
-            "| 0 | 0 | 0 | 2    | 53   | 2    |      | 1    | 15   | 1    | 15   | 2    | 0     | 2     | 2     |",
-            "| 0 | 0 | 1 | 8    | 61   | 8    |      | 3    | 21   | 3    | 21   | 8    | 1     | 8     | 8     |",
-            "| 0 | 0 | 2 | 5    | 74   | 5    | 0    | 6    | 28   | 6    | 28   | 5    | 2     | 5     | 5     |",
-            "| 0 | 0 | 3 | 11   | 96   | 11   | 2    | 10   | 36   | 10   | 36   | 11   | 5     | 11    | 9     |",
-            "| 0 | 0 | 4 | 9    | 72   | 9    |      | 14   | 45   | 14   | 45   | 9    | 4     | 9     | 9     |",
-            "+---+---+---+------+------+------+------+------+------+------+------+------+-------+-------+-------+",
-        ];
-        assert_batches_eq!(expected, &actual);
-        Ok(())
-    }
-}
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index 0ecede8806..b565b0dc9a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1921,3 +1921,92 @@ SELECT DISTINCT + col1 FROM tab2 AS cor0 GROUP BY cor0.col1
 41
 59
 61
+
+
+
+# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
+# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
+# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
+statement ok
+CREATE EXTERNAL TABLE annotated_data_infinite2 (
+  a0 INTEGER,
+  a INTEGER,
+  b INTEGER,
+  c INTEGER,
+  d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC, c ASC)
+OPTIONS('infinite_source' 'true')
+LOCATION 'tests/data/window_2.csv';
+
+# test_window_agg_sort
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+# test_source_sorted_groupby
+query TT
+EXPLAIN SELECT a, b,
+ SUM(c) as summation1
+ FROM annotated_data_infinite2
+ GROUP BY b, a
+----
+logical_plan
+Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, SUM(annotated_data_infinite2.c) AS summation1
+  Aggregate: groupBy=[[annotated_data_infinite2.b, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
+    TableScan: annotated_data_infinite2 projection=[a, b, c]
+physical_plan
+ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data_infinite2.c)@2 as summation1]
+  AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+    CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+
+
+query III
+ SELECT a, b,
+ SUM(c) as summation1
+ FROM annotated_data_infinite2
+ GROUP BY b, a
+----
+0 0 300
+0 1 925
+1 2 1550
+1 3 2175
+
+
+# test_source_sorted_groupby2
+
+query TT
+EXPLAIN SELECT a, d,
+ SUM(c) as summation1
+ FROM annotated_data_infinite2
+ GROUP BY d, a
+----
+logical_plan
+Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotated_data_infinite2.c) AS summation1
+  Aggregate: groupBy=[[annotated_data_infinite2.d, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
+    TableScan: annotated_data_infinite2 projection=[a, c, d]
+physical_plan
+ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c)@2 as summation1]
+  AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallyOrdered
+    CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+query III
+SELECT a, d,
+ SUM(c) as summation1
+ FROM annotated_data_infinite2
+ GROUP BY d, a
+----
+0 0 292
+0 2 196
+0 1 315
+0 4 164
+0 3 258
+1 0 622
+1 3 299
+1 1 1043
+1 4 913
+1 2 848
+
+statement ok
+drop table annotated_data_infinite2;
diff --git a/datafusion/core/tests/sqllogictests/test_files/select.slt b/datafusion/core/tests/sqllogictests/test_files/select.slt
index 26db6cc059..440e7d8010 100644
--- a/datafusion/core/tests/sqllogictests/test_files/select.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/select.slt
@@ -686,3 +686,51 @@ select "INT32" from case_sensitive_table;
 3
 4
 5
+
+# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
+# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
+# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
+statement ok
+CREATE EXTERNAL TABLE annotated_data_finite2 (
+  a0 INTEGER,
+  a INTEGER,
+  b INTEGER,
+  c INTEGER,
+  d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC, c ASC)
+LOCATION 'tests/data/window_2.csv';
+
+# test_source_projection
+
+
+#  Final plan shouldn't include SortExec.
+query TT
+EXPLAIN SELECT a FROM annotated_data_finite2
+        ORDER BY a
+        LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: annotated_data_finite2.a ASC NULLS LAST, fetch=5
+    TableScan: annotated_data_finite2 projection=[a]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+query I
+SELECT a FROM annotated_data_finite2
+        ORDER BY a
+        LIMIT 5
+----
+0
+0
+0
+0
+0
+
+
+statement ok
+drop table annotated_data_finite2;
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 2027358f7c..9790871ff2 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2378,3 +2378,625 @@ select row_number() over (rows between null preceding and current row) from (sel
 # invalid window frame. negative as following
 statement error DataFusion error: Error during planning: Invalid window frame: frame offsets must be non negative integers 
 select row_number() over (rows between current row and -1 following) from (select 1 a) x
+
+
+# The following query has type error. We should test the error could be detected
+# from either the logical plan (when `skip_failed_rules` is set to `false`) or
+# the physical plan (when `skip_failed_rules` is set to `true`).
+
+# We should remove the type checking in physical plan after we don't skip
+# the failed optimizing rules by default.
+# (see more in https://github.com/apache/arrow-datafusion/issues/4615)
+
+statement ok
+set datafusion.optimizer.skip_failed_rules = true
+
+# Error is returned from the physical plan.
+query error Cannot cast Utf8\("1 DAY"\) to Int8
+SELECT
+  COUNT(c1) OVER (ORDER BY c2 RANGE BETWEEN '1 DAY' PRECEDING AND '2 DAY' FOLLOWING)
+  FROM aggregate_test_100;
+
+statement ok
+set datafusion.optimizer.skip_failed_rules = true
+
+# Error is returned from the logical plan.
+query error Cannot cast Utf8\("1 DAY"\) to Int8
+SELECT
+  COUNT(c1) OVER (ORDER BY c2 RANGE BETWEEN '1 DAY' PRECEDING AND '2 DAY' FOLLOWING)
+  FROM aggregate_test_100;
+
+
+# 100 rows. Columns in the table are ts, inc_col, desc_col.
+# Source is CsvExec which is ordered by ts column.
+# Normal, non infinite source
+statement ok
+CREATE EXTERNAL TABLE annotated_data_finite (
+  ts INTEGER,
+  inc_col INTEGER,
+  desc_col INTEGER,
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (ts ASC)
+LOCATION 'tests/data/window_1.csv'
+;
+
+# 100 rows. Columns in the table are ts, inc_col, desc_col.
+# Source is CsvExec which is ordered by ts column.
+# Infinite source
+statement ok
+CREATE EXTERNAL TABLE annotated_data_infinite (
+  ts INTEGER,
+  inc_col INTEGER,
+  desc_col INTEGER,
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (ts ASC)
+OPTIONS('infinite_source' 'true')
+LOCATION 'tests/data/window_1.csv';
+
+# test_source_sorted_aggregate
+
+query TT
+EXPLAIN SELECT
+  SUM(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as sum1,
+  SUM(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as sum2,
+  SUM(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as sum3,
+  MIN(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as min1,
+  MIN(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as min2,
+  MIN(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as min3,
+  MAX(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as max1,
+  MAX(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as max2,
+  MAX(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as max3,
+  COUNT(*) OVER(ORDER BY ts RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING) as cnt1,
+  COUNT(*) OVER(ORDER BY ts ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt2,
+  SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING) as sumr1,
+  SUM(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING) as sumr2,
+  SUM(desc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sumr3,
+  MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as minr1,
+  MIN(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as minr2,
+  MIN(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as minr3,
+  MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as maxr1,
+  MAX(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as maxr2,
+  MAX(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as maxr3,
+  COUNT(*) OVER(ORDER BY ts DESC RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING) as cntr1,
+  COUNT(*) OVER(ORDER BY ts DESC ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cntr2,
+  SUM(desc_col) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as sum4,
+  COUNT(*) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt3
+  FROM annotated_data_finite
+  ORDER BY inc_col DESC
+  LIMIT 5;
+----
+logical_plan
+Projection: sum1, sum2, sum3, min1, min2, min3, max1, max2, max3, cnt1, cnt2, sumr1, sumr2, sumr3, minr1, minr2, minr3, maxr1, maxr2, maxr3, cntr1, cntr2, sum4, cnt3
+  Limit: skip=0, fetch=5
+    Sort: annotated_data_finite.inc_col DESC NULLS FIRST, fetch=5
+      Projection: SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING AS sum2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING AS sum3, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_f [...]
+        WindowAggr: windowExpr=[[SUM(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING]]
+          Projection: annotated_data_finite.inc_col, annotated_data_finite.desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated [...]
+  WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NU [...]
+    WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts [...]
+      TableScan: annotated_data_finite projection=[ts, inc_col, desc_col]
+physical_plan
+ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
+  GlobalLimitExec: skip=0, fetch=5
+    SortExec: fetch=5, expr=[inc_col@24 DESC]
+      ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as sum1, SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING@14 as sum2, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING@15 as sum3, MIN(annotated_data_finite.inc_col) ORDER B [...]
+        BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, [...]
+          ProjectionExec: expr=[inc_col@1 as inc_col, desc_col@2 as desc_col, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING@3 as SUM(annotated_data_finite.inc_col), SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING@4 as SUM(annotated_data_finite.desc_col), SUM(annotated_data_finite.desc_col) ORDER BY [annotated_data_finite.ts [...]
+  BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), f [...]
+    BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(4)), end_bound: Following(Int32(1)) }, SUM(annotated_data_finite.desc_col): Ok(Field { name: "SUM(annotated_data_finite.desc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),  [...]
+      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+query IIIIIIIIIIIIIIIIIIIIIIII
+SELECT
+  SUM(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as sum1,
+  SUM(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as sum2,
+  SUM(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as sum3,
+  MIN(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as min1,
+  MIN(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as min2,
+  MIN(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as min3,
+  MAX(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as max1,
+  MAX(desc_col) OVER(ORDER BY ts RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as max2,
+  MAX(inc_col) OVER(ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as max3,
+  COUNT(*) OVER(ORDER BY ts RANGE BETWEEN 4 PRECEDING AND 8 FOLLOWING) as cnt1,
+  COUNT(*) OVER(ORDER BY ts ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt2,
+  SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 4 FOLLOWING) as sumr1,
+  SUM(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING AND 8 FOLLOWING) as sumr2,
+  SUM(desc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sumr3,
+  MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as minr1,
+  MIN(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as minr2,
+  MIN(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as minr3,
+  MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING) as maxr1,
+  MAX(desc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 5 PRECEDING AND 1 FOLLOWING) as maxr2,
+  MAX(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING) as maxr3,
+  COUNT(*) OVER(ORDER BY ts DESC RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING) as cntr1,
+  COUNT(*) OVER(ORDER BY ts DESC ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cntr2,
+  SUM(desc_col) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as sum4,
+  COUNT(*) OVER(ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING) as cnt3
+  FROM annotated_data_finite
+  ORDER BY inc_col DESC
+  LIMIT 5;
+----
+1482 -631 606 289 -213 301 305 -208 305 3 9 902 -834 -1231 301 -213 269 305 -210 305 3 2 -1797 9
+1482 -631 902 289 -213 296 305 -208 305 3 10 902 -834 -1424 301 -213 266 305 -210 305 3 3 -1978 10
+876 -411 1193 289 -208 291 296 -203 305 4 10 587 -612 -1400 296 -213 261 305 -208 301 3 4 -1941 10
+866 -404 1482 286 -203 289 291 -201 305 5 10 580 -600 -1374 291 -208 259 305 -203 296 4 5 -1903 10
+1411 -397 1768 275 -201 286 289 -196 305 4 10 575 -590 -1347 289 -203 254 305 -201 291 2 6 -1863 10
+
+
+
+# test_source_sorted_builtin
+query TT
+EXPLAIN SELECT
+  FIRST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv1,
+  FIRST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv2,
+  LAST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv1,
+  LAST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv2,
+  NTH_VALUE(inc_col, 5) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv1,
+  NTH_VALUE(inc_col, 5) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv2,
+  ROW_NUMBER() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rn1,
+  ROW_NUMBER() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rn2,
+  RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rank1,
+  RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rank2,
+  DENSE_RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS dense_rank1,
+  DENSE_RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as dense_rank2,
+  LAG(inc_col, 1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lag1,
+  LAG(inc_col, 2, 1002) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lag2,
+  LEAD(inc_col, -1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lead1,
+  LEAD(inc_col, 4, 1004) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lead2,
+  FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr1,
+  FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr2,
+  LAST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr1,
+  LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr2,
+  LAG(inc_col, 1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lagr1,
+  LAG(inc_col, 2, 1002) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lagr2,
+  LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS leadr1,
+  LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as leadr2
+  FROM annotated_data_finite
+  ORDER BY ts DESC
+  LIMIT 5;
+----
+logical_plan
+Projection: fv1, fv2, lv1, lv2, nv1, nv2, rn1, rn2, rank1, rank2, dense_rank1, dense_rank2, lag1, lag2, lead1, lead2, fvr1, fvr2, lvr1, lvr2, lagr1, lagr2, leadr1, leadr2
+  Limit: skip=0, fetch=5
+    Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5
+      Projection: FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, LAST_VALUE(annotated_data_finite.inc_col) [...]
+        WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER [...]
+          WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_co [...]
+  TableScan: annotated_data_finite projection=[ts, inc_col]
+physical_plan
+ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2, rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1, dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1, lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1, leadr2@23 as leadr2]
+  GlobalLimitExec: skip=0, fetch=5
+    SortExec: fetch=5, expr=[ts@24 DESC]
+      ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, LAST_VALUE(annotated_d [...]
+        BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_i [...]
+          BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict [...]
+  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+query IIIIIIIIIIIIIIIIIIIIIIII
+SELECT
+  FIRST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv1,
+  FIRST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv2,
+  LAST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv1,
+  LAST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv2,
+  NTH_VALUE(inc_col, 5) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv1,
+  NTH_VALUE(inc_col, 5) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as nv2,
+  ROW_NUMBER() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rn1,
+  ROW_NUMBER() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rn2,
+  RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS rank1,
+  RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as rank2,
+  DENSE_RANK() OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS dense_rank1,
+  DENSE_RANK() OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as dense_rank2,
+  LAG(inc_col, 1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lag1,
+  LAG(inc_col, 2, 1002) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lag2,
+  LEAD(inc_col, -1, 1001) OVER(ORDER BY ts RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lead1,
+  LEAD(inc_col, 4, 1004) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lead2,
+  FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr1,
+  FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fvr2,
+  LAST_VALUE(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr1,
+  LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lvr2,
+  LAG(inc_col, 1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS lagr1,
+  LAG(inc_col, 2, 1002) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lagr2,
+  LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS leadr1,
+  LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as leadr2
+  FROM annotated_data_finite
+  ORDER BY ts DESC
+  LIMIT 5;
+----
+289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289
+289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286
+289 261 296 301 NULL 275 98 98 98 98 85 85 291 289 291 1004 305 305 296 291 301 305 301 283
+286 259 291 296 NULL 272 97 97 97 97 84 84 289 286 289 1004 305 305 291 289 296 301 296 278
+275 254 289 291 289 269 96 96 96 96 83 83 286 283 286 305 305 305 289 286 291 296 291 275
+
+
+# test_source_sorted_unbounded_preceding
+
+query TT
+EXPLAIN SELECT
+  SUM(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as sum1,
+  SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+  MIN(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as min1,
+  MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as min2,
+  MAX(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as max1,
+  MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as max2,
+  COUNT(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as count1,
+  COUNT(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2,
+  AVG(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as avg1,
+  AVG(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as avg2
+  FROM annotated_data_finite
+  ORDER BY inc_col ASC
+  LIMIT 5
+----
+logical_plan
+Projection: sum1, sum2, min1, min2, max1, max2, count1, count2, avg1, avg2
+  Limit: skip=0, fetch=5
+    Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
+      Projection: SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS sum1, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) OR [...]
+        WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, COUNT(annotated_data_finite.inc_col) ORDER BY [a [...]
+          WindowAggr: windowExpr=[[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_finite.inc_col) ORD [...]
+  TableScan: annotated_data_finite projection=[ts, inc_col]
+physical_plan
+ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2]
+  GlobalLimitExec: skip=0, fetch=5
+    SortExec: fetch=5, expr=[inc_col@10 ASC NULLS LAST]
+      ProjectionExec: expr=[SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@7 as sum1, SUM(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@8 as min1, MIN(annotated_data_fi [...]
+        BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)) }, MIN(annotated_data_finite.inc_col): Ok(Field { name: "MIN(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} [...]
+          BoundedWindowAggExec: wdw=[SUM(annotated_data_finite.inc_col): Ok(Field { name: "SUM(annotated_data_finite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)) }, MIN(annotated_data_finite.inc_col): Ok(Field { name: "MIN(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:  [...]
+  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+query IIIIIIIIRR
+SELECT
+  SUM(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as sum1,
+  SUM(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+  MIN(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as min1,
+  MIN(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as min2,
+  MAX(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as max1,
+  MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as max2,
+  COUNT(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as count1,
+  COUNT(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2,
+  AVG(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as avg1,
+  AVG(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as avg2
+  FROM annotated_data_finite
+  ORDER BY inc_col ASC
+  LIMIT 5
+----
+16 6 1 1 10 5 3 2 5.333333333333 3
+16 6 1 1 10 5 3 2 5.333333333333 3
+51 16 1 1 20 10 5 3 10.2 5.333333333333
+72 72 1 1 21 21 6 6 12 12
+72 72 1 1 21 21 6 6 12 12
+
+
+# test_source_sorted_unbounded_preceding_builtin
+
+query TT
+EXPLAIN SELECT
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as first_value1,
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as first_value2,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as last_value1,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as last_value2,
+ NTH_VALUE(inc_col, 2) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as nth_value1
+ FROM annotated_data_finite
+ ORDER BY inc_col ASC
+ LIMIT 5
+----
+logical_plan
+Projection: first_value1, first_value2, last_value1, last_value2, nth_value1
+  Limit: skip=0, fetch=5
+    Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5
+      Projection: FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS first_value1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS first_value2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS last_va [...]
+        WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(2)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+          WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+  TableScan: annotated_data_finite projection=[ts, inc_col]
+physical_plan
+ProjectionExec: expr=[first_value1@0 as first_value1, first_value2@1 as first_value2, last_value1@2 as last_value1, last_value2@3 as last_value2, nth_value1@4 as nth_value1]
+  GlobalLimitExec: skip=0, fetch=5
+    SortExec: fetch=5, expr=[inc_col@5 ASC NULLS LAST]
+      ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as first_value1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as first_value2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOW [...]
+        BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, LAST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_ [...]
+          BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, LAST_VALUE(annotated_data_finite.inc_col): Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col)", data_type: Int32, nullable: true, dict_id: 0, dic [...]
+  CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+query IIIII
+SELECT
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as first_value1,
+ FIRST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as first_value2,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as last_value1,
+ LAST_VALUE(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as last_value2,
+ NTH_VALUE(inc_col, 2) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as nth_value1
+ FROM annotated_data_finite
+ ORDER BY inc_col ASC
+ LIMIT 5
+----
+1 15 5 1 5
+1 20 10 1 5
+1 21 15 1 5
+1 26 20 1 5
+1 29 21 1 5
+
+
+# test_source_sorted_unbounded_source
+query TT
+EXPLAIN SELECT
+ SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+ COUNT(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as count1,
+ COUNT(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2
+ FROM annotated_data_infinite
+ ORDER BY ts ASC
+ LIMIT 5
+----
+logical_plan
+Projection: sum1, sum2, count1, count2
+  Limit: skip=0, fetch=5
+    Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
+      Projection: SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
+        WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+          WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+  TableScan: annotated_data_infinite projection=[ts, inc_col]
+physical_plan
+ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]
+  GlobalLimitExec: skip=0, fetch=5
+    ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
+      BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, m [...]
+        BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, [...]
+          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+
+query IIII
+SELECT
+ SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+ COUNT(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as count1,
+ COUNT(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2
+ FROM annotated_data_infinite
+ ORDER BY ts ASC
+ LIMIT 5
+----
+6 31 2 4
+16 51 3 5
+31 72 4 6
+51 98 5 7
+72 127 6 8
+
+
+# test_source_sorted_unbounded_source
+
+query TT
+EXPLAIN SELECT
+ SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+ COUNT(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as count1,
+ COUNT(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2
+ FROM annotated_data_infinite
+ ORDER BY ts ASC
+ LIMIT 5
+----
+logical_plan
+Projection: sum1, sum2, count1, count2
+  Limit: skip=0, fetch=5
+    Sort: annotated_data_infinite.ts ASC NULLS LAST, fetch=5
+      Projection: SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING AS count1, COUNT(annotated_data_inf [...]
+        WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING]]
+          WindowAggr: windowExpr=[[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]]
+  TableScan: annotated_data_infinite projection=[ts, inc_col]
+physical_plan
+ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2@3 as count2]
+  GlobalLimitExec: skip=0, fetch=5
+    ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
+      BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, m [...]
+        BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col): Ok(Field { name: "SUM(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col): Ok(Field { name: "COUNT(annotated_data_infinite.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, [...]
+          CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+
+
+query IIII
+SELECT
+ SUM(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as sum1,
+ SUM(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as sum2,
+ COUNT(inc_col) OVER(ORDER BY ts ASC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) as count1,
+ COUNT(inc_col) OVER(ORDER BY ts DESC ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2
+ FROM annotated_data_infinite
+ ORDER BY ts ASC
+ LIMIT 5
+----
+6 31 2 4
+16 51 3 5
+31 72 4 6
+51 98 5 7
+72 127 6 8
+
+
+
+
+statement ok
+drop table annotated_data_finite
+
+
+statement ok
+drop table annotated_data_infinite
+
+
+
+# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
+# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
+# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
+statement ok
+CREATE EXTERNAL TABLE annotated_data_finite2 (
+  a0 INTEGER,
+  a INTEGER,
+  b INTEGER,
+  c INTEGER,
+  d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC, c ASC)
+LOCATION 'tests/data/window_2.csv';
+
+# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
+# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
+# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
+statement ok
+CREATE EXTERNAL TABLE annotated_data_infinite2 (
+  a0 INTEGER,
+  a INTEGER,
+  b INTEGER,
+  c INTEGER,
+  d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC, c ASC)
+OPTIONS('infinite_source' 'true')
+LOCATION 'tests/data/window_2.csv';
+
+
+# test_infinite_source_partition_by
+
+query TT
+EXPLAIN SELECT a, b, c,
+        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
+        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
+        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
+        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
+        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
+        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
+        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
+        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
+        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
+        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
+        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
+        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW  AND 1 FOLLOWING) as sum12
+ FROM annotated_data_infinite2
+ LIMIT 5
+----
+logical_plan
+Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS [...]
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS L [...]
+      WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+          WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLL [...]
+  WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
+    WindowAggr: windowExpr=[[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+      TableScan: annotated_data_infinite2 projection=[a, b, c, d]
+physical_plan
+ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.c ASC NULL [...]
+  GlobalLimitExec: skip=0, fetch=5
+    BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Window [...]
+      BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Wind [...]
+        BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Wi [...]
+          BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:  [...]
+  BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFr [...]
+    BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c): Ok(Field { name: "SUM(annotated_data_infinite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: Window [...]
+      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+
+
+query IIIIIIIIIIIIIII
+SELECT a, b, c,
+        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
+        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
+        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
+        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
+        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
+        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
+        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
+        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
+        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
+        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
+        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
+        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW  AND 1 FOLLOWING) as sum12
+ FROM annotated_data_infinite2
+ LIMIT 5
+----
+0 0 0 2 53 2 NULL 1 15 1 15 2 0 2 2
+0 0 1 8 61 8 NULL 3 21 3 21 8 1 8 8
+0 0 2 5 74 5 0 6 28 6 28 5 2 5 5
+0 0 3 11 96 11 2 10 36 10 36 11 5 11 9
+0 0 4 9 72 9 NULL 14 45 14 45 9 4 9 9
+
+
+
+# test_finite_source_partition_by
+
+query TT
+EXPLAIN SELECT a, b, c,
+        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
+        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
+        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
+        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
+        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
+        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
+        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
+        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
+        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
+        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
+        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
+        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW  AND 1 FOLLOWING) as sum12
+ FROM annotated_data_finite2
+ ORDER BY c
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: annotated_data_finite2.c ASC NULLS LAST, fetch=5
+    Projection: annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING AS sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_dat [...]
+      WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.d] ORDER BY [annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN  [...]
+        WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING]]
+          WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.b, annotated_data_finite2.a] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+  WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING]]
+    WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW]]
+      WindowAggr: windowExpr=[[SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING]]
+        TableScan: annotated_data_finite2 projection=[a, b, c, d]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c@2 ASC NULLS LAST]
+    ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@8 as sum1, SUM(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] ROWS BET [...]
+      BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame  [...]
+        SortExec: expr=[d@3 ASC NULLS LAST,a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]
+          BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFr [...]
+  SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
+    BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {  [...]
+      SortExec: expr=[b@1 ASC NULLS LAST,a@0 ASC NULLS LAST,c@2 ASC NULLS LAST]
+        BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFram [...]
+          SortExec: expr=[a@0 ASC NULLS LAST,d@3 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST]
+  BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { un [...]
+    SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@3 ASC NULLS LAST,c@2 ASC NULLS LAST]
+      BoundedWindowAggExec: wdw=[SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_finite2.c): Ok(Field { name: "SUM(annotated_data_finite2.c)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame  [...]
+        CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+
+
+query IIIIIIIIIIIIIII
+SELECT a, b, c,
+        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum1,
+        SUM(c) OVER(PARTITION BY a, d ORDER BY b, c ASC ROWS BETWEEN 1 FOLLOWING AND 5 FOLLOWING) as sum2,
+        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum3,
+        SUM(c) OVER(PARTITION BY d ORDER BY a, b, c ASC ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING) as sum4,
+        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum5,
+        SUM(c) OVER(PARTITION BY a, b ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum6,
+        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum7,
+        SUM(c) OVER(PARTITION BY b, a ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) as sum8,
+        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum9,
+        SUM(c) OVER(PARTITION BY a, b, d ORDER BY c ASC ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as sum10,
+        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as sum11,
+        SUM(c) OVER(PARTITION BY b, a, d ORDER BY c ASC ROWS BETWEEN CURRENT ROW  AND 1 FOLLOWING) as sum12
+ FROM annotated_data_finite2
+ ORDER BY c
+ LIMIT 5
+----
+0 0 0 2 53 2 NULL 1 15 1 15 2 0 2 2
+0 0 1 8 61 8 NULL 3 21 3 21 8 1 8 8
+0 0 2 5 74 5 0 6 28 6 28 5 2 5 5
+0 0 3 11 96 11 2 10 36 10 36 11 5 11 9
+0 0 4 9 72 9 NULL 14 45 14 45 9 4 9 9
+
+
+statement ok
+drop table annotated_data_finite2
+
+statement ok
+drop table annotated_data_infinite2