You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/11 20:59:38 UTC

[arrow-datafusion] branch master updated: Window frame GROUPS mode support (#4155)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 129654cd8 Window frame GROUPS mode support (#4155)
129654cd8 is described below

commit 129654cd8466e7634bf186782c3d271541f0671e
Author: zembunia <ze...@gmail.com>
AuthorDate: Fri Nov 11 23:59:30 2022 +0300

    Window frame GROUPS mode support (#4155)
    
    * Implementation of GROUPS mode in window frame
    
    * Break down/flatten some functions, move comments inline
    
    * Change find_next_group_and_start_index to use an exponentially growing search algorithm
    
    * Removed the TODO after verification of correct implementation
    
    * Window frame state capturing the state of the window frame calculations
    
    * Do not use unnecessary traits and structs for window frame states
    
    * Refactor to avoid carrying the window frame object around
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 datafusion/common/src/bisect.rs                    |   29 +-
 datafusion/core/src/physical_plan/planner.rs       |    8 +-
 datafusion/core/tests/sql/window.rs                |  167 +++-
 datafusion/physical-expr/src/window/aggregate.rs   |    6 +-
 datafusion/physical-expr/src/window/built_in.rs    |    5 +-
 datafusion/physical-expr/src/window/mod.rs         |    1 +
 datafusion/physical-expr/src/window/window_expr.rs |  211 +---
 .../physical-expr/src/window/window_frame_state.rs | 1003 ++++++++++++++++++++
 integration-tests/sqls/simple_window_groups.sql    |   71 ++
 integration-tests/test_psql_parity.py              |    3 +-
 10 files changed, 1266 insertions(+), 238 deletions(-)

diff --git a/datafusion/common/src/bisect.rs b/datafusion/common/src/bisect.rs
index 5060bf013..796598be2 100644
--- a/datafusion/common/src/bisect.rs
+++ b/datafusion/common/src/bisect.rs
@@ -60,22 +60,41 @@ pub fn bisect<const SIDE: bool>(
     target: &[ScalarValue],
     sort_options: &[SortOptions],
 ) -> Result<usize> {
-    let mut low: usize = 0;
-    let mut high: usize = item_columns
+    let low: usize = 0;
+    let high: usize = item_columns
         .get(0)
         .ok_or_else(|| {
             DataFusionError::Internal("Column array shouldn't be empty".to_string())
         })?
         .len();
+    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
+        let cmp = compare(current, target, sort_options)?;
+        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
+    };
+    find_bisect_point(item_columns, target, compare_fn, low, high)
+}
+
+/// This function searches for a tuple of target values among the given rows using the bisection algorithm,
+/// starting from the initial bounds `low` and `high`. At every step, the boolean-valued function `compare_fn`
+/// is called with the row at the current midpoint and the target: a return value of `true` moves the lower bound
+/// just past the midpoint (search continues to the right), `false` moves the upper bound to the midpoint (left).
+pub fn find_bisect_point<F>(
+    item_columns: &[ArrayRef],
+    target: &[ScalarValue],
+    compare_fn: F,
+    mut low: usize,
+    mut high: usize,
+) -> Result<usize>
+where
+    F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
+{
     while low < high {
         let mid = ((high - low) / 2) + low;
         let val = item_columns
             .iter()
             .map(|arr| ScalarValue::try_from_array(arr, mid))
             .collect::<Result<Vec<ScalarValue>>>()?;
-        let cmp = compare(&val, target, sort_options)?;
-        let flag = if SIDE { cmp.is_lt() } else { cmp.is_le() };
-        if flag {
+        if compare_fn(&val, target)? {
             low = mid + 1;
         } else {
             high = mid;
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index d2c148c3f..757ab7205 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -64,7 +64,7 @@ use datafusion_expr::expr::{
 };
 use datafusion_expr::expr_rewriter::unnormalize_cols;
 use datafusion_expr::utils::expand_wildcard;
-use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+use datafusion_expr::{WindowFrame, WindowFrameBound};
 use datafusion_optimizer::utils::unalias;
 use datafusion_physical_expr::expressions::Literal;
 use datafusion_sql::utils::window_expr_common_partition_keys;
@@ -1457,12 +1457,6 @@ pub fn create_window_expr_with_name(
                 })
                 .collect::<Result<Vec<_>>>()?;
             if let Some(ref window_frame) = window_frame {
-                if window_frame.units == WindowFrameUnits::Groups {
-                    return Err(DataFusionError::NotImplemented(
-                        "Window frame definitions involving GROUPS are not supported yet"
-                            .to_string(),
-                    ));
-                }
                 if !is_window_valid(window_frame) {
                     return Err(DataFusionError::Execution(format!(
                         "Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index a36d90c2a..9367ee63b 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1189,24 +1189,171 @@ async fn window_frame_ranges_unbounded_preceding_err() -> Result<()> {
 }
 
 #[tokio::test]
-async fn window_frame_groups_query() -> Result<()> {
+async fn window_frame_groups_preceding_following_desc() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT
+        SUM(c4) OVER(ORDER BY c2 DESC GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
+        SUM(c3) OVER(ORDER BY c2 DESC GROUPS BETWEEN 10000 PRECEDING AND 10000 FOLLOWING),
+        COUNT(*) OVER(ORDER BY c2 DESC GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+        FROM aggregate_test_100
+        ORDER BY c9
+        LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c4) | SUM(aggregate_test_100.c3) | COUNT(UInt8(1)) |",
+        "+----------------------------+----------------------------+-----------------+",
+        "| 52276                      | 781                        | 56              |",
+        "| 260620                     | 781                        | 63              |",
+        "| -28623                     | 781                        | 37              |",
+        "| 260620                     | 781                        | 63              |",
+        "| 260620                     | 781                        | 63              |",
+        "+----------------------------+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups_order_by_null_desc() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_null_cases_csv(&ctx).await?;
+    let sql = "SELECT
+        COUNT(c2) OVER (ORDER BY c1 DESC GROUPS BETWEEN 5 PRECEDING AND 3 FOLLOWING)
+        FROM null_cases
+        LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------+",
+        "| COUNT(null_cases.c2) |",
+        "+----------------------+",
+        "| 12                   |",
+        "| 12                   |",
+        "| 12                   |",
+        "| 12                   |",
+        "| 12                   |",
+        "+----------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_null_cases_csv(&ctx).await?;
+    let sql = "SELECT
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as a,
+        SUM(c1) OVER (ORDER BY c3 DESC GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as b,
+        SUM(c1) OVER (ORDER BY c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as c,
+        SUM(c1) OVER (ORDER BY c3 DESC NULLS last GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as d,
+        SUM(c1) OVER (ORDER BY c3 DESC NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as e,
+        SUM(c1) OVER (ORDER BY c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as f,
+        SUM(c1) OVER (ORDER BY c3 GROUPS current row) as a1,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 9 PRECEDING AND 5 PRECEDING) as a2,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND 5 PRECEDING) as a3,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a4,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND current row) as a5,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as a6,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as a7,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 3 FOLLOWING AND UNBOUNDED FOLLOWING) as a8,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN current row AND UNBOUNDED FOLLOWING) as a9,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN current row AND 3 FOLLOWING) as a10,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 5 FOLLOWING AND 7 FOLLOWING) as a11,
+        SUM(c1) OVER (ORDER BY c3 DESC GROUPS current row) as a21,
+        SUM(c1) OVER (ORDER BY c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 5 PRECEDING) as a22,
+        SUM(c1) OVER (ORDER BY c3 DESC NULLS last GROUPS BETWEEN UNBOUNDED PRECEDING AND 5 PRECEDING) as a23,
+        SUM(c1) OVER (ORDER BY c3 NULLS last GROUPS BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a24,
+        SUM(c1) OVER (ORDER BY c3 DESC NULLS first GROUPS BETWEEN UNBOUNDED PRECEDING AND current row) as a25
+        FROM null_cases
+        ORDER BY c3
+        LIMIT 10";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+------+------+------+------+-----+-----+-----+-----+------+-----+------+",
+        "| a   | b   | c   | d   | e   | f   | a1 | a2  | a3  | a4  | a5  | a6   | a7   | a8   | a9   | a10 | a11 | a21 | a22 | a23  | a24 | a25  |",
+        "+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+------+------+------+------+-----+-----+-----+-----+------+-----+------+",
+        "| 412 | 307 | 412 | 307 | 307 | 412 |    |     |     | 412 |     | 4627 | 4627 | 4531 | 4627 | 115 | 85  |     |     | 4487 | 412 | 4627 |",
+        "| 488 | 339 | 488 | 339 | 339 | 488 | 72 |     |     | 488 | 72  | 4627 | 4627 | 4512 | 4627 | 140 | 153 | 72  |     | 4473 | 488 | 4627 |",
+        "| 543 | 412 | 543 | 412 | 412 | 543 | 24 |     |     | 543 | 96  | 4627 | 4627 | 4487 | 4555 | 82  | 122 | 24  |     | 4442 | 543 | 4555 |",
+        "| 553 | 488 | 553 | 488 | 488 | 553 | 19 |     |     | 553 | 115 | 4627 | 4555 | 4473 | 4531 | 89  | 114 | 19  |     | 4402 | 553 | 4531 |",
+        "| 553 | 543 | 553 | 543 | 543 | 553 | 25 |     |     | 553 | 140 | 4627 | 4531 | 4442 | 4512 | 110 | 105 | 25  |     | 4320 | 553 | 4512 |",
+        "| 591 | 553 | 591 | 553 | 553 | 591 | 14 |     |     | 591 | 154 | 4627 | 4512 | 4402 | 4487 | 167 | 181 | 14  |     | 4320 | 591 | 4487 |",
+        "| 651 | 553 | 651 | 553 | 553 | 651 | 31 | 72  | 72  | 651 | 185 | 4627 | 4487 | 4320 | 4473 | 153 | 204 | 31  | 72  | 4288 | 651 | 4473 |",
+        "| 662 | 591 | 662 | 591 | 591 | 662 | 40 | 96  | 96  | 662 | 225 | 4627 | 4473 | 4320 | 4442 | 154 | 141 | 40  | 96  | 4215 | 662 | 4442 |",
+        "| 697 | 651 | 697 | 651 | 651 | 697 | 82 | 115 | 115 | 697 | 307 | 4627 | 4442 | 4288 | 4402 | 187 | 65  | 82  | 115 | 4139 | 697 | 4402 |",
+        "| 758 | 662 | 758 | 662 | 662 | 758 |    | 140 | 140 | 758 | 307 | 4627 | 4402 | 4215 | 4320 | 181 | 48  |     | 140 | 4084 | 758 | 4320 |",
+        "+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+------+------+------+------+-----+-----+-----+-----+------+-----+------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups_multiple_order_columns() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_null_cases_csv(&ctx).await?;
+    let sql = "SELECT
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as a,
+        SUM(c1) OVER (ORDER BY c2, c3 DESC GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as b,
+        SUM(c1) OVER (ORDER BY c2, c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as c,
+        SUM(c1) OVER (ORDER BY c2, c3 DESC NULLS last GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as d,
+        SUM(c1) OVER (ORDER BY c2, c3 DESC NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as e,
+        SUM(c1) OVER (ORDER BY c2, c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as f,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS current row) as a1,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN 9 PRECEDING AND 5 PRECEDING) as a2,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND 5 PRECEDING) as a3,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a4,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND current row) as a5,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as a6,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as a7,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN 3 FOLLOWING AND UNBOUNDED FOLLOWING) as a8,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN current row AND UNBOUNDED FOLLOWING) as a9,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN current row AND 3 FOLLOWING) as a10,
+        SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN 5 FOLLOWING AND 7 FOLLOWING) as a11
+        FROM null_cases
+        ORDER BY c3
+        LIMIT 10";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+------+-----+------+-----+-----+------+----+-----+------+------+------+------+------+------+------+-----+-----+",
+        "| a    | b   | c    | d   | e   | f    | a1 | a2  | a3   | a4   | a5   | a6   | a7   | a8   | a9   | a10 | a11 |",
+        "+------+-----+------+-----+-----+------+----+-----+------+------+------+------+------+------+------+-----+-----+",
+        "| 818  | 910 | 818  | 910 | 910 | 818  |    | 249 | 249  | 818  | 432  | 4627 | 4234 | 4157 | 4195 | 98  | 82  |",
+        "| 537  | 979 | 537  | 979 | 979 | 537  | 72 |     |      | 537  | 210  | 4627 | 4569 | 4378 | 4489 | 169 | 55  |",
+        "| 811  | 838 | 811  | 838 | 838 | 811  | 24 | 221 | 3075 | 3665 | 3311 | 4627 | 1390 | 1276 | 1340 | 117 | 144 |",
+        "| 763  | 464 | 763  | 464 | 464 | 763  | 19 | 168 | 3572 | 4167 | 3684 | 4627 | 962  | 829  | 962  | 194 | 80  |",
+        "| 552  | 964 | 552  | 964 | 964 | 552  | 25 |     |      | 552  | 235  | 4627 | 4489 | 4320 | 4417 | 167 | 39  |",
+        "| 963  | 930 | 963  | 930 | 930 | 963  | 14 | 201 | 818  | 1580 | 1098 | 4627 | 3638 | 3455 | 3543 | 177 | 224 |",
+        "| 1113 | 814 | 1113 | 814 | 814 | 1113 | 31 | 415 | 2653 | 3351 | 2885 | 4627 | 1798 | 1694 | 1773 | 165 | 162 |",
+        "| 780  | 868 | 780  | 868 | 868 | 780  | 40 | 258 | 3143 | 3665 | 3351 | 4627 | 1340 | 1223 | 1316 | 117 | 102 |",
+        "| 740  | 466 | 740  | 466 | 466 | 740  | 82 | 164 | 3592 | 4168 | 3766 | 4627 | 962  | 768  | 943  | 244 | 122 |",
+        "| 772  | 832 | 772  | 832 | 832 | 772  |    | 277 | 3189 | 3684 | 3351 | 4627 | 1316 | 1199 | 1276 | 119 | 64  |",
+        "+------+-----+------+-----+-----+------+----+-----+------+------+------+------+------+------+------+-----+-----+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups_without_order_by() -> Result<()> {
     let ctx = SessionContext::new();
     register_aggregate_csv(&ctx).await?;
     // execute the query
     let df = ctx
         .sql(
             "SELECT
-                COUNT(c1) OVER (ORDER BY c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
-                FROM aggregate_test_100;",
+            SUM(c4) OVER(PARTITION BY c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+            FROM aggregate_test_100
+            ORDER BY c9;",
         )
         .await?;
-    let results = df.collect().await;
-    assert!(results
-        .as_ref()
-        .err()
-        .unwrap()
-        .to_string()
-        .contains("Window frame definitions involving GROUPS are not supported yet"));
+    let err = df.collect().await.unwrap_err();
+    assert_contains!(
+        err.to_string(),
+        "Execution error: GROUPS mode requires an ORDER BY clause".to_owned()
+    );
     Ok(())
 }
 
diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs
index e6c754387..69b0812c1 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -33,6 +33,8 @@ use datafusion_expr::WindowFrame;
 use crate::{expressions::PhysicalSortExpr, PhysicalExpr};
 use crate::{window::WindowExpr, AggregateExpr};
 
+use super::window_frame_state::WindowFrameContext;
+
 /// A window expr that takes the form of an aggregate function
 #[derive(Debug)]
 pub struct AggregateWindowExpr {
@@ -114,13 +116,13 @@ impl WindowExpr for AggregateWindowExpr {
                 .map(|v| v.slice(partition_range.start, length))
                 .collect::<Vec<_>>();
 
+            let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
             let mut last_range: (usize, usize) = (0, 0);
 
             // We iterate on each row to perform a running calculation.
             // First, cur_range is calculated, then it is compared with last_range.
             for i in 0..length {
-                let cur_range = self.calculate_range(
-                    &window_frame,
+                let cur_range = window_frame_ctx.calculate_range(
                     &slice_order_bys,
                     &sort_options,
                     length,
diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs
index e4e377175..da551d534 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -17,6 +17,7 @@
 
 //! Physical exec for built-in window function expressions.
 
+use super::window_frame_state::WindowFrameContext;
 use super::BuiltInWindowFunctionExpr;
 use super::WindowExpr;
 use crate::{expressions::PhysicalSortExpr, PhysicalExpr};
@@ -113,10 +114,10 @@ impl WindowExpr for BuiltInWindowExpr {
                     .iter()
                     .map(|v| v.slice(partition_range.start, length))
                     .collect::<Vec<_>>();
+                let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
                 // We iterate on each row to calculate window frame range and and window function result
                 for idx in 0..length {
-                    let range = self.calculate_range(
-                        &window_frame,
+                    let range = window_frame_ctx.calculate_range(
                         &slice_order_bys,
                         &sort_options,
                         num_rows,
diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs
index 059f8b886..40ed658ee 100644
--- a/datafusion/physical-expr/src/window/mod.rs
+++ b/datafusion/physical-expr/src/window/mod.rs
@@ -25,6 +25,7 @@ pub(crate) mod partition_evaluator;
 pub(crate) mod rank;
 pub(crate) mod row_number;
 mod window_expr;
+mod window_frame_state;
 
 pub use aggregate::AggregateWindowExpr;
 pub use built_in::BuiltInWindowExpr;
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index 9c4b1b179..67caba51d 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -20,17 +20,12 @@ use arrow::compute::kernels::partition::lexicographical_partition_ranges;
 use arrow::compute::kernels::sort::{SortColumn, SortOptions};
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
-use datafusion_common::bisect::bisect;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{DataFusionError, Result};
 use std::any::Any;
-use std::cmp::min;
 use std::fmt::Debug;
 use std::ops::Range;
 use std::sync::Arc;
 
-use datafusion_expr::WindowFrameBound;
-use datafusion_expr::{WindowFrame, WindowFrameUnits};
-
 /// A window expression that:
 /// * knows its resulting field
 pub trait WindowExpr: Send + Sync + Debug {
@@ -115,208 +110,4 @@ pub trait WindowExpr: Send + Sync + Debug {
         sort_columns.extend(order_by_columns);
         Ok(sort_columns)
     }
-
-    /// We use start and end bounds to calculate current row's starting and ending range.
-    /// This function supports different modes, but we currently do not support window calculation for GROUPS inside window frames.
-    fn calculate_range(
-        &self,
-        window_frame: &Option<Arc<WindowFrame>>,
-        range_columns: &[ArrayRef],
-        sort_options: &[SortOptions],
-        length: usize,
-        idx: usize,
-    ) -> Result<(usize, usize)> {
-        if let Some(window_frame) = window_frame {
-            match window_frame.units {
-                WindowFrameUnits::Range => {
-                    let start = match &window_frame.start_bound {
-                        // UNBOUNDED PRECEDING
-                        WindowFrameBound::Preceding(n) => {
-                            if n.is_null() {
-                                0
-                            } else {
-                                calculate_index_of_row::<true, true>(
-                                    range_columns,
-                                    sort_options,
-                                    idx,
-                                    Some(n),
-                                )?
-                            }
-                        }
-                        WindowFrameBound::CurrentRow => {
-                            if range_columns.is_empty() {
-                                0
-                            } else {
-                                calculate_index_of_row::<true, true>(
-                                    range_columns,
-                                    sort_options,
-                                    idx,
-                                    None,
-                                )?
-                            }
-                        }
-                        WindowFrameBound::Following(n) => {
-                            calculate_index_of_row::<true, false>(
-                                range_columns,
-                                sort_options,
-                                idx,
-                                Some(n),
-                            )?
-                        }
-                    };
-                    let end = match &window_frame.end_bound {
-                        WindowFrameBound::Preceding(n) => {
-                            calculate_index_of_row::<false, true>(
-                                range_columns,
-                                sort_options,
-                                idx,
-                                Some(n),
-                            )?
-                        }
-                        WindowFrameBound::CurrentRow => {
-                            if range_columns.is_empty() {
-                                length
-                            } else {
-                                calculate_index_of_row::<false, false>(
-                                    range_columns,
-                                    sort_options,
-                                    idx,
-                                    None,
-                                )?
-                            }
-                        }
-                        WindowFrameBound::Following(n) => {
-                            if n.is_null() {
-                                // UNBOUNDED FOLLOWING
-                                length
-                            } else {
-                                calculate_index_of_row::<false, false>(
-                                    range_columns,
-                                    sort_options,
-                                    idx,
-                                    Some(n),
-                                )?
-                            }
-                        }
-                    };
-                    Ok((start, end))
-                }
-                WindowFrameUnits::Rows => {
-                    let start = match window_frame.start_bound {
-                        // UNBOUNDED PRECEDING
-                        WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
-                        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
-                            if idx >= n as usize {
-                                idx - n as usize
-                            } else {
-                                0
-                            }
-                        }
-                        WindowFrameBound::Preceding(_) => {
-                            return Err(DataFusionError::Internal(
-                                "Rows should be Uint".to_string(),
-                            ))
-                        }
-                        WindowFrameBound::CurrentRow => idx,
-                        // UNBOUNDED FOLLOWING
-                        WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
-                            return Err(DataFusionError::Internal(format!(
-                                "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
-                                window_frame
-                            )))
-                        }
-                        WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
-                            min(idx + n as usize, length)
-                        }
-                        WindowFrameBound::Following(_) => {
-                            return Err(DataFusionError::Internal(
-                                "Rows should be Uint".to_string(),
-                            ))
-                        }
-                    };
-                    let end = match window_frame.end_bound {
-                        // UNBOUNDED PRECEDING
-                        WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
-                            return Err(DataFusionError::Internal(format!(
-                                "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
-                                window_frame
-                            )))
-                        }
-                        WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
-                            if idx >= n as usize {
-                                idx - n as usize + 1
-                            } else {
-                                0
-                            }
-                        }
-                        WindowFrameBound::Preceding(_) => {
-                            return Err(DataFusionError::Internal(
-                                "Rows should be Uint".to_string(),
-                            ))
-                        }
-                        WindowFrameBound::CurrentRow => idx + 1,
-                        // UNBOUNDED FOLLOWING
-                        WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
-                        WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
-                            min(idx + n as usize + 1, length)
-                        }
-                        WindowFrameBound::Following(_) => {
-                            return Err(DataFusionError::Internal(
-                                "Rows should be Uint".to_string(),
-                            ))
-                        }
-                    };
-                    Ok((start, end))
-                }
-                WindowFrameUnits::Groups => Err(DataFusionError::NotImplemented(
-                    "Window frame for groups is not implemented".to_string(),
-                )),
-            }
-        } else {
-            Ok((0, length))
-        }
-    }
-}
-
-fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
-    range_columns: &[ArrayRef],
-    sort_options: &[SortOptions],
-    idx: usize,
-    delta: Option<&ScalarValue>,
-) -> Result<usize> {
-    let current_row_values = range_columns
-        .iter()
-        .map(|col| ScalarValue::try_from_array(col, idx))
-        .collect::<Result<Vec<ScalarValue>>>()?;
-    let end_range = if let Some(delta) = delta {
-        let is_descending: bool = sort_options
-            .first()
-            .ok_or_else(|| DataFusionError::Internal("Array is empty".to_string()))?
-            .descending;
-
-        current_row_values
-            .iter()
-            .map(|value| {
-                if value.is_null() {
-                    return Ok(value.clone());
-                }
-                if SEARCH_SIDE == is_descending {
-                    // TODO: Handle positive overflows
-                    value.add(delta)
-                } else if value.is_unsigned() && value < delta {
-                    // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
-                    //       If we decide to implement a "default" construction mechanism for ScalarValue,
-                    //       change the following statement to use that.
-                    value.sub(value)
-                } else {
-                    // TODO: Handle negative overflows
-                    value.sub(delta)
-                }
-            })
-            .collect::<Result<Vec<ScalarValue>>>()?
-    } else {
-        current_row_values
-    };
-    // `BISECT_SIDE` true means bisect_left, false means bisect_right
-    bisect::<BISECT_SIDE>(range_columns, &end_range, sort_options)
 }
diff --git a/datafusion/physical-expr/src/window/window_frame_state.rs b/datafusion/physical-expr/src/window/window_frame_state.rs
new file mode 100644
index 000000000..fc8d7d03a
--- /dev/null
+++ b/datafusion/physical-expr/src/window/window_frame_state.rs
@@ -0,0 +1,1003 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides utilities for window frame index calculations depending on the window frame mode:
+//! RANGE, ROWS, GROUPS.
+
+use arrow::array::ArrayRef;
+use arrow::compute::kernels::sort::SortOptions;
+use datafusion_common::bisect::{bisect, find_bisect_point};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+use std::cmp::min;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This object stores the window frame definition, plus any mode-specific state, for use in incremental frame calculations.
+#[derive(Debug)]
+pub enum WindowFrameContext<'a> {
+    // ROWS-frames are inherently stateless, so only the frame definition is kept:
+    Rows(&'a Arc<WindowFrame>),
+    // RANGE-frames carry a state object that is empty for now; they will soon have a stateful implementation that is more efficient than a stateless one:
+    Range {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateRange,
+    },
+    // GROUPS-frames have a stateful implementation that is more efficient than a stateless one:
+    Groups {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateGroups,
+    },
+    Default, // No frame was given; `calculate_range` then reports the whole input, i.e. `(0, length)`.
+}
+
+impl<'a> WindowFrameContext<'a> {
+    /// Builds the context matching the units of the given window frame (ROWS, RANGE or
+    /// GROUPS), starting from a fresh default state. Without a frame, the `Default`
+    /// context is produced.
+    pub fn new(window_frame: &'a Option<Arc<WindowFrame>>) -> Self {
+        match window_frame {
+            None => WindowFrameContext::Default,
+            Some(frame) => match frame.units {
+                WindowFrameUnits::Rows => WindowFrameContext::Rows(frame),
+                WindowFrameUnits::Range => WindowFrameContext::Range {
+                    window_frame: frame,
+                    state: WindowFrameStateRange::default(),
+                },
+                WindowFrameUnits::Groups => WindowFrameContext::Groups {
+                    window_frame: frame,
+                    state: WindowFrameStateGroups::default(),
+                },
+            },
+        }
+    }
+
+    /// Computes the beginning/ending indices of the frame belonging to the row at `idx`,
+    /// dispatching on the frame mode captured by this context.
+    pub fn calculate_range(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        match self {
+            // Without a frame definition, the frame spans the whole input.
+            WindowFrameContext::Default => Ok((0, length)),
+            WindowFrameContext::Rows(window_frame) => {
+                Self::calculate_range_rows(*window_frame, length, idx)
+            }
+            // RANGE mode needs sort_options, because the ordering and the position of the nulls
+            // affect both the range calculations and the comparison of the rows.
+            WindowFrameContext::Range {
+                window_frame,
+                state,
+            } => state.calculate_range(
+                *window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+            // GROUPS mode does not need sort_options: a group change is detected through the
+            // inequality of two rows, which neither the ordering nor the null position affects.
+            WindowFrameContext::Groups {
+                window_frame,
+                state,
+            } => state.calculate_range(*window_frame, range_columns, length, idx),
+        }
+    }
+
+    /// Calculates the beginning/ending row indices of a ROWS-mode frame for the row at `idx`.
+    ///
+    /// The result is a half-open interval: e.g. `CURRENT ROW` yields `idx` as a frame
+    /// start and `idx + 1` as a frame end. Offsets reaching before the first row are
+    /// clamped to `0`, and offsets reaching past the last row are clamped to `length`.
+    ///
+    /// # Errors
+    /// Returns an internal error if the frame starts at UNBOUNDED FOLLOWING, ends at
+    /// UNBOUNDED PRECEDING, or if an offset is not a `UInt64` scalar.
+    fn calculate_range_rows(
+        window_frame: &Arc<WindowFrame>,
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        // Offsets arrive as `u64`. Clamp them into `usize` and use saturating arithmetic
+        // below, so that huge offsets mean "beyond the input" instead of overflowing.
+        let to_offset = |n: u64| min(n, usize::MAX as u64) as usize;
+        let start = match window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                idx.saturating_sub(to_offset(n))
+            }
+            WindowFrameBound::CurrentRow => idx,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                min(idx.saturating_add(to_offset(n)), length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        let end = match window_frame.end_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                let offset = to_offset(n);
+                if idx >= offset {
+                    idx - offset + 1
+                } else {
+                    // The frame ends before the first row, i.e. it is empty.
+                    0
+                }
+            }
+            WindowFrameBound::CurrentRow => idx + 1,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                min(idx.saturating_add(to_offset(n)).saturating_add(1), length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        Ok((start, end))
+    }
+}
+
+/// This structure encapsulates all the state information we require as we
+/// scan ranges of data while processing window frames. It is empty for now: we calculate
+/// frame boundaries from scratch for every row, but we will make this incremental in the future.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateRange {}
+
+impl WindowFrameStateRange {
+    /// Calculates the beginning/ending row indices of a RANGE-mode frame for the row at `idx`.
+    ///
+    /// UNBOUNDED PRECEDING and UNBOUNDED FOLLOWING map to `0` and `length`. A CURRENT ROW
+    /// bound maps to the same values when there are no ORDER BY columns. Every other
+    /// bound is located by `calculate_index_of_row`.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ref n) if n.is_null() => 0,
+            WindowFrameBound::Preceding(ref n) => self
+                .calculate_index_of_row::<true, true>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+            // Without ORDER BY columns, all rows are peers of the current row.
+            WindowFrameBound::CurrentRow if range_columns.is_empty() => 0,
+            WindowFrameBound::CurrentRow => self
+                .calculate_index_of_row::<true, true>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    None,
+                )?,
+            WindowFrameBound::Following(ref n) => self
+                .calculate_index_of_row::<true, false>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+        };
+        let end = match window_frame.end_bound {
+            WindowFrameBound::Preceding(ref n) => self
+                .calculate_index_of_row::<false, true>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+            // Without ORDER BY columns, all rows are peers of the current row.
+            WindowFrameBound::CurrentRow if range_columns.is_empty() => length,
+            WindowFrameBound::CurrentRow => self
+                .calculate_index_of_row::<false, false>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    None,
+                )?,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ref n) if n.is_null() => length,
+            WindowFrameBound::Following(ref n) => self
+                .calculate_index_of_row::<false, false>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+        };
+        Ok((start, end))
+    }
+
+    /// This function does the heavy lifting when finding range boundaries. It is meant to be
+    /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+    /// supplied as true and false, respectively). `SEARCH_SIDE` is true for PRECEDING and
+    /// false for FOLLOWING offsets.
+    ///
+    /// When `delta` is given, every non-null value of the current row is shifted by it
+    /// before bisecting; the direction of the shift depends on `SEARCH_SIDE` and on whether
+    /// the first sort column is descending. Without `delta`, the current row itself is searched.
+    fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        idx: usize,
+        delta: Option<&ScalarValue>,
+    ) -> Result<usize> {
+        let mut current_row_values = Vec::with_capacity(range_columns.len());
+        for col in range_columns {
+            current_row_values.push(ScalarValue::try_from_array(col, idx)?);
+        }
+        let end_range = match delta {
+            None => current_row_values,
+            Some(delta) => {
+                let is_descending: bool = sort_options
+                    .first()
+                    .ok_or_else(|| {
+                        DataFusionError::Internal("Array is empty".to_string())
+                    })?
+                    .descending;
+                let mut shifted_values = Vec::with_capacity(current_row_values.len());
+                for value in &current_row_values {
+                    let shifted = if value.is_null() {
+                        // Nulls are searched as they are.
+                        value.clone()
+                    } else if SEARCH_SIDE == is_descending {
+                        // TODO: Handle positive overflows
+                        value.add(delta)?
+                    } else if value.is_unsigned() && value < delta {
+                        // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
+                        //       If we decide to implement a "default" construction mechanism for ScalarValue,
+                        //       change the following statement to use that.
+                        value.sub(value)?
+                    } else {
+                        // TODO: Handle negative overflows
+                        value.sub(delta)?
+                    };
+                    shifted_values.push(shifted);
+                }
+                shifted_values
+            }
+        };
+        // `BISECT_SIDE` true means bisect_left, false means bisect_right
+        bisect::<BISECT_SIDE>(range_columns, &end_range, sort_options)
+    }
+}
+
+// In GROUPS mode, rows with duplicate sorting values are grouped together.
+// Therefore, there must be an ORDER BY clause in the window definition to use GROUPS mode.
+// The syntax is as follows:
+//     GROUPS frame_start [ frame_exclusion ]
+//     GROUPS BETWEEN frame_start AND frame_end [ frame_exclusion ]
+// The optional frame_exclusion specifier is not yet supported.
+// The frame_start and frame_end parameters allow us to specify which rows the window
+// frame starts and ends with. They accept the following values:
+//    - UNBOUNDED PRECEDING: Start with the first row of the partition. Possible only in frame_start.
+//    - offset PRECEDING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups before the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups before the current group.
+//    - CURRENT ROW: When used in frame_start, it refers to the first row of the group containing
+//                   the current row. When used in frame_end, it refers to the last row of the group
+//                   containing the current row.
+//    - offset FOLLOWING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups after the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups after the current group.
+//    - UNBOUNDED FOLLOWING: End with the last row of the partition. Possible only in frame_end.
+
+/// This structure encapsulates all the state information we require as we
+/// scan groups of data while processing window frames.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateGroups {
+    current_group_idx: u64, // Index of the current row's group; incremented on every detected group change.
+    group_start_indices: VecDeque<(Vec<ScalarValue>, usize)>, // (group values, index of the group's first row) for every group tracked for the frame.
+    previous_row_values: Option<Vec<ScalarValue>>, // Row values recorded at the last group change; a differing row signals a new group.
+    reached_end: bool, // Set once no further group can be found in the data.
+    window_frame_end_idx: u64, // Group index bookkeeping for the frame end; incremented whenever a group is appended.
+    window_frame_start_idx: u64, // Group index of the first tracked group; incremented whenever the oldest group is dropped.
+}
+
+impl WindowFrameStateGroups {
+    /// Calculates the beginning/ending row indices of a GROUPS-mode frame for the row at `idx`.
+    ///
+    /// The frame start is resolved before the frame end, as both calls update the shared
+    /// grouping state. CURRENT ROW is handled as an offset of zero groups.
+    ///
+    /// # Errors
+    /// Returns an execution error if there are no ORDER BY columns, and an internal error
+    /// if the frame starts at UNBOUNDED FOLLOWING, ends at UNBOUNDED PRECEDING, or if an
+    /// offset is not a `UInt64` scalar.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        if range_columns.is_empty() {
+            return Err(DataFusionError::Execution(
+                "GROUPS mode requires an ORDER BY clause".to_string(),
+            ));
+        }
+        // Shared error for offsets that are not unsigned integers.
+        let not_uint_error =
+            || DataFusionError::Internal("Groups should be Uint".to_string());
+
+        let start = match window_frame.start_bound {
+            WindowFrameBound::CurrentRow => self.calculate_index_of_group::<true, true>(
+                range_columns,
+                idx,
+                0,
+                length,
+            )?,
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<true, true>(range_columns, idx, n, length)?,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<true, false>(range_columns, idx, n, length)?,
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(not_uint_error())
+            }
+        };
+        let end = match window_frame.end_bound {
+            WindowFrameBound::CurrentRow => self
+                .calculate_index_of_group::<false, false>(
+                    range_columns,
+                    idx,
+                    0,
+                    length,
+                )?,
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<false, true>(range_columns, idx, n, length)?,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<false, false>(
+                    range_columns,
+                    idx,
+                    n,
+                    length,
+                )?,
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(not_uint_error())
+            }
+        };
+        Ok((start, end))
+    }
+
+    /// This function does the heavy lifting when finding group boundaries. It is meant to be
+    /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+    /// supplied as true and false, respectively). `SEARCH_SIDE` is true for PRECEDING and false for FOLLOWING offsets.
+    fn calculate_index_of_group<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        idx: usize,
+        delta: u64,
+        length: usize,
+    ) -> Result<usize> {
+        let current_row_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, idx))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+
+        if BISECT_SIDE {
+            // When we call this function to get the window frame start index, it tries to initialize
+            // the internal grouping state if this is not already done before. This initialization takes
+            // place only when the window frame start index is greater than or equal to zero. In this
+            // case, the first group of the frame is stored in group_start_indices, with its row values
+            // as the group identifier and the index of its first row as the start index of the group.
+            if !self.initialized() {
+                self.initialize::<SEARCH_SIDE>(delta, range_columns)?;
+            }
+        } else if !self.reached_end {
+            // When we call this function to get the window frame end index, it extends the window
+            // frame one by one until the current row's window frame end index is reached by finding
+            // the next group.
+            self.extend_window_frame_if_necessary::<SEARCH_SIDE>(range_columns, delta)?;
+        }
+        // We keep track of previous row values, so that a group change can be identified.
+        // If there is a group change, the window frame is advanced and shifted by one group.
+        let group_change = match &self.previous_row_values {
+            None => false, // The very first row never counts as a group change.
+            Some(values) => &current_row_values != values,
+        };
+        if self.previous_row_values.is_none() || group_change {
+            self.previous_row_values = Some(current_row_values);
+        }
+        if group_change {
+            self.current_group_idx += 1;
+            self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
+            self.shift_one_group::<SEARCH_SIDE>(delta);
+        }
+        Ok(if self.group_start_indices.is_empty() { // No group is tracked: report `length` if the data is exhausted, `0` otherwise.
+            if self.reached_end {
+                length
+            } else {
+                0
+            }
+        } else if BISECT_SIDE { // Frame start: the start index of the oldest tracked group.
+            match self.group_start_indices.get(0) {
+                Some(&(_, idx)) => idx,
+                None => 0,
+            }
+        } else { // Frame end: the start index of the newest tracked group, or `length` once the data is exhausted.
+            match (self.reached_end, self.group_start_indices.back()) {
+                (false, Some(&(_, idx))) => idx,
+                _ => length,
+            }
+        })
+    }
+
+    /// Appends groups to the tracked window until it covers the frame end group of the
+    /// current row, or until the data is exhausted. Nothing happens while the frame end
+    /// still lies before the first group, which is possible only for PRECEDING offsets
+    /// (`SEARCH_SIDE` is true).
+    fn extend_window_frame_if_necessary<const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        delta: u64,
+    ) -> Result<()> {
+        let current_window_frame_end_idx = if !SEARCH_SIDE {
+            // Saturating arithmetic keeps huge FOLLOWING offsets from overflowing `u64`;
+            // a saturated target simply means "extend until the data is exhausted".
+            self.current_group_idx
+                .saturating_add(delta)
+                .saturating_add(1)
+        } else if self.current_group_idx >= delta {
+            self.current_group_idx - delta + 1
+        } else {
+            0
+        };
+        if current_window_frame_end_idx == 0 {
+            // the end index of the window frame is still before the first index
+            return Ok(());
+        }
+        if self.group_start_indices.is_empty() {
+            self.initialize_window_frame_start(range_columns)?;
+        }
+        while !self.reached_end
+            && self.window_frame_end_idx <= current_window_frame_end_idx
+        {
+            self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
+        }
+        Ok(())
+    }
+
+    /// Determines the group index at which the frame of the current group starts and
+    /// seeks to that group. For PRECEDING offsets (`SEARCH_SIDE` is true) reaching before
+    /// the first group, the state is left untouched.
+    fn initialize<const SEARCH_SIDE: bool>(
+        &mut self,
+        delta: u64,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        let start_group_idx = if SEARCH_SIDE {
+            if self.current_group_idx < delta {
+                // The frame start is still before the first group.
+                return Ok(());
+            }
+            self.current_group_idx - delta
+        } else {
+            self.current_group_idx + delta
+        };
+        self.window_frame_start_idx = start_group_idx;
+        self.initialize_window_frame_start(range_columns)
+    }
+
+    /// Walks from the first row over `window_frame_start_idx` group boundaries and records
+    /// the group found there as the first tracked group. If the data runs out of groups
+    /// before that, `reached_end` is set and no group is recorded.
+    fn initialize_window_frame_start(
+        &mut self,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        let mut group_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, 0))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        let mut start_idx: usize = 0;
+        for _ in 0..self.window_frame_start_idx {
+            match WindowFrameStateGroups::find_next_group_and_start_index(
+                range_columns,
+                &group_values,
+                start_idx,
+            )? {
+                Some((next_values, next_start_idx)) => {
+                    group_values = next_values;
+                    start_idx = next_start_idx;
+                }
+                None => {
+                    // not enough groups to generate a window frame
+                    self.window_frame_end_idx = self.window_frame_start_idx;
+                    self.reached_end = true;
+                    return Ok(());
+                }
+            }
+        }
+        self.group_start_indices
+            .push_back((group_values, start_idx));
+        self.window_frame_end_idx = self.window_frame_start_idx + 1;
+        Ok(())
+    }
+
+    /// Tells whether the grouping state has been set up: either at least one group is
+    /// tracked, or the end of the data has already been reached.
+    fn initialized(&self) -> bool {
+        let tracks_a_group = !self.group_start_indices.is_empty();
+        tracks_a_group || self.reached_end
+    }
+
+    /// This function advances the window frame by one group: the group following the
+    /// newest tracked group is appended. If no group is tracked, nothing happens; if there
+    /// is no following group, `reached_end` is set.
+    fn advance_one_group<const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        let next_group_and_start_index = match self.group_start_indices.back() {
+            Some((group_values, group_start_idx)) => {
+                WindowFrameStateGroups::find_next_group_and_start_index(
+                    range_columns,
+                    group_values,
+                    *group_start_idx,
+                )?
+            }
+            None => return Ok(()),
+        };
+        match next_group_and_start_index {
+            Some(entry) => {
+                self.group_start_indices.push_back(entry);
+                self.window_frame_end_idx += 1;
+            }
+            // not enough groups to proceed
+            None => self.reached_end = true,
+        }
+        Ok(())
+    }
+
+    /// This function drops the oldest group from the window frame, provided that the frame
+    /// start group of the current group lies past the group at which the tracked window
+    /// currently starts.
+    fn shift_one_group<const SEARCH_SIDE: bool>(&mut self, delta: u64) {
+        // The frame start group is `delta` groups before the current group (floored at
+        // zero) for PRECEDING offsets, and `delta` groups after it for FOLLOWING offsets.
+        // Saturating arithmetic keeps huge offsets from overflowing `u64`.
+        let current_window_frame_start_idx = if SEARCH_SIDE {
+            self.current_group_idx.saturating_sub(delta)
+        } else {
+            self.current_group_idx.saturating_add(delta)
+        };
+        if current_window_frame_start_idx > self.window_frame_start_idx {
+            self.group_start_indices.pop_front();
+            self.window_frame_start_idx += 1;
+        }
+    }
+
+    /// This function finds the group following the group identified by `current_row_values`,
+    /// searching forward from row `idx`. It returns the values and the start index of that
+    /// group, or `None` if the given group extends to the end of the data.
+    /// It probes with an exponentially growing step size to bracket the group boundary,
+    /// and then pins the boundary down with `find_bisect_point`.
+    // TODO: For small group sizes, proceeding one-by-one to find the group change can be more efficient.
+    // Statistics about previous group sizes can be used to choose one-by-one vs. exponentially growing,
+    // or even to set the base step_size when exponentially growing. We can also create a benchmark
+    // implementation to get insights about the crossover point.
+    fn find_next_group_and_start_index(
+        range_columns: &[ArrayRef],
+        current_row_values: &[ScalarValue],
+        idx: usize,
+    ) -> Result<Option<(Vec<ScalarValue>, usize)>> {
+        let mut step_size: usize = 1;
+        let data_size: usize = range_columns
+            .first()
+            .ok_or_else(|| {
+                DataFusionError::Internal("Column array shouldn't be empty".to_string())
+            })?
+            .len();
+        // Reads the values of all columns at the given row.
+        let values_at = |row: usize| -> Result<Vec<ScalarValue>> {
+            range_columns
+                .iter()
+                .map(|arr| ScalarValue::try_from_array(arr, row))
+                .collect()
+        };
+        let (mut low, mut high) = (idx, idx + step_size);
+        // Keep doubling the step while the probed row still belongs to the given group.
+        while high < data_size && values_at(high)? == current_row_values {
+            low = high;
+            step_size *= 2;
+            high += step_size;
+        }
+        let next_group_start = find_bisect_point(
+            range_columns,
+            current_row_values,
+            |current, to_compare| Ok(current == to_compare),
+            low,
+            min(high, data_size),
+        )?;
+        if next_group_start == data_size {
+            return Ok(None);
+        }
+        Ok(Some((values_at(next_group_start)?, next_group_start)))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::Float64Array;
+    use datafusion_common::ScalarValue;
+    use std::sync::Arc;
+
+    use crate::from_slice::FromSlice;
+
+    use super::*;
+
+    struct TestData { // Fixture shared by the tests of this module.
+        arrays: Vec<ArrayRef>, // Columns handed to the functions under test as `range_columns`.
+        group_indices: [usize; 6], // Per row: presumably the start index of the row's group (see `test_data`).
+        num_groups: usize, // Number of distinct groups in `arrays`.
+        num_rows: usize, // Number of rows in every array.
+        next_group_indices: [usize; 5], // Per row (except the last): expected start index of the following group.
+    }
+
+    fn test_data() -> TestData { // Builds four columns of six rows, in which rows 2 and 3 hold identical values.
+        let num_groups: usize = 5;
+        let num_rows: usize = 6;
+        let group_indices = [0, 1, 2, 2, 4, 5]; // Rows 2 and 3 share the group starting at row 2.
+        let next_group_indices = [1, 2, 4, 4, 5]; // From rows 2 and 3 alike, the next group starts at row 4.
+
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 8., 9., 10.])),
+            Arc::new(Float64Array::from_slice([2.0, 3.0, 3.0, 3., 4.0, 5.0])),
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 8., 10., 11.0])),
+            Arc::new(Float64Array::from_slice([15.0, 13.0, 8.0, 8., 5., 0.0])),
+        ];
+        TestData {
+            arrays,
+            group_indices,
+            num_groups,
+            num_rows,
+            next_group_indices,
+        }
+    }
+
+    #[test]
+    fn test_find_next_group_and_start_index() {
+        let test_data = test_data();
+        // Reads the values of all columns at the given row.
+        let row_values = |row: usize| -> Vec<ScalarValue> {
+            test_data
+                .arrays
+                .iter()
+                .map(|col| ScalarValue::try_from_array(col, row))
+                .collect::<Result<Vec<ScalarValue>>>()
+                .unwrap()
+        };
+        // Every row but the last one has a following group.
+        for (current_idx, &next_idx) in
+            test_data.next_group_indices.iter().enumerate()
+        {
+            let res = WindowFrameStateGroups::find_next_group_and_start_index(
+                &test_data.arrays,
+                &row_values(current_idx),
+                current_idx,
+            )
+            .unwrap();
+            assert_eq!(res, Some((row_values(next_idx), next_idx)));
+        }
+        // The last row belongs to the last group, so there is nothing to find.
+        let last_idx = test_data.num_rows - 1;
+        let res = WindowFrameStateGroups::find_next_group_and_start_index(
+            &test_data.arrays,
+            &row_values(last_idx),
+            last_idx,
+        )
+        .unwrap();
+        assert_eq!(res, None);
+    }
+
+    #[test]
+    fn test_window_frame_groups_preceding_delta_greater_than_partition_size() {
+        const START: bool = true;
+        const END: bool = false;
+        const PRECEDING: bool = true;
+        const DELTA: u64 = 10;
+
+        // A PRECEDING offset beyond the first group must leave the state untouched.
+        let assert_untouched = |state: &WindowFrameStateGroups| {
+            assert_eq!(state.window_frame_start_idx, 0);
+            assert_eq!(state.window_frame_end_idx, 0);
+            assert!(!state.reached_end);
+            assert_eq!(state.group_start_indices.len(), 0);
+        };
+
+        let test_data = test_data();
+        let mut window_frame_groups = WindowFrameStateGroups::default();
+        window_frame_groups
+            .initialize::<PRECEDING>(DELTA, &test_data.arrays)
+            .unwrap();
+        assert_untouched(&window_frame_groups);
+
+        window_frame_groups
+            .extend_window_frame_if_necessary::<PRECEDING>(&test_data.arrays, DELTA)
+            .unwrap();
+        assert_untouched(&window_frame_groups);
+
+        // Both frame boundaries stay before the first row for every row.
+        for idx in 0..test_data.num_rows {
+            let start = window_frame_groups
+                .calculate_index_of_group::<START, PRECEDING>(
+                    &test_data.arrays,
+                    idx,
+                    DELTA,
+                    test_data.num_rows,
+                )
+                .unwrap();
+            assert_eq!(start, 0);
+            let end = window_frame_groups
+                .calculate_index_of_group::<END, PRECEDING>(
+                    &test_data.arrays,
+                    idx,
+                    DELTA,
+                    test_data.num_rows,
+                )
+                .unwrap();
+            assert_eq!(end, 0);
+        }
+    }
+
+    #[test]
+    fn test_window_frame_groups_following_delta_greater_than_partition_size() {
+        const START: bool = true;
+        const END: bool = false;
+        const FOLLOWING: bool = false;
+        const DELTA: u64 = 10;
+
+        // A FOLLOWING offset beyond the last group must exhaust the data without
+        // recording any group.
+        let assert_exhausted = |state: &WindowFrameStateGroups| {
+            assert_eq!(state.window_frame_start_idx, DELTA);
+            assert_eq!(state.window_frame_end_idx, DELTA);
+            assert!(state.reached_end);
+            assert_eq!(state.group_start_indices.len(), 0);
+        };
+
+        let test_data = test_data();
+        let mut window_frame_groups = WindowFrameStateGroups::default();
+        window_frame_groups
+            .initialize::<FOLLOWING>(DELTA, &test_data.arrays)
+            .unwrap();
+        assert_exhausted(&window_frame_groups);
+
+        window_frame_groups
+            .extend_window_frame_if_necessary::<FOLLOWING>(&test_data.arrays, DELTA)
+            .unwrap();
+        assert_exhausted(&window_frame_groups);
+
+        // Both frame boundaries lie past the last row for every row.
+        for idx in 0..test_data.num_rows {
+            let start = window_frame_groups
+                .calculate_index_of_group::<START, FOLLOWING>(
+                    &test_data.arrays,
+                    idx,
+                    DELTA,
+                    test_data.num_rows,
+                )
+                .unwrap();
+            assert_eq!(start, test_data.num_rows);
+            let end = window_frame_groups
+                .calculate_index_of_group::<END, FOLLOWING>(
+                    &test_data.arrays,
+                    idx,
+                    DELTA,
+                    test_data.num_rows,
+                )
+                .unwrap();
+            assert_eq!(end, test_data.num_rows);
+        }
+    }
+
+    #[test] // Start is DELTA groups PRECEDING, end is DELTA groups FOLLOWING; DELTA exceeds the data per the name.
+    fn test_window_frame_groups_preceding_and_following_delta_greater_than_partition_size(
+    ) {
+        const START: bool = true; // first flag given to calculate_index_of_group below
+        const END: bool = false;
+        const FOLLOWING: bool = false; // direction flags: FOLLOWING = false, PRECEDING = true
+        const PRECEDING: bool = true;
+        const DELTA: u64 = 10;
+
+        let test_data = test_data();
+        let mut window_frame_groups = WindowFrameStateGroups::default();
+        window_frame_groups
+            .initialize::<PRECEDING>(DELTA, &test_data.arrays)
+            .unwrap();
+        assert_eq!(window_frame_groups.window_frame_start_idx, 0); // PRECEDING init: both cursors expected at 0
+        assert_eq!(window_frame_groups.window_frame_end_idx, 0);
+        assert!(!window_frame_groups.reached_end);
+        assert_eq!(window_frame_groups.group_start_indices.len(), 0);
+
+        window_frame_groups
+            .extend_window_frame_if_necessary::<FOLLOWING>(&test_data.arrays, DELTA)
+            .unwrap();
+        assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+        assert_eq!(
+            window_frame_groups.window_frame_end_idx,
+            test_data.num_groups as u64 // end cursor expected to equal the group count
+        );
+        assert!(window_frame_groups.reached_end); // end flag expected to be set after extending
+        assert_eq!(
+            window_frame_groups.group_start_indices.len(),
+            test_data.num_groups // one stored start index per group
+        );
+
+        for idx in 0..test_data.num_rows {
+            let start = window_frame_groups
+                .calculate_index_of_group::<START, PRECEDING>(
+                    &test_data.arrays,
+                    idx,
+                    DELTA,
+                    test_data.num_rows,
+                )
+                .unwrap();
+            assert_eq!(start, 0); // expected start for every row is 0
+            let end = window_frame_groups
+                .calculate_index_of_group::<END, FOLLOWING>(
+                    &test_data.arrays,
+                    idx,
+                    DELTA,
+                    test_data.num_rows,
+                )
+                .unwrap();
+            assert_eq!(end, test_data.num_rows); // expected end for every row is num_rows
+        }
+    }
+
+    #[test] // Start is 1 group PRECEDING, end is 1 group FOLLOWING; expectations come from test_data tables.
+    fn test_window_frame_groups_preceding_and_following_1() {
+        const START: bool = true; // first flag given to calculate_index_of_group below
+        const END: bool = false;
+        const FOLLOWING: bool = false; // direction flags: FOLLOWING = false, PRECEDING = true
+        const PRECEDING: bool = true;
+        const DELTA: u64 = 1;
+
+        let test_data = test_data();
+        let mut window_frame_groups = WindowFrameStateGroups::default();
+        window_frame_groups
+            .initialize::<PRECEDING>(DELTA, &test_data.arrays)
+            .unwrap();
+        assert_eq!(window_frame_groups.window_frame_start_idx, 0); // PRECEDING init: both cursors expected at 0
+        assert_eq!(window_frame_groups.window_frame_end_idx, 0);
+        assert!(!window_frame_groups.reached_end);
+        assert_eq!(window_frame_groups.group_start_indices.len(), 0);
+
+        window_frame_groups
+            .extend_window_frame_if_necessary::<FOLLOWING>(&test_data.arrays, DELTA)
+            .unwrap();
+        assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+        assert_eq!(window_frame_groups.window_frame_end_idx, 2 * DELTA + 1); // 3 when DELTA = 1
+        assert!(!window_frame_groups.reached_end);
+        assert_eq!(
+            window_frame_groups.group_start_indices.len(),
+            2 * DELTA as usize + 1 // same count as the end cursor asserted above
+        );
+
+        for idx in 0..test_data.num_rows {
+            let start_idx = if idx < DELTA as usize { // NOTE(review): row index compared with a group delta — confirm
+                0
+            } else {
+                test_data.group_indices[idx] - DELTA as usize
+            };
+            let start = window_frame_groups
+                .calculate_index_of_group::<START, PRECEDING>(
+                    &test_data.arrays,
+                    idx,
+                    DELTA,
+                    test_data.num_rows,
+                )
+                .unwrap();
+            assert_eq!(start, test_data.group_indices[start_idx]); // expected start is looked up via start_idx
+            let mut end_idx = if idx >= test_data.num_groups { // NOTE(review): row index vs. group count — confirm
+                test_data.num_rows
+            } else {
+                test_data.next_group_indices[idx]
+            };
+            for _ in 0..DELTA { // repeat the same lookup DELTA more times, capping at num_rows
+                end_idx = if end_idx >= test_data.num_groups {
+                    test_data.num_rows
+                } else {
+                    test_data.next_group_indices[end_idx]
+                };
+            }
+            let end = window_frame_groups
+                .calculate_index_of_group::<END, FOLLOWING>(
+                    &test_data.arrays,
+                    idx,
+                    DELTA,
+                    test_data.num_rows,
+                )
+                .unwrap();
+            assert_eq!(end, end_idx); // end must match the value derived from next_group_indices
+        }
+    }
+
+    #[test] // Start is 1 group PRECEDING; only the START boundary is queried in this test.
+    fn test_window_frame_groups_preceding_1_and_unbounded_following() {
+        const START: bool = true; // first flag given to calculate_index_of_group below
+        const PRECEDING: bool = true; // direction flag for initialize and the lookup
+        const DELTA: u64 = 1;
+
+        let test_data = test_data();
+        let mut window_frame_groups = WindowFrameStateGroups::default();
+        window_frame_groups
+            .initialize::<PRECEDING>(DELTA, &test_data.arrays)
+            .unwrap();
+        assert_eq!(window_frame_groups.window_frame_start_idx, 0); // PRECEDING init: both cursors expected at 0
+        assert_eq!(window_frame_groups.window_frame_end_idx, 0);
+        assert!(!window_frame_groups.reached_end);
+        assert_eq!(window_frame_groups.group_start_indices.len(), 0);
+
+        for idx in 0..test_data.num_rows {
+            let start_idx = if idx < DELTA as usize { // NOTE(review): row index compared with a group delta — confirm
+                0
+            } else {
+                test_data.group_indices[idx] - DELTA as usize
+            };
+            let start = window_frame_groups
+                .calculate_index_of_group::<START, PRECEDING>(
+                    &test_data.arrays,
+                    idx,
+                    DELTA,
+                    test_data.num_rows,
+                )
+                .unwrap();
+            assert_eq!(start, test_data.group_indices[start_idx]); // expected start is looked up via start_idx
+        }
+    }
+
+    #[test] // Start bound with DELTA = 0 (current group, per the test name); only START is queried.
+    fn test_window_frame_groups_current_and_unbounded_following() {
+        const START: bool = true; // first flag given to calculate_index_of_group below
+        const PRECEDING: bool = true; // direction flag for initialize and the lookup
+        const DELTA: u64 = 0;
+
+        let test_data = test_data();
+        let mut window_frame_groups = WindowFrameStateGroups::default();
+        window_frame_groups
+            .initialize::<PRECEDING>(DELTA, &test_data.arrays)
+            .unwrap();
+        assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+        assert_eq!(window_frame_groups.window_frame_end_idx, 1); // with DELTA = 0 the end cursor is expected at 1
+        assert!(!window_frame_groups.reached_end);
+        assert_eq!(window_frame_groups.group_start_indices.len(), 1); // one group start recorded by initialize
+
+        for idx in 0..test_data.num_rows {
+            let start = window_frame_groups
+                .calculate_index_of_group::<START, PRECEDING>(
+                    &test_data.arrays,
+                    idx,
+                    DELTA,
+                    test_data.num_rows,
+                )
+                .unwrap();
+            assert_eq!(start, test_data.group_indices[idx]); // expected start is the row's group_indices entry
+        }
+    }
+}
diff --git a/integration-tests/sqls/simple_window_groups.sql b/integration-tests/sqls/simple_window_groups.sql
new file mode 100644
index 000000000..dd369c28c
--- /dev/null
+++ b/integration-tests/sqls/simple_window_groups.sql
@@ -0,0 +1,71 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+
+-- http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+SELECT -- four families of GROUPS-mode frames, each repeating the same 13 frame shapes
+    SUM(c4) OVER(ORDER BY c3 DESC GROUPS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation1, -- summation1-13: one ORDER BY key, DESC, no PARTITION BY
+    SUM(c4) OVER(ORDER BY c3 DESC GROUPS BETWEEN 3 PRECEDING AND 2 PRECEDING) as summation2,
+    SUM(c5) OVER(ORDER BY c4 DESC GROUPS BETWEEN 2 PRECEDING AND 2 PRECEDING) as summation3,
+    SUM(c4) OVER(ORDER BY c3 DESC GROUPS BETWEEN 1 FOLLOWING AND 3 FOLLOWING) as summation4,
+    SUM(c3) OVER(ORDER BY c4 DESC GROUPS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as summation5,
+    SUM(c5) OVER(ORDER BY c3 DESC GROUPS 2 PRECEDING) as summation6, -- shorthand frame: end bound defaults to CURRENT ROW
+    SUM(c5) OVER(ORDER BY c3 DESC GROUPS BETWEEN CURRENT ROW AND 3 FOLLOWING) as summation7,
+    SUM(c5) OVER(ORDER BY c4 DESC GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation8,
+    SUM(c4) OVER(ORDER BY c4 DESC GROUPS UNBOUNDED PRECEDING) as summation9,
+    SUM(c5) OVER(ORDER BY c4 DESC GROUPS CURRENT ROW) as summation10,
+    SUM(c5) OVER(ORDER BY c4 DESC GROUPS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as summation11,
+    SUM(c5) OVER(ORDER BY c4 DESC GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as summation12,
+    SUM(c5) OVER(ORDER BY c4 DESC GROUPS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) as summation13,
+    SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation21, -- summation21-33: same frames, PARTITION BY c4, ascending key
+    SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 3 PRECEDING AND 2 PRECEDING) as summation22,
+    SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN 2 PRECEDING AND 2 PRECEDING) as summation23,
+    SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 FOLLOWING AND 3 FOLLOWING) as summation24,
+    SUM(c3) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as summation25,
+    SUM(c5) OVER(PARTITION BY c4 ORDER BY c3 GROUPS 2 PRECEDING) as summation26,
+    SUM(c5) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN CURRENT ROW AND 3 FOLLOWING) as summation27,
+    SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation28,
+    SUM(c4) OVER(PARTITION BY c4 ORDER BY c4 GROUPS UNBOUNDED PRECEDING) as summation29,
+    SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS CURRENT ROW) as summation30,
+    SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as summation31,
+    SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as summation32,
+    SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) as summation33,
+    SUM(c4) OVER(ORDER BY c5, c3 GROUPS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation41, -- summation41-53: same frames, two ORDER BY keys
+    SUM(c4) OVER(ORDER BY c5, c3 GROUPS BETWEEN 3 PRECEDING AND 2 PRECEDING) as summation42,
+    SUM(c5) OVER(ORDER BY c5, c4 GROUPS BETWEEN 2 PRECEDING AND 2 PRECEDING) as summation43,
+    SUM(c4) OVER(ORDER BY c5, c3 GROUPS BETWEEN 1 FOLLOWING AND 3 FOLLOWING) as summation44,
+    SUM(c3) OVER(ORDER BY c5, c4 GROUPS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as summation45,
+    SUM(c5) OVER(ORDER BY c5, c3 GROUPS 2 PRECEDING) as summation46,
+    SUM(c5) OVER(ORDER BY c5, c3 GROUPS BETWEEN CURRENT ROW AND 3 FOLLOWING) as summation47,
+    SUM(c5) OVER(ORDER BY c5, c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation48,
+    SUM(c4) OVER(ORDER BY c5, c4 GROUPS UNBOUNDED PRECEDING) as summation49,
+    SUM(c5) OVER(ORDER BY c5, c4 GROUPS CURRENT ROW) as summation50,
+    SUM(c5) OVER(ORDER BY c5, c4 GROUPS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as summation51,
+    SUM(c5) OVER(ORDER BY c5, c4 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as summation52,
+    SUM(c5) OVER(ORDER BY c5, c4 GROUPS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) as summation53,
+    SUM(c4) OVER(ORDER BY c3 GROUPS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation61, -- summation61-73: same frames, one ascending ORDER BY key
+    SUM(c4) OVER(ORDER BY c3 GROUPS BETWEEN 3 PRECEDING AND 2 PRECEDING) as summation62,
+    SUM(c5) OVER(ORDER BY c4 GROUPS BETWEEN 2 PRECEDING AND 2 PRECEDING) as summation63,
+    SUM(c4) OVER(ORDER BY c3 GROUPS BETWEEN 1 FOLLOWING AND 3 FOLLOWING) as summation64,
+    SUM(c3) OVER(ORDER BY c4 GROUPS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as summation65,
+    SUM(c5) OVER(ORDER BY c3 GROUPS 2 PRECEDING) as summation66,
+    SUM(c5) OVER(ORDER BY c3 GROUPS BETWEEN CURRENT ROW AND 3 FOLLOWING) as summation67,
+    SUM(c5) OVER(ORDER BY c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation68,
+    SUM(c4) OVER(ORDER BY c4 GROUPS UNBOUNDED PRECEDING) as summation69,
+    SUM(c5) OVER(ORDER BY c4 GROUPS CURRENT ROW) as summation70,
+    SUM(c5) OVER(ORDER BY c4 GROUPS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as summation71,
+    SUM(c5) OVER(ORDER BY c4 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as summation72,
+    SUM(c5) OVER(ORDER BY c4 GROUPS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) as summation73
+FROM test
+ORDER BY c9; -- NOTE(review): presumably fixes row order for the engine-to-engine comparison; confirm c9 is unique
diff --git a/integration-tests/test_psql_parity.py b/integration-tests/test_psql_parity.py
index be197eb61..e4c2bb17e 100644
--- a/integration-tests/test_psql_parity.py
+++ b/integration-tests/test_psql_parity.py
@@ -82,11 +82,10 @@ test_files = set(root.glob("*.sql"))
 
 class TestPsqlParity:
     def test_tests_count(self):
-        assert len(test_files) == 25, "tests are missed"
+        assert len(test_files) == 26, "tests are missed"
 
     @pytest.mark.parametrize("fname", test_files, ids=str)
     def test_sql_file(self, fname):
         datafusion_output = pd.read_csv(io.BytesIO(generate_csv_from_datafusion(fname)))
         psql_output = pd.read_csv(io.BytesIO(generate_csv_from_psql(fname)))
         np.testing.assert_allclose(datafusion_output, psql_output, equal_nan=True, verbose=True)
-