You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/02/28 22:42:20 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5434: Port more window tests to sqlogictests

alamb commented on code in PR #5434:
URL: https://github.com/apache/arrow-datafusion/pull/5434#discussion_r1120885388


##########
datafusion/core/tests/sql/window.rs:
##########
@@ -61,912 +61,6 @@ async fn window_frame_creation_type_checking() -> Result<()> {
     ).await
 }
 
-#[tokio::test]
-async fn test_window_agg_sort() -> Result<()> {
-    // We need to specify the target partition number.
-    // Otherwise, the default value used may vary on different environment
-    // with different cpu core number, which may cause the UT failure.
-    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-      c9,
-      SUM(c9) OVER(ORDER BY c9) as sum1,
-      SUM(c9) OVER(ORDER BY c9, c8) as sum2
-      FROM aggregate_test_100";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum2]",
-            "  BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      SortExec: expr=[c9@8 ASC NULLS LAST,c8@7 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
-    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-
-    let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@15 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as MIN(aggregate_test_100.c9)]",
-            "  WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "    BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-/// FIXME: for now we are not detecting prefix of sorting keys in order to re-arrange with global and save one SortExec
-#[tokio::test]
-async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()> {
-    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-
-    let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c9, c2), SUM(c9) OVER (), MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100 ORDER BY c2";
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // 3 SortExec are added
-    let expected = {
-        vec![
-            "SortExec: expr=[c2@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c2@1 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@15 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as MIN(aggregate_test_100.c9)]",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "      BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c9@8 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "            SortExec: expr=[c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_partition_by_order_by() -> Result<()> {
-    let ctx = SessionContext::with_config(
-        SessionConfig::new()
-            .with_target_partitions(2)
-            .with_batch_size(4096),
-    );
-    register_aggregate_csv(&ctx).await?;
-
-    let sql = "SELECT \
-               SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\
-               COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) \
-               FROM aggregate_test_100";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@13 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@14 as COUNT(UInt8(1))]",
-            "  BoundedWindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
-            "    SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
-            "            SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "              CoalesceBatchesExec: target_batch_size=4096",
-            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2), input_partitions=2",
-            "                  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual_trim_last:#?}\n\n"
-    );
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_reversed_plan() -> Result<()> {
-    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
-    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "        SortExec: expr=[c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+-------------+",
-        "| c9         | sum1        | sum2        |",
-        "+------------+-------------+-------------+",
-        "| 4268716378 | 8498370520  | 24997484146 |",
-        "| 4229654142 | 12714811027 | 29012926487 |",
-        "| 4216440507 | 16858984380 | 28743001064 |",
-        "| 4144173353 | 20935849039 | 28472563256 |",
-        "| 4076864659 | 24997484146 | 28118515915 |",
-        "+------------+-------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_reversed_plan_builtin() -> Result<()> {
-    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    FIRST_VALUE(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as fv1,
-    FIRST_VALUE(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as fv2,
-    LAG(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lag1,
-    LAG(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lag2,
-    LEAD(c9, 2, 10101) OVER(ORDER BY c9 ASC) as lead1,
-    LEAD(c9, 2, 10101) OVER(ORDER BY c9 DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as lead2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@16 as fv1, FIRST_VALUE(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as fv2, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@17 as lag1, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@14 as lag2, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@18 as lead1, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@15 as lead2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt32(NULL)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(UInt32(NULL)) }]",
-            "      BoundedWindowAggExec: wdw=[FIRST_VALUE(aggregate_test_100.c9): Ok(Field { name: \"FIRST_VALUE(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }, LAG(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LAG(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LEAD(aggregate_test_100.c9,Int64(2),Int64(10101)): Ok(Field { name: \"LEAD(aggregate_test_100.c9,Int64(2),Int64(10101))\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }]",
-            "        SortExec: expr=[c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+------------+------------+------------+------------+------------+------------+",
-        "| c9         | fv1        | fv2        | lag1       | lag2       | lead1      | lead2      |",
-        "+------------+------------+------------+------------+------------+------------+------------+",
-        "| 4268716378 | 4229654142 | 4268716378 | 4216440507 | 10101      | 10101      | 4216440507 |",
-        "| 4229654142 | 4216440507 | 4268716378 | 4144173353 | 10101      | 10101      | 4144173353 |",
-        "| 4216440507 | 4144173353 | 4229654142 | 4076864659 | 4268716378 | 4268716378 | 4076864659 |",
-        "| 4144173353 | 4076864659 | 4216440507 | 4061635107 | 4229654142 | 4229654142 | 4061635107 |",
-        "| 4076864659 | 4061635107 | 4144173353 | 4015442341 | 4216440507 | 4216440507 | 4015442341 |",
-        "+------------+------------+------------+------------+------------+------------+------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_non_reversed_plan() -> Result<()> {
-    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    ROW_NUMBER() OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as rn1,
-    ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as rn2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // We cannot reverse each window function (ROW_NUMBER is not reversible)
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as rn1, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as rn2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortExec: expr=[c9@8 ASC NULLS LAST]",
-            "        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "          SortExec: expr=[c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----------+-----+-----+",
-        "| c9        | rn1 | rn2 |",
-        "+-----------+-----+-----+",
-        "| 28774375  | 1   | 100 |",
-        "| 63044568  | 2   | 99  |",
-        "| 141047417 | 3   | 98  |",
-        "| 141680161 | 4   | 97  |",
-        "| 145294611 | 5   | 96  |",
-        "+-----------+-----+-----+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_multi_layer_non_reversed_plan() -> Result<()> {
-    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(ORDER BY c9 ASC, c1 ASC, c2 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
-    SUM(c9) OVER(ORDER BY c9 DESC, c1 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2,
-    ROW_NUMBER() OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as rn2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // We cannot reverse each window function (ROW_NUMBER is not reversible)
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@15 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2, ROW_NUMBER() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as rn2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortExec: expr=[c9@8 ASC NULLS LAST,c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "        BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "            SortExec: expr=[c9@8 DESC,c1@0 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----------+------------+-----------+-----+",
-        "| c9        | sum1       | sum2      | rn2 |",
-        "+-----------+------------+-----------+-----+",
-        "| 28774375  | 745354217  | 91818943  | 100 |",
-        "| 63044568  | 988558066  | 232866360 | 99  |",
-        "| 141047417 | 1285934966 | 374546521 | 98  |",
-        "| 141680161 | 1654839259 | 519841132 | 97  |",
-        "| 145294611 | 1980231675 | 745354217 | 96  |",
-        "+-----------+------------+-----------+-----+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_complex_plan() -> Result<()> {
-    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
-    register_aggregate_null_cases_csv(&ctx).await?;
-    let sql = "SELECT
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as a,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as b,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as c,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as d,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as e,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as f,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as g,
-    SUM(c1) OVER (ORDER BY c3) as h,
-    SUM(c1) OVER (ORDER BY c3 DESC) as i,
-    SUM(c1) OVER (ORDER BY c3 NULLS first) as j,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first) as k,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last) as l,
-    SUM(c1) OVER (ORDER BY c3, c2) as m,
-    SUM(c1) OVER (ORDER BY c3, c1 DESC) as n,
-    SUM(c1) OVER (ORDER BY c3 DESC, c1) as o,
-    SUM(c1) OVER (ORDER BY c3, c1 NULLs first) as p,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as b1,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as c1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as d1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as e1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as f1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as g1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as h1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as j1,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as k1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as l1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as m1,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as n1,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as o1,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as h11,
-    SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as j11,
-    SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as k11,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as l11,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as m11,
-    SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as n11,
-    SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as o11
-    FROM null_cases
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Unnecessary SortExecs are removed
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as a, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@19 as b, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as c, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12 as d, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@8 as e, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@4 as f, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING@12 as g, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@20 as h, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FI
 RST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as i, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as j, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as k, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@9 as l, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@18 as m, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@16 as n, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST, null_cases.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as o, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST, null_cases.c1 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CUR
 RENT ROW@17 as p, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@21 as a1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@21 as b1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@6 as c1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@14 as d1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@10 as e1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@6 as f1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING@14 as g1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@20 as h1, SUM(null_cases.
 c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@20 as j1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as k1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as l1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@9 as m1, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as n1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as o1, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@22 as h11, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@22 as j11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS 
 FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@7 as k11, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@15 as l11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS LAST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@11 as m11, SUM(null_cases.c1) ORDER BY [null_cases.c3 DESC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@7 as n11, SUM(null_cases.c1) ORDER BY [null_cases.c3 ASC NULLS FIRST] RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING@15 as o11]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow,
  end_bound: Following(Int64(NULL)) }]",
-            "      BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c3@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "            SortExec: expr=[c3@2 ASC NULLS LAST,c1@0 ASC]",
-            "              WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }]",
-            "                WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(10)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(11)), end_bound: Following(Int64(NULL)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
  Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "                  WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_boun
 d: CurrentRow, end_bound: Following(Int64(NULL)) }]",
-            "                    WindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(10)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: Following(Int64(11)) }, SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bo
 und: CurrentRow, end_bound: Following(Int64(NULL)) }]",
-            "                      BoundedWindowAggExec: wdw=[SUM(null_cases.c1): Ok(Field { name: \"SUM(null_cases.c1)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }]",
-            "                        SortExec: expr=[c3@2 DESC,c1@0 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_partitionby_plan() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(ORDER BY c1, c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
-    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "        SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+-------------+",
-        "| c9         | sum1        | sum2        |",
-        "+------------+-------------+-------------+",
-        "| 4015442341 | 21907044499 | 21907044499 |",
-        "| 3998790955 | 24576419362 | 24576419362 |",
-        "| 3959216334 | 23063303501 | 23063303501 |",
-        "| 3717551163 | 21560567246 | 21560567246 |",
-        "| 3276123488 | 19815386638 | 19815386638 |",
-        "+------------+-------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_partitionby_reversed_plan() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-    c9,
-    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
-    SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(1)) }]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "        SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 DESC]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+-------------+",
-        "| c9         | sum1        | sum2        |",
-        "+------------+-------------+-------------+",
-        "| 4015442341 | 8014233296  | 21907044499 |",
-        "| 3998790955 | 11973449630 | 24576419362 |",
-        "| 3959216334 | 15691000793 | 23063303501 |",
-        "| 3717551163 | 18967124281 | 21560567246 |",
-        "| 3276123488 | 21907044499 | 19815386638 |",
-        "+------------+-------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c3,
-    SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
-    SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    WindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(Int16(NULL)) }]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: \"SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int16(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[CAST(c3@2 AS Int16) + c4@3 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----+-------------+--------------+",
-        "| c3  | sum1        | sum2         |",
-        "+-----+-------------+--------------+",
-        "| -86 | 2861911482  | 222089770060 |",
-        "| 13  | 5075947208  | 219227858578 |",
-        "| 125 | 8701233618  | 217013822852 |",
-        "| 123 | 11293564174 | 213388536442 |",
-        "| 97  | 14767488750 | 210796205886 |",
-        "+-----+-------------+--------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_target_partitions(8)
-        .with_batch_size(4096)
-        .with_repartition_windows(true);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT count(*) as global_count FROM
-                 (SELECT count(*), c1
-                  FROM aggregate_test_100
-                  WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434'
-                  GROUP BY c1
-                  ORDER BY c1 ) AS a ";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Unnecessary Sort in the sub query is removed
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]",
-            "  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
-            "    CoalescePartitionsExec",
-            "      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]",
-            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=8",
-            "          ProjectionExec: expr=[c1@0 as c1]",
-            "            AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
-            "              CoalesceBatchesExec: target_batch_size=4096",
-            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
-            "                  AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
-            "                    ProjectionExec: expr=[c1@0 as c1]",
-            "                      CoalesceBatchesExec: target_batch_size=4096",
-            "                        FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
-            "                          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+--------------+",
-        "| global_count |",
-        "+--------------+",
-        "| 5            |",
-        "+--------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_sort_orderby_reversed_partitionby_reversed_plan() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(2);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c3,
-    SUM(c9) OVER(ORDER BY c3 DESC, c9 DESC, c2 ASC) as sum1,
-    SUM(c9) OVER(PARTITION BY c3 ORDER BY c9 DESC ) as sum2
-    FROM aggregate_test_100
-    LIMIT 5";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[c3@2 as c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c3] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@14 as sum2]",
-            "  GlobalLimitExec: skip=0, fetch=5",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
-            "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int8(NULL)), end_bound: CurrentRow }]",
-            "        SortExec: expr=[c3@2 DESC,c9@8 DESC,c2@1 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+-----+-------------+------------+",
-        "| c3  | sum1        | sum2       |",
-        "+-----+-------------+------------+",
-        "| 125 | 3625286410  | 3625286410 |",
-        "| 123 | 7192027599  | 3566741189 |",
-        "| 123 | 9784358155  | 6159071745 |",
-        "| 122 | 13845993262 | 4061635107 |",
-        "| 120 | 16676974334 | 2830981072 |",
-        "+-----+-------------+------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(true)
-        .with_target_partitions(2)
-        .with_repartition_sorts(true);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@13 as rn1]",
-            "    BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "      SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "        CoalesceBatchesExec: target_batch_size=8192",
-            "          RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
-            "            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort_parallelize_sort_disabled() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(true)
-        .with_target_partitions(2)
-        .with_repartition_sorts(false);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c1, ROW_NUMBER() OVER (PARTITION BY c1) as rn1 FROM aggregate_test_100 ORDER BY c1 ASC";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "  CoalescePartitionsExec",
-            "    ProjectionExec: expr=[c1@0 as c1, ROW_NUMBER() PARTITION BY [aggregate_test_100.c1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@13 as rn1]",
-            "      BoundedWindowAggExec: wdw=[ROW_NUMBER(): Ok(Field { name: \"ROW_NUMBER()\", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
-            "        SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "          CoalesceBatchesExec: target_batch_size=8192",
-            "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
-            "              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_global_sort_intermediate_parallel_sort() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(true)
-        .with_target_partitions(2)
-        .with_repartition_sorts(true);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT c1, \
-    SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
-    SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
-    FROM aggregate_test_100 ORDER BY c1 ASC";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "SortExec: expr=[c1@0 ASC NULLS LAST]",
-            "  ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]",
-            "    BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
-            "      SortPreservingMergeExec: [c9@8 ASC NULLS LAST]",
-            "        SortExec: expr=[c9@8 ASC NULLS LAST]",
-            "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
-            "            SortExec: expr=[c1@0 ASC NULLS LAST,c9@8 ASC NULLS LAST]",
-            "              CoalesceBatchesExec: target_batch_size=8192",
-            "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
-            "                  RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_with_global_limit() -> Result<()> {
-    let config = SessionConfig::new()
-        .with_repartition_windows(false)
-        .with_target_partitions(1);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT 1)";
-
-    let msg = format!("Creating logical plan for '{sql}'");
-    let dataframe = ctx.sql(sql).await.expect(&msg);
-    let physical_plan = dataframe.create_physical_plan().await?;
-    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-    // Only 1 SortExec was added
-    let expected = {
-        vec![
-            "ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as array_agg1]",
-            "  AggregateExec: mode=Final, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]",
-            "    AggregateExec: mode=Partial, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]",
-            "      GlobalLimitExec: skip=0, fetch=1",
-            "        SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]",
-        ]
-    };
-
-    let actual: Vec<&str> = formatted.trim().lines().collect();
-    let actual_len = actual.len();
-    let actual_trim_last = &actual[..actual_len - 1];
-    assert_eq!(
-        expected, actual_trim_last,
-        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-    );
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+----------------------------------+",
-        "| array_agg1                       |",
-        "+----------------------------------+",
-        "| [0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm] |",
-        "+----------------------------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
-#[tokio::test]
-async fn test_window_agg_low_cardinality() -> Result<()> {
-    let config = SessionConfig::new().with_target_partitions(32);
-    let ctx = SessionContext::with_config(config);
-    register_aggregate_csv(&ctx).await?;
-    let sql = "SELECT
-        SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as summation1,
-        SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation2
-    FROM aggregate_test_100
-    ORDER BY c9
-    LIMIT 5";
-
-    let actual = execute_to_batches(&ctx, sql).await;
-    let expected = vec![
-        "+------------+-------------+",
-        "| summation1 | summation2  |",
-        "+------------+-------------+",
-        "| -16110     | 61035129    |",
-        "| 3917       | -108973366  |",
-        "| -16974     | 623103518   |",
-        "| -1114      | -1927628110 |",
-        "| 15673      | -1899175111 |",
-        "+------------+-------------+",
-    ];
-    assert_batches_eq!(expected, &actual);
-
-    Ok(())
-}
-
 fn write_test_data_to_parquet(tmpdir: &TempDir, n_file: usize) -> Result<()> {

Review Comment:
   this file has only a few hundred lines after this one 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org