You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/10 20:23:25 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4155: Window frame GROUPS mode support

alamb commented on code in PR #4155:
URL: https://github.com/apache/arrow-datafusion/pull/4155#discussion_r1019560970


##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -1511,12 +1511,6 @@ pub fn create_window_expr_with_name(
                 })
                 .collect::<Result<Vec<_>>>()?;
             if let Some(ref window_frame) = window_frame {
-                if window_frame.units == WindowFrameUnits::Groups {

Review Comment:
   ❤️ 



##########
datafusion/common/src/bisect.rs:
##########
@@ -60,22 +60,37 @@ pub fn bisect<const SIDE: bool>(
     target: &[ScalarValue],
     sort_options: &[SortOptions],
 ) -> Result<usize> {
-    let mut low: usize = 0;
-    let mut high: usize = item_columns
+    let low: usize = 0;
+    let high: usize = item_columns
         .get(0)
         .ok_or_else(|| {
             DataFusionError::Internal("Column array shouldn't be empty".to_string())
         })?
         .len();
+    let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
+        let cmp = compare(current, target, sort_options)?;
+        Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
+    };
+    find_bisect_point(item_columns, target, compare_fn, low, high)
+}
+
+pub fn find_bisect_point<F>(

Review Comment:
   I wonder if we could perhaps add some docstrings to this function, specifically describing the expected behavior of `compare_fn` and what its output is used for.



##########
datafusion/core/tests/sql/window.rs:
##########
@@ -1189,24 +1189,120 @@ async fn window_frame_ranges_unbounded_preceding_err() -> Result<()> {
 }
 
 #[tokio::test]
-async fn window_frame_groups_query() -> Result<()> {
+async fn window_frame_groups_preceding_following_desc() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx).await?;
+    let sql = "SELECT
+        SUM(c4) OVER(ORDER BY c2 DESC GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
+        SUM(c3) OVER(ORDER BY c2 DESC GROUPS BETWEEN 10000 PRECEDING AND 10000 FOLLOWING),
+        COUNT(*) OVER(ORDER BY c2 DESC GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+        FROM aggregate_test_100
+        ORDER BY c9
+        LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------------+----------------------------+-----------------+",
+        "| SUM(aggregate_test_100.c4) | SUM(aggregate_test_100.c3) | COUNT(UInt8(1)) |",
+        "+----------------------------+----------------------------+-----------------+",
+        "| 52276                      | 781                        | 56              |",
+        "| 260620                     | 781                        | 63              |",
+        "| -28623                     | 781                        | 37              |",
+        "| 260620                     | 781                        | 63              |",
+        "| 260620                     | 781                        | 63              |",
+        "+----------------------------+----------------------------+-----------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups_order_by_null_desc() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_null_cases_csv(&ctx).await?;
+    let sql = "SELECT
+        COUNT(c2) OVER (ORDER BY c1 DESC GROUPS BETWEEN 5 PRECEDING AND 3 FOLLOWING)
+        FROM null_cases
+        LIMIT 5";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----------------------+",
+        "| COUNT(null_cases.c2) |",
+        "+----------------------+",
+        "| 12                   |",
+        "| 12                   |",
+        "| 12                   |",
+        "| 12                   |",
+        "| 12                   |",
+        "+----------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_aggregate_null_cases_csv(&ctx).await?;
+    let sql = "SELECT
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as a,
+        SUM(c1) OVER (ORDER BY c3 DESC GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as b,
+        SUM(c1) OVER (ORDER BY c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as c,
+        SUM(c1) OVER (ORDER BY c3 DESC NULLS last GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as d,
+        SUM(c1) OVER (ORDER BY c3 DESC NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as e,
+        SUM(c1) OVER (ORDER BY c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as f,
+        SUM(c1) OVER (ORDER BY c3 GROUPS current row) as a1,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 9 PRECEDING AND 5 PRECEDING) as a2,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND 5 PRECEDING) as a3,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a4,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND current row) as a5,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as a6,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as a7,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 3 FOLLOWING AND UNBOUNDED FOLLOWING) as a8,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN current row AND UNBOUNDED FOLLOWING) as a9,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN current row AND 3 FOLLOWING) as a10,
+        SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 5 FOLLOWING AND 7 FOLLOWING) as a11
+        FROM null_cases
+        ORDER BY c3
+        LIMIT 10";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+------+------+------+------+-----+-----+",
+        "| a   | b   | c   | d   | e   | f   | a1 | a2  | a3  | a4  | a5  | a6   | a7   | a8   | a9   | a10 | a11 |",
+        "+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+------+------+------+------+-----+-----+",
+        "| 412 | 307 | 412 | 307 | 307 | 412 |    |     |     | 412 |     | 4627 | 4627 | 4531 | 4627 | 115 | 85  |",
+        "| 488 | 339 | 488 | 339 | 339 | 488 | 72 |     |     | 488 | 72  | 4627 | 4627 | 4512 | 4627 | 140 | 153 |",
+        "| 543 | 412 | 543 | 412 | 412 | 543 | 24 |     |     | 543 | 96  | 4627 | 4627 | 4487 | 4555 | 82  | 122 |",
+        "| 553 | 488 | 553 | 488 | 488 | 553 | 19 |     |     | 553 | 115 | 4627 | 4555 | 4473 | 4531 | 89  | 114 |",
+        "| 553 | 543 | 553 | 543 | 543 | 553 | 25 |     |     | 553 | 140 | 4627 | 4531 | 4442 | 4512 | 110 | 105 |",
+        "| 591 | 553 | 591 | 553 | 553 | 591 | 14 |     |     | 591 | 154 | 4627 | 4512 | 4402 | 4487 | 167 | 181 |",
+        "| 651 | 553 | 651 | 553 | 553 | 651 | 31 | 72  | 72  | 651 | 185 | 4627 | 4487 | 4320 | 4473 | 153 | 204 |",
+        "| 662 | 591 | 662 | 591 | 591 | 662 | 40 | 96  | 96  | 662 | 225 | 4627 | 4473 | 4320 | 4442 | 154 | 141 |",
+        "| 697 | 651 | 697 | 651 | 651 | 697 | 82 | 115 | 115 | 697 | 307 | 4627 | 4442 | 4288 | 4402 | 187 | 65  |",
+        "| 758 | 662 | 758 | 662 | 662 | 758 |    | 140 | 140 | 758 | 307 | 4627 | 4402 | 4215 | 4320 | 181 | 48  |",
+        "+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+------+------+------+------+-----+-----+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups_without_order_by() -> Result<()> {
     let ctx = SessionContext::new();
     register_aggregate_csv(&ctx).await?;
     // execute the query
     let df = ctx
         .sql(
             "SELECT
-                COUNT(c1) OVER (ORDER BY c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
-                FROM aggregate_test_100;",
+            SUM(c4) OVER(PARTITION BY c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+            FROM aggregate_test_100
+            ORDER BY c9;",
         )
         .await?;
-    let results = df.collect().await;
-    assert!(results
-        .as_ref()
-        .err()
-        .unwrap()
-        .to_string()
-        .contains("Window frame definitions involving GROUPS are not supported yet"));
+    let err = df.collect().await.unwrap_err();
+    assert_contains!(
+        err.to_string(),
+        "Execution error: GROUPS mode requires an ORDER BY clause".to_owned()

Review Comment:
   👍 



##########
datafusion/physical-expr/src/window/built_in.rs:
##########
@@ -113,10 +114,10 @@ impl WindowExpr for BuiltInWindowExpr {
                     .iter()
                     .map(|v| v.slice(partition_range.start, length))
                     .collect::<Vec<_>>();
+                let mut window_frame_ctx = WindowFrameContext::new(&window_frame);

Review Comment:
   This is a very nice encapsulation of the window frame calculation. Thank you



##########
datafusion/physical-expr/src/window/window_frame_state.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides utilities for window frame index calculations depending on the window frame mode:
+//! RANGE, ROWS, GROUPS.
+
+use arrow::array::ArrayRef;
+use arrow::compute::kernels::sort::SortOptions;
+use datafusion_common::bisect::{bisect, find_bisect_point};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+use std::cmp::min;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This object stores the window frame state for use in incremental calculations.
+#[derive(Debug)]
+pub enum WindowFrameContext<'a> {
+    // ROWS-frames are inherently stateless:
+    Rows(&'a Arc<WindowFrame>),

Review Comment:
   As a matter of style, I find `&Arc<WindowFrame>` strange. I would expect either an owned Arc (i.e., incrementing the ref count):
   
   ```suggestion
       Rows(Arc<WindowFrame>),
   ```
   
   Or else just a reference:
   
   ```suggestion
       Rows(&'a WindowFrame),
   ```
   
   
   I don't understand the need to have a reference to the Arc.



##########
datafusion/physical-expr/src/window/window_frame_state.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides utilities for window frame index calculations depending on the window frame mode:
+//! RANGE, ROWS, GROUPS.
+
+use arrow::array::ArrayRef;
+use arrow::compute::kernels::sort::SortOptions;
+use datafusion_common::bisect::{bisect, find_bisect_point};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+use std::cmp::min;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This object stores the window frame state for use in incremental calculations.
+#[derive(Debug)]
+pub enum WindowFrameContext<'a> {
+    // ROWS-frames are inherently stateless:
+    Rows(&'a Arc<WindowFrame>),
+    // RANGE-frames will soon have a stateful implementation that is more efficient than a stateless one:
+    Range {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateRange,
+    },
+    // GROUPS-frames have a stateful implementation that is more efficient than a stateless one:
+    Groups {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateGroups,
+    },
+    Default,
+}
+
+impl<'a> WindowFrameContext<'a> {
+    /// Create a new default state for the given window frame.
+    pub fn new(window_frame: &'a Option<Arc<WindowFrame>>) -> Self {
+        if let Some(window_frame) = window_frame {
+            match window_frame.units {
+                WindowFrameUnits::Rows => WindowFrameContext::Rows(window_frame),
+                WindowFrameUnits::Range => WindowFrameContext::Range {
+                    window_frame,
+                    state: WindowFrameStateRange::default(),
+                },
+                WindowFrameUnits::Groups => WindowFrameContext::Groups {
+                    window_frame,
+                    state: WindowFrameStateGroups::default(),
+                },
+            }
+        } else {
+            WindowFrameContext::Default
+        }
+    }
+
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    pub fn calculate_range(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        match *self {
+            WindowFrameContext::Rows(window_frame) => {
+                Self::calculate_range_rows(window_frame, length, idx)
+            }
+            WindowFrameContext::Range {
+                window_frame,
+                ref mut state,
+            } => state.calculate_range(
+                window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+            WindowFrameContext::Groups {
+                window_frame,
+                ref mut state,
+            } => state.calculate_range(
+                window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+            WindowFrameContext::Default => Ok((0, length)),
+        }
+    }
+
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range_rows(
+        window_frame: &Arc<WindowFrame>,
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                if idx >= n as usize {
+                    idx - n as usize
+                } else {
+                    0
+                }
+            }
+            WindowFrameBound::CurrentRow => idx,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                min(idx + n as usize, length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        let end = match window_frame.end_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                if idx >= n as usize {
+                    idx - n as usize + 1
+                } else {
+                    0
+                }
+            }
+            WindowFrameBound::CurrentRow => idx + 1,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                min(idx + n as usize + 1, length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        Ok((start, end))
+    }
+}
+
+/// This structure encapsulates all the state information we require as we
+/// scan ranges of data while processing window frames. Currently we calculate
+/// things from scratch every time, but we will make this incremental in the future.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateRange {}
+
+impl WindowFrameStateRange {
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match window_frame.start_bound {
+            WindowFrameBound::Preceding(ref n) => {
+                if n.is_null() {
+                    // UNBOUNDED PRECEDING
+                    0
+                } else {
+                    self.calculate_index_of_row::<true, true>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        Some(n),
+                    )?
+                }
+            }
+            WindowFrameBound::CurrentRow => {
+                if range_columns.is_empty() {
+                    0
+                } else {
+                    self.calculate_index_of_row::<true, true>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        None,
+                    )?
+                }
+            }
+            WindowFrameBound::Following(ref n) => self
+                .calculate_index_of_row::<true, false>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+        };
+        let end = match window_frame.end_bound {
+            WindowFrameBound::Preceding(ref n) => self
+                .calculate_index_of_row::<false, true>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+            WindowFrameBound::CurrentRow => {
+                if range_columns.is_empty() {
+                    length
+                } else {
+                    self.calculate_index_of_row::<false, false>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        None,
+                    )?
+                }
+            }
+            WindowFrameBound::Following(ref n) => {
+                if n.is_null() {
+                    // UNBOUNDED FOLLOWING
+                    length
+                } else {
+                    self.calculate_index_of_row::<false, false>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        Some(n),
+                    )?
+                }
+            }
+        };
+        Ok((start, end))
+    }
+
+    /// This function does the heavy lifting when finding range boundaries. It is meant to be
+    /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+    /// supplied as false and true, respectively).
+    fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        idx: usize,
+        delta: Option<&ScalarValue>,
+    ) -> Result<usize> {
+        let current_row_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, idx))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        let end_range = if let Some(delta) = delta {
+            let is_descending: bool = sort_options
+                .first()
+                .ok_or_else(|| DataFusionError::Internal("Array is empty".to_string()))?
+                .descending;
+
+            current_row_values
+                .iter()
+                .map(|value| {
+                    if value.is_null() {
+                        return Ok(value.clone());
+                    }
+                    if SEARCH_SIDE == is_descending {
+                        // TODO: Handle positive overflows
+                        value.add(delta)
+                    } else if value.is_unsigned() && value < delta {
+                        // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
+                        //       If we decide to implement a "default" construction mechanism for ScalarValue,
+                        //       change the following statement to use that.
+                        value.sub(value)
+                    } else {
+                        // TODO: Handle negative overflows
+                        value.sub(delta)
+                    }
+                })
+                .collect::<Result<Vec<ScalarValue>>>()?
+        } else {
+            current_row_values
+        };
+        // `BISECT_SIDE` true means bisect_left, false means bisect_right
+        bisect::<BISECT_SIDE>(range_columns, &end_range, sort_options)
+    }
+}
+
+// In GROUPS mode, rows with duplicate sorting values are grouped together.
+// Therefore, there must be an ORDER BY clause in the window definition to use GROUPS mode.
+// The syntax is as follows:
+//     GROUPS frame_start [ frame_exclusion ]
+//     GROUPS BETWEEN frame_start AND frame_end [ frame_exclusion ]
+// The optional frame_exclusion specifier is not yet supported.
+// The frame_start and frame_end parameters allow us to specify which rows the window
+// frame starts and ends with. They accept the following values:
+//    - UNBOUNDED PRECEDING: Start with the first row of the partition. Possible only in frame_start.
+//    - offset PRECEDING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups before the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups before the current group.
+//    - CURRENT ROW: When used in frame_start, it refers to the first row of the group containing
+//                   the current row. When used in frame_end, it refers to the last row of the group
+//                   containing the current row.
+//    - offset FOLLOWING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups after the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups after the current group.
+//    - UNBOUNDED FOLLOWING: End with the last row of the partition. Possible only in frame_end.
+
+// This structure encapsulates all the state information we require as we
+// scan groups of data while processing window frames.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateGroups {
+    current_group_idx: u64,
+    group_start_indices: VecDeque<(Vec<ScalarValue>, usize)>,
+    previous_row_values: Option<Vec<ScalarValue>>,
+    reached_end: bool,
+    window_frame_end_idx: u64,
+    window_frame_start_idx: u64,
+}
+
+impl WindowFrameStateGroups {
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        _sort_options: &[SortOptions],

Review Comment:
   If the sort options are not used, I wonder why they are passed to `calculate_range`.



##########
datafusion/physical-expr/src/window/window_frame_state.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides utilities for window frame index calculations depending on the window frame mode:
+//! RANGE, ROWS, GROUPS.
+
+use arrow::array::ArrayRef;
+use arrow::compute::kernels::sort::SortOptions;
+use datafusion_common::bisect::{bisect, find_bisect_point};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+use std::cmp::min;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This object stores the window frame state for use in incremental calculations.
+#[derive(Debug)]
+pub enum WindowFrameContext<'a> {
+    // ROWS-frames are inherently stateless:
+    Rows(&'a Arc<WindowFrame>),
+    // RANGE-frames will soon have a stateful implementation that is more efficient than a stateless one:
+    Range {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateRange,
+    },
+    // GROUPS-frames have a stateful implementation that is more efficient than a stateless one:
+    Groups {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateGroups,
+    },
+    Default,
+}
+
+impl<'a> WindowFrameContext<'a> {
+    /// Create a new default state for the given window frame.
+    pub fn new(window_frame: &'a Option<Arc<WindowFrame>>) -> Self {
+        if let Some(window_frame) = window_frame {
+            match window_frame.units {
+                WindowFrameUnits::Rows => WindowFrameContext::Rows(window_frame),
+                WindowFrameUnits::Range => WindowFrameContext::Range {
+                    window_frame,
+                    state: WindowFrameStateRange::default(),
+                },
+                WindowFrameUnits::Groups => WindowFrameContext::Groups {
+                    window_frame,
+                    state: WindowFrameStateGroups::default(),
+                },
+            }
+        } else {
+            WindowFrameContext::Default
+        }
+    }
+
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    pub fn calculate_range(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        match *self {
+            WindowFrameContext::Rows(window_frame) => {
+                Self::calculate_range_rows(window_frame, length, idx)
+            }
+            WindowFrameContext::Range {
+                window_frame,
+                ref mut state,
+            } => state.calculate_range(
+                window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+            WindowFrameContext::Groups {
+                window_frame,
+                ref mut state,
+            } => state.calculate_range(
+                window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+            WindowFrameContext::Default => Ok((0, length)),
+        }
+    }
+
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range_rows(
+        window_frame: &Arc<WindowFrame>,
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                if idx >= n as usize {
+                    idx - n as usize
+                } else {
+                    0
+                }
+            }
+            WindowFrameBound::CurrentRow => idx,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                min(idx + n as usize, length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        let end = match window_frame.end_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                if idx >= n as usize {
+                    idx - n as usize + 1
+                } else {
+                    0
+                }
+            }
+            WindowFrameBound::CurrentRow => idx + 1,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                min(idx + n as usize + 1, length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        Ok((start, end))
+    }
+}
+
+/// This structure encapsulates all the state information we require as we
+/// scan ranges of data while processing window frames. Currently we calculate
+/// things from scratch every time, but we will make this incremental in the future.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateRange {}
+
+impl WindowFrameStateRange {
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match window_frame.start_bound {
+            WindowFrameBound::Preceding(ref n) => {
+                if n.is_null() {
+                    // UNBOUNDED PRECEDING
+                    0
+                } else {
+                    self.calculate_index_of_row::<true, true>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        Some(n),
+                    )?
+                }
+            }
+            WindowFrameBound::CurrentRow => {
+                if range_columns.is_empty() {
+                    0
+                } else {
+                    self.calculate_index_of_row::<true, true>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        None,
+                    )?
+                }
+            }
+            WindowFrameBound::Following(ref n) => self
+                .calculate_index_of_row::<true, false>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+        };
+        let end = match window_frame.end_bound {
+            WindowFrameBound::Preceding(ref n) => self
+                .calculate_index_of_row::<false, true>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+            WindowFrameBound::CurrentRow => {
+                if range_columns.is_empty() {
+                    length
+                } else {
+                    self.calculate_index_of_row::<false, false>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        None,
+                    )?
+                }
+            }
+            WindowFrameBound::Following(ref n) => {
+                if n.is_null() {
+                    // UNBOUNDED FOLLOWING
+                    length
+                } else {
+                    self.calculate_index_of_row::<false, false>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        Some(n),
+                    )?
+                }
+            }
+        };
+        Ok((start, end))
+    }
+
+    /// This function does the heavy lifting when finding range boundaries. It is meant to be
+    /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+    /// supplied as false and true, respectively).
+    fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        idx: usize,
+        delta: Option<&ScalarValue>,
+    ) -> Result<usize> {
+        let current_row_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, idx))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        let end_range = if let Some(delta) = delta {
+            let is_descending: bool = sort_options
+                .first()
+                .ok_or_else(|| DataFusionError::Internal("Array is empty".to_string()))?
+                .descending;
+
+            current_row_values
+                .iter()
+                .map(|value| {
+                    if value.is_null() {
+                        return Ok(value.clone());
+                    }
+                    if SEARCH_SIDE == is_descending {
+                        // TODO: Handle positive overflows
+                        value.add(delta)
+                    } else if value.is_unsigned() && value < delta {
+                        // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
+                        //       If we decide to implement a "default" construction mechanism for ScalarValue,
+                        //       change the following statement to use that.
+                        value.sub(value)
+                    } else {
+                        // TODO: Handle negative overflows
+                        value.sub(delta)
+                    }
+                })
+                .collect::<Result<Vec<ScalarValue>>>()?
+        } else {
+            current_row_values
+        };
+        // `BISECT_SIDE` true means bisect_left, false means bisect_right
+        bisect::<BISECT_SIDE>(range_columns, &end_range, sort_options)
+    }
+}
+
+// In GROUPS mode, rows with duplicate sorting values are grouped together.
+// Therefore, there must be an ORDER BY clause in the window definition to use GROUPS mode.
+// The syntax is as follows:
+//     GROUPS frame_start [ frame_exclusion ]
+//     GROUPS BETWEEN frame_start AND frame_end [ frame_exclusion ]
+// The optional frame_exclusion specifier is not yet supported.
+// The frame_start and frame_end parameters allow us to specify which rows the window
+// frame starts and ends with. They accept the following values:
+//    - UNBOUNDED PRECEDING: Start with the first row of the partition. Possible only in frame_start.
+//    - offset PRECEDING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups before the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups before the current group.
+//    - CURRENT ROW: When used in frame_start, it refers to the first row of the group containing
+//                   the current row. When used in frame_end, it refers to the last row of the group
+//                   containing the current row.
+//    - offset FOLLOWING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups after the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups after the current group.
+//    - UNBOUNDED FOLLOWING: End with the last row of the partition. Possible only in frame_end.
+
+// This structure encapsulates all the state information we require as we
+// scan groups of data while processing window frames.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateGroups {

Review Comment:
   This structure is fine -- I just wanted to point out that one pattern for writing state machines in Rust is to encapsulate each state in an enum variant; you can then use the type system to ensure that the state transitions are valid.
   
   Something like:
   
   ```rust
   pub enum WindowFrameStateGroupsState {
     Start {},
     InGroup { ... },
     End {},
   }
   ```
   
   Or something similar



##########
datafusion/physical-expr/src/window/window_frame_state.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides utilities for window frame index calculations depending on the window frame mode:
+//! RANGE, ROWS, GROUPS.
+
+use arrow::array::ArrayRef;
+use arrow::compute::kernels::sort::SortOptions;
+use datafusion_common::bisect::{bisect, find_bisect_point};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+use std::cmp::min;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This object stores the window frame state for use in incremental calculations.
+#[derive(Debug)]
+pub enum WindowFrameContext<'a> {
+    // ROWS-frames are inherently stateless:
+    Rows(&'a Arc<WindowFrame>),
+    // RANGE-frames will soon have a stateful implementation that is more efficient than a stateless one:
+    Range {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateRange,
+    },
+    // GROUPS-frames have a stateful implementation that is more efficient than a stateless one:
+    Groups {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateGroups,
+    },
+    Default,
+}
+
+impl<'a> WindowFrameContext<'a> {
+    /// Create a new default state for the given window frame.
+    pub fn new(window_frame: &'a Option<Arc<WindowFrame>>) -> Self {
+        if let Some(window_frame) = window_frame {
+            match window_frame.units {
+                WindowFrameUnits::Rows => WindowFrameContext::Rows(window_frame),
+                WindowFrameUnits::Range => WindowFrameContext::Range {
+                    window_frame,
+                    state: WindowFrameStateRange::default(),
+                },
+                WindowFrameUnits::Groups => WindowFrameContext::Groups {
+                    window_frame,
+                    state: WindowFrameStateGroups::default(),
+                },
+            }
+        } else {
+            WindowFrameContext::Default
+        }
+    }
+
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    pub fn calculate_range(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        match *self {
+            WindowFrameContext::Rows(window_frame) => {
+                Self::calculate_range_rows(window_frame, length, idx)
+            }
+            WindowFrameContext::Range {
+                window_frame,
+                ref mut state,
+            } => state.calculate_range(
+                window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+            WindowFrameContext::Groups {
+                window_frame,
+                ref mut state,
+            } => state.calculate_range(
+                window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+            WindowFrameContext::Default => Ok((0, length)),
+        }
+    }
+
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range_rows(
+        window_frame: &Arc<WindowFrame>,
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                if idx >= n as usize {
+                    idx - n as usize
+                } else {
+                    0
+                }
+            }
+            WindowFrameBound::CurrentRow => idx,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                min(idx + n as usize, length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        let end = match window_frame.end_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                if idx >= n as usize {
+                    idx - n as usize + 1
+                } else {
+                    0
+                }
+            }
+            WindowFrameBound::CurrentRow => idx + 1,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                min(idx + n as usize + 1, length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        Ok((start, end))
+    }
+}
+
+/// This structure encapsulates all the state information we require as we
+/// scan ranges of data while processing window frames. Currently we calculate
+/// things from scratch every time, but we will make this incremental in the future.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateRange {}
+
+impl WindowFrameStateRange {
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match window_frame.start_bound {
+            WindowFrameBound::Preceding(ref n) => {
+                if n.is_null() {
+                    // UNBOUNDED PRECEDING
+                    0
+                } else {
+                    self.calculate_index_of_row::<true, true>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        Some(n),
+                    )?
+                }
+            }
+            WindowFrameBound::CurrentRow => {
+                if range_columns.is_empty() {
+                    0
+                } else {
+                    self.calculate_index_of_row::<true, true>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        None,
+                    )?
+                }
+            }
+            WindowFrameBound::Following(ref n) => self
+                .calculate_index_of_row::<true, false>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+        };
+        let end = match window_frame.end_bound {
+            WindowFrameBound::Preceding(ref n) => self
+                .calculate_index_of_row::<false, true>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+            WindowFrameBound::CurrentRow => {
+                if range_columns.is_empty() {
+                    length
+                } else {
+                    self.calculate_index_of_row::<false, false>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        None,
+                    )?
+                }
+            }
+            WindowFrameBound::Following(ref n) => {
+                if n.is_null() {
+                    // UNBOUNDED FOLLOWING
+                    length
+                } else {
+                    self.calculate_index_of_row::<false, false>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        Some(n),
+                    )?
+                }
+            }
+        };
+        Ok((start, end))
+    }
+
+    /// This function does the heavy lifting when finding range boundaries. It is meant to be
+    /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+    /// supplied as false and true, respectively).
+    fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        idx: usize,
+        delta: Option<&ScalarValue>,
+    ) -> Result<usize> {
+        let current_row_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, idx))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        let end_range = if let Some(delta) = delta {
+            let is_descending: bool = sort_options
+                .first()
+                .ok_or_else(|| DataFusionError::Internal("Array is empty".to_string()))?
+                .descending;
+
+            current_row_values
+                .iter()
+                .map(|value| {
+                    if value.is_null() {
+                        return Ok(value.clone());
+                    }
+                    if SEARCH_SIDE == is_descending {
+                        // TODO: Handle positive overflows
+                        value.add(delta)
+                    } else if value.is_unsigned() && value < delta {
+                        // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
+                        //       If we decide to implement a "default" construction mechanism for ScalarValue,
+                        //       change the following statement to use that.
+                        value.sub(value)
+                    } else {
+                        // TODO: Handle negative overflows
+                        value.sub(delta)
+                    }
+                })
+                .collect::<Result<Vec<ScalarValue>>>()?
+        } else {
+            current_row_values
+        };
+        // `BISECT_SIDE` true means bisect_left, false means bisect_right
+        bisect::<BISECT_SIDE>(range_columns, &end_range, sort_options)
+    }
+}
+
+// In GROUPS mode, rows with duplicate sorting values are grouped together.
+// Therefore, there must be an ORDER BY clause in the window definition to use GROUPS mode.
+// The syntax is as follows:
+//     GROUPS frame_start [ frame_exclusion ]
+//     GROUPS BETWEEN frame_start AND frame_end [ frame_exclusion ]
+// The optional frame_exclusion specifier is not yet supported.
+// The frame_start and frame_end parameters allow us to specify which rows the window
+// frame starts and ends with. They accept the following values:
+//    - UNBOUNDED PRECEDING: Start with the first row of the partition. Possible only in frame_start.
+//    - offset PRECEDING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups before the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups before the current group.
+//    - CURRENT ROW: When used in frame_start, it refers to the first row of the group containing
+//                   the current row. When used in frame_end, it refers to the last row of the group
+//                   containing the current row.
+//    - offset FOLLOWING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups after the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups after the current group.
+//    - UNBOUNDED FOLLOWING: End with the last row of the partition. Possible only in frame_end.
+
+// This structure encapsulates all the state information we require as we
+// scan groups of data while processing window frames.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateGroups {
+    current_group_idx: u64,
+    group_start_indices: VecDeque<(Vec<ScalarValue>, usize)>,
+    previous_row_values: Option<Vec<ScalarValue>>,
+    reached_end: bool,
+    window_frame_end_idx: u64,
+    window_frame_start_idx: u64,
+}
+
+impl WindowFrameStateGroups {
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        _sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        if range_columns.is_empty() {
+            return Err(DataFusionError::Execution(
+                "GROUPS mode requires an ORDER BY clause".to_string(),
+            ));
+        }
+        let start = match window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<true, true>(range_columns, idx, n, length)?,
+            WindowFrameBound::CurrentRow => self.calculate_index_of_group::<true, true>(
+                range_columns,
+                idx,
+                0,
+                length,
+            )?,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<true, false>(range_columns, idx, n, length)?,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                    window_frame
+                )))
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal(
+                    "Groups should be Uint".to_string(),
+                ))
+            }
+        };
+        let end = match window_frame.end_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<false, true>(range_columns, idx, n, length)?,
+            WindowFrameBound::CurrentRow => self
+                .calculate_index_of_group::<false, false>(
+                    range_columns,
+                    idx,
+                    0,
+                    length,
+                )?,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<false, false>(
+                    range_columns,
+                    idx,
+                    n,
+                    length,
+                )?,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal(
+                    "Groups should be Uint".to_string(),
+                ))
+            }
+        };
+        Ok((start, end))
+    }
+
+    /// This function does the heavy lifting when finding group boundaries. It is meant to be
+    /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+    /// supplied as true and false, respectively).
+    ///
+    /// `SEARCH_SIDE` tells on which side of the current group the boundary lies: true counts
+    /// `delta` groups backwards (PRECEDING), false counts `delta` groups forwards (FOLLOWING).
+    /// `idx` is the index of the current row, and `length` is the value reported as the boundary
+    /// once the end of the data has been reached.
+    ///
+    /// Returns a row index: the start index of the oldest tracked group when `BISECT_SIDE` is
+    /// true, and the start index of the newest tracked group (or `length` once the end has been
+    /// reached) when it is false. When no group is tracked at all, the result is `length` if the
+    /// end has been reached and 0 otherwise.
+    fn calculate_index_of_group<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        idx: usize,
+        delta: u64,
+        length: usize,
+    ) -> Result<usize> {
+        let current_row_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, idx))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+
+        if BISECT_SIDE {
+            // When we call this function to get the window frame start index, it tries to initialize
+            // the internal grouping state if this is not already done before. For a PRECEDING bound,
+            // this initialization takes place only once the current group index is at least `delta`
+            // (i.e. the frame start no longer lies before the first group). In this case, the first
+            // group of the frame is stored in group_start_indices, with its row values as the group
+            // identifier and the index of its first row as the start index of the group.
+            if !self.initialized() {
+                self.initialize::<SEARCH_SIDE>(delta, range_columns)?;
+            }
+        } else if !self.reached_end {
+            // When we call this function to get the window frame end index, it extends the window
+            // frame one by one until the current row's window frame end index is reached by finding
+            // the next group.
+            self.extend_window_frame_if_necessary::<SEARCH_SIDE>(range_columns, delta)?;
+        }
+        // We keep track of previous row values, so that a group change can be identified.
+        // If there is a group change, the window frame is advanced and shifted by one group.
+        let group_change = match &self.previous_row_values {
+            None => false,
+            Some(values) => &current_row_values != values,
+        };
+        // The stored values are only replaced for the very first row and on a group change.
+        if self.previous_row_values.is_none() || group_change {
+            self.previous_row_values = Some(current_row_values);
+        }
+        if group_change {
+            self.current_group_idx += 1;
+            self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
+            self.shift_one_group::<SEARCH_SIDE>(delta);
+        }
+        Ok(if self.group_start_indices.is_empty() {
+            // No group is tracked: either the data ran out of groups, or the frame has not been
+            // initialized yet.
+            if self.reached_end {
+                length
+            } else {
+                0
+            }
+        } else if BISECT_SIDE {
+            // Frame start: first row of the oldest tracked group.
+            match self.group_start_indices.get(0) {
+                Some(&(_, idx)) => idx,
+                None => 0,
+            }
+        } else {
+            // Frame end: first row of the newest tracked group, or `length` once the end of the
+            // data has been reached.
+            match (self.reached_end, self.group_start_indices.back()) {
+                (false, Some(&(_, idx))) => idx,
+                _ => length,
+            }
+        })
+    }
+
+    /// Extends the tracked window frame until it covers the end group required by the current
+    /// row, discovering one new group at a time.
+    ///
+    /// The target is a group index computed from `current_group_idx` and `delta`: forwards when
+    /// `SEARCH_SIDE` is false (FOLLOWING), backwards when it is true (PRECEDING). A target of 0
+    /// means the frame end still lies before the first group, in which case nothing is done.
+    ///
+    /// Errors from reading the range columns or from locating a group boundary are propagated.
+    fn extend_window_frame_if_necessary<const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        delta: u64,
+    ) -> Result<()> {
+        let current_window_frame_end_idx = if !SEARCH_SIDE {
+            // Saturate instead of overflowing: a huge FOLLOWING offset simply means "extend
+            // until the data runs out of groups".
+            self.current_group_idx.saturating_add(delta).saturating_add(1)
+        } else if self.current_group_idx >= delta {
+            self.current_group_idx - delta + 1
+        } else {
+            0
+        };
+        if current_window_frame_end_idx == 0 {
+            // the end index of the window frame is still before the first index
+            return Ok(());
+        }
+        if self.group_start_indices.is_empty() {
+            self.initialize_window_frame_start(range_columns)?;
+        }
+        while !self.reached_end
+            && self.window_frame_end_idx <= current_window_frame_end_idx
+        {
+            self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
+        }
+        Ok(())
+    }
+
+    /// Sets up the grouping state for the first frame start computation.
+    ///
+    /// The group index of the frame start is `current_group_idx + delta` when `SEARCH_SIDE` is
+    /// false (FOLLOWING) and `current_group_idx - delta` when it is true (PRECEDING). In the
+    /// PRECEDING case nothing happens while the current group index is still smaller than
+    /// `delta`, i.e. while the frame start would lie before the first group.
+    fn initialize<const SEARCH_SIDE: bool>(
+        &mut self,
+        delta: u64,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        if !SEARCH_SIDE {
+            // Saturate instead of overflowing: a start group index that cannot be represented
+            // is past every real group, which the start search reports as "reached end".
+            self.window_frame_start_idx = self.current_group_idx.saturating_add(delta);
+            self.initialize_window_frame_start(range_columns)
+        } else if self.current_group_idx >= delta {
+            self.window_frame_start_idx = self.current_group_idx - delta;
+            self.initialize_window_frame_start(range_columns)
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Positions the frame on its first group by walking forward from row 0.
+    ///
+    /// Beginning with the group that contains row 0, this hops `window_frame_start_idx` groups
+    /// ahead. On success the group reached is queued in `group_start_indices` and the frame end
+    /// index becomes one more than the frame start index. When the data holds too few groups,
+    /// `reached_end` is raised and the frame end index is set equal to the frame start index.
+    fn initialize_window_frame_start(
+        &mut self,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        let mut first_row: usize = 0;
+        let mut values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, 0))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        let mut hops_left = self.window_frame_start_idx;
+        while hops_left > 0 {
+            let hop = Self::find_next_group_and_start_index(
+                range_columns,
+                &values,
+                first_row,
+            )?;
+            match hop {
+                Some((next_values, next_first_row)) => {
+                    values = next_values;
+                    first_row = next_first_row;
+                }
+                None => {
+                    // not enough groups to generate a window frame
+                    self.window_frame_end_idx = self.window_frame_start_idx;
+                    self.reached_end = true;
+                    return Ok(());
+                }
+            }
+            hops_left -= 1;
+        }
+        self.window_frame_end_idx = self.window_frame_start_idx + 1;
+        self.group_start_indices.push_back((values, first_row));
+        Ok(())
+    }
+
+    /// Tells whether the grouping state has been set up already, i.e. whether at least one
+    /// group is tracked or the end of the data has been reached.
+    fn initialized(&self) -> bool {
+        !self.group_start_indices.is_empty() || self.reached_end
+    }
+
+    /// Grows the window frame by the group that follows the newest tracked group.
+    ///
+    /// Does nothing when no group is tracked. When the newest tracked group is the last one in
+    /// the data, `reached_end` is raised instead of adding a group.
+    fn advance_one_group<const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        let (newest_values, newest_start) = match self.group_start_indices.back() {
+            Some((values, start)) => (values, *start),
+            None => return Ok(()),
+        };
+        let successor = Self::find_next_group_and_start_index(
+            range_columns,
+            newest_values,
+            newest_start,
+        )?;
+        match successor {
+            Some(group) => {
+                self.group_start_indices.push_back(group);
+                self.window_frame_end_idx += 1;
+            }
+            None => {
+                // not enough groups to proceed
+                self.reached_end = true;
+            }
+        }
+        Ok(())
+    }
+
+    /// This function drops the oldest group from the window frame.
+    ///
+    /// The group is dropped only when the frame start required by the current group (computed
+    /// from `current_group_idx` and `delta`, forwards when `SEARCH_SIDE` is false and backwards
+    /// when it is true) lies beyond the frame start that is currently tracked.
+    fn shift_one_group<const SEARCH_SIDE: bool>(&mut self, delta: u64) {
+        let current_window_frame_start_idx = if !SEARCH_SIDE {
+            // Saturate instead of overflowing for very large FOLLOWING offsets.
+            self.current_group_idx.saturating_add(delta)
+        } else if self.current_group_idx >= delta {
+            self.current_group_idx - delta
+        } else {
+            0
+        };
+        if current_window_frame_start_idx > self.window_frame_start_idx {
+            self.group_start_indices.pop_front();
+            self.window_frame_start_idx += 1;
+        }
+    }
+
+    /// Locates the group that follows the group identified by `current_row_values`.
+    ///
+    /// The search starts at row `idx` and probes rows at exponentially growing distances for as
+    /// long as they still carry `current_row_values`. The exact boundary is then determined by
+    /// `find_bisect_point` within the last probed interval. Returns the values and the start row
+    /// index of the next group, or `None` when the boundary equals the number of rows.
+    // TODO: For small group sizes, proceeding one-by-one to find the group change can be more efficient.
+    // Statistics about previous group sizes can be used to choose one-by-one vs. exponentially growing,
+    // or even to set the base step_size when exponentially growing. We can also create a benchmark
+    // implementation to get insights about the crossover point.
+    fn find_next_group_and_start_index(
+        range_columns: &[ArrayRef],
+        current_row_values: &[ScalarValue],
+        idx: usize,
+    ) -> Result<Option<(Vec<ScalarValue>, usize)>> {
+        let total_rows: usize = range_columns
+            .get(0)
+            .ok_or_else(|| {
+                DataFusionError::Internal("Column array shouldn't be empty".to_string())
+            })?
+            .len();
+        let mut stride: usize = 1;
+        let mut lower = idx;
+        let mut upper = idx + stride;
+        while upper < total_rows {
+            let probe = range_columns
+                .iter()
+                .map(|arr| ScalarValue::try_from_array(arr, upper))
+                .collect::<Result<Vec<ScalarValue>>>()?;
+            if probe != current_row_values {
+                break;
+            }
+            // Still inside the same group: move the lower bound up and double the stride.
+            lower = upper;
+            stride *= 2;
+            upper += stride;
+        }
+        let boundary = find_bisect_point(
+            range_columns,
+            current_row_values,
+            |current, to_compare| Ok(current == to_compare),
+            lower,
+            min(upper, total_rows),
+        )?;
+        if boundary == total_rows {
+            return Ok(None);
+        }
+        let next_group_values = range_columns
+            .iter()
+            .map(|arr| ScalarValue::try_from_array(arr, boundary))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        Ok(Some((next_group_values, boundary)))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::Float64Array;
+    use datafusion_common::ScalarValue;
+    use std::sync::Arc;
+
+    use crate::from_slice::FromSlice;
+
+    use super::*;
+
+    /// Checks, for every row except the last, that the next group starts at the expected row,
+    /// and that no group follows the last row.
+    #[test]
+    fn test_find_next_group_and_start_index() {
+        const NUM_ROWS: usize = 6;
+        const NEXT_INDICES: [usize; 5] = [1, 2, 4, 4, 5];
+
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 8., 9., 10.])),
+            Arc::new(Float64Array::from_slice([2.0, 3.0, 3.0, 3., 4.0, 5.0])),
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 8., 10., 11.0])),
+            Arc::new(Float64Array::from_slice([15.0, 13.0, 8.0, 8., 5., 0.0])),
+        ];
+        // Collects the values of all columns at the given row.
+        let values_at = |row: usize| -> Vec<ScalarValue> {
+            arrays
+                .iter()
+                .map(|col| ScalarValue::try_from_array(col, row))
+                .collect::<Result<Vec<ScalarValue>>>()
+                .unwrap()
+        };
+        for (row, &expected_next) in NEXT_INDICES.iter().enumerate() {
+            let found = WindowFrameStateGroups::find_next_group_and_start_index(
+                &arrays,
+                &values_at(row),
+                row,
+            )
+            .unwrap();
+            assert_eq!(found, Some((values_at(expected_next), expected_next)));
+        }
+        // The last row belongs to the last group, so there is nothing to find after it.
+        let last_row = NUM_ROWS - 1;
+        let found = WindowFrameStateGroups::find_next_group_and_start_index(
+            &arrays,
+            &values_at(last_row),
+            last_row,
+        )
+        .unwrap();
+        assert_eq!(found, None);
+    }
+
+    #[test]
+    fn test_window_frame_groups_preceding_huge_delta() {

Review Comment:
   Why is a delta of 10 "huge"? Is it because the delta is larger than the size of the arrays?



##########
datafusion/physical-expr/src/window/window_frame_state.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides utilities for window frame index calculations depending on the window frame mode:
+//! RANGE, ROWS, GROUPS.
+
+use arrow::array::ArrayRef;
+use arrow::compute::kernels::sort::SortOptions;
+use datafusion_common::bisect::{bisect, find_bisect_point};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+use std::cmp::min;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This object stores the window frame state for use in incremental calculations.
+///
+/// There is one variant per window frame mode (ROWS, RANGE, GROUPS), plus a variant for the
+/// case where no window frame is given.
+#[derive(Debug)]
+pub enum WindowFrameContext<'a> {
+    /// ROWS-frames are inherently stateless:
+    Rows(&'a Arc<WindowFrame>),
+    /// RANGE-frames will soon have a stateful implementation that is more efficient than a stateless one:
+    Range {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateRange,
+    },
+    /// GROUPS-frames have a stateful implementation that is more efficient than a stateless one:
+    Groups {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateGroups,
+    },
+    /// No window frame was given; the frame of every row is the whole `(0, length)` interval.
+    Default,
+}
+
+impl<'a> WindowFrameContext<'a> {
+    /// Builds the context matching the units of the given window frame, with freshly
+    /// defaulted state. Without a window frame, the `Default` context is returned.
+    pub fn new(window_frame: &'a Option<Arc<WindowFrame>>) -> Self {
+        let frame = match window_frame {
+            Some(frame) => frame,
+            None => return WindowFrameContext::Default,
+        };
+        match frame.units {
+            WindowFrameUnits::Rows => WindowFrameContext::Rows(frame),
+            WindowFrameUnits::Range => WindowFrameContext::Range {
+                window_frame: frame,
+                state: WindowFrameStateRange::default(),
+            },
+            WindowFrameUnits::Groups => WindowFrameContext::Groups {
+                window_frame: frame,
+                state: WindowFrameStateGroups::default(),
+            },
+        }
+    }
+
+    /// Computes the `(start, end)` row indices of the frame belonging to row `idx`.
+    ///
+    /// The work is delegated according to the frame mode: ROWS frames are computed statelessly,
+    /// RANGE and GROUPS frames go through their respective state objects, and the absence of a
+    /// window frame yields `(0, length)`.
+    pub fn calculate_range(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        match self {
+            WindowFrameContext::Default => Ok((0, length)),
+            WindowFrameContext::Rows(window_frame) => {
+                Self::calculate_range_rows(*window_frame, length, idx)
+            }
+            WindowFrameContext::Range {
+                window_frame,
+                state,
+            } => state.calculate_range(
+                *window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+            WindowFrameContext::Groups {
+                window_frame,
+                state,
+            } => state.calculate_range(
+                *window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+        }
+    }
+
+    /// This function calculates beginning/ending indices for the frame of the current row
+    /// in ROWS mode.
+    ///
+    /// Returns the `(start, end)` row indices for row `idx`. PRECEDING offsets that reach before
+    /// the first row are clamped to 0, and FOLLOWING offsets are clamped to `length`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an internal error when the frame starts with UNBOUNDED FOLLOWING, ends with
+    /// UNBOUNDED PRECEDING, or carries an offset that is not a `UInt64`.
+    fn calculate_range_rows(
+        window_frame: &Arc<WindowFrame>,
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                idx.saturating_sub(n as usize)
+            }
+            WindowFrameBound::CurrentRow => idx,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                // Saturate so that a huge offset is clamped to `length` instead of overflowing.
+                min(idx.saturating_add(n as usize), length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        let end = match window_frame.end_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                if idx >= n as usize {
+                    idx - n as usize + 1
+                } else {
+                    0
+                }
+            }
+            WindowFrameBound::CurrentRow => idx + 1,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                // Saturate so that a huge offset is clamped to `length` instead of overflowing.
+                min(idx.saturating_add(n as usize).saturating_add(1), length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        Ok((start, end))
+    }
+}
+
+/// This structure encapsulates all the state information we require as we
+/// scan ranges of data while processing window frames. Currently we calculate
+/// things from scratch every time, but we will make this incremental in the future.
+/// For that reason the structure does not hold any fields yet.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateRange {}
+
+impl WindowFrameStateRange {
+    /// Computes the `(start, end)` row indices of the RANGE frame belonging to row `idx`.
+    ///
+    /// An UNBOUNDED PRECEDING start maps to 0 and an UNBOUNDED FOLLOWING end maps to `length`.
+    /// A CURRENT ROW bound without any range columns maps to 0 (start) or `length` (end). All
+    /// remaining bounds are resolved by `calculate_index_of_row`.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match &window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(offset) if offset.is_null() => 0,
+            WindowFrameBound::Preceding(offset) => self
+                .calculate_index_of_row::<true, true>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(offset),
+                )?,
+            WindowFrameBound::CurrentRow if range_columns.is_empty() => 0,
+            WindowFrameBound::CurrentRow => self
+                .calculate_index_of_row::<true, true>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    None,
+                )?,
+            WindowFrameBound::Following(offset) => self
+                .calculate_index_of_row::<true, false>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(offset),
+                )?,
+        };
+        let end = match &window_frame.end_bound {
+            WindowFrameBound::Preceding(offset) => self
+                .calculate_index_of_row::<false, true>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(offset),
+                )?,
+            WindowFrameBound::CurrentRow if range_columns.is_empty() => length,
+            WindowFrameBound::CurrentRow => self
+                .calculate_index_of_row::<false, false>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    None,
+                )?,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(offset) if offset.is_null() => length,
+            WindowFrameBound::Following(offset) => self
+                .calculate_index_of_row::<false, false>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(offset),
+                )?,
+        };
+        Ok((start, end))
+    }
+
+    /// This function does the heavy lifting when finding range boundaries. It is meant to be
+    /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+    /// supplied as true and false, respectively).
+    ///
+    /// When `delta` is given, the values of the current row are offset by it before bisecting:
+    /// `delta` is added when `SEARCH_SIDE` equals the `descending` flag of the first sort option
+    /// and subtracted otherwise. Null values are left untouched. Without `delta`, the values of
+    /// the current row are used as they are.
+    ///
+    /// Returns an internal error when `delta` is given but `sort_options` is empty; errors from
+    /// reading the columns, from the arithmetic, and from `bisect` are propagated.
+    fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        idx: usize,
+        delta: Option<&ScalarValue>,
+    ) -> Result<usize> {
+        let current_row_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, idx))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        let end_range = if let Some(delta) = delta {
+            let is_descending: bool = sort_options
+                .first()
+                .ok_or_else(|| DataFusionError::Internal("Array is empty".to_string()))?
+                .descending;
+
+            current_row_values
+                .iter()
+                .map(|value| {
+                    if value.is_null() {
+                        return Ok(value.clone());
+                    }
+                    if SEARCH_SIDE == is_descending {
+                        // TODO: Handle positive overflows
+                        value.add(delta)
+                    } else if value.is_unsigned() && value < delta {
+                        // An unsigned value smaller than delta cannot be decremented by delta,
+                        // so the target is clamped to zero.
+                        // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
+                        //       If we decide to implement a "default" construction mechanism for ScalarValue,
+                        //       change the following statement to use that.
+                        value.sub(value)
+                    } else {
+                        // TODO: Handle negative overflows
+                        value.sub(delta)
+                    }
+                })
+                .collect::<Result<Vec<ScalarValue>>>()?
+        } else {
+            current_row_values
+        };
+        // `BISECT_SIDE` true means bisect_left, false means bisect_right
+        bisect::<BISECT_SIDE>(range_columns, &end_range, sort_options)
+    }
+}
+
+// In GROUPS mode, rows with duplicate sorting values are grouped together.
+// Therefore, there must be an ORDER BY clause in the window definition to use GROUPS mode.
+// The syntax is as follows:
+//     GROUPS frame_start [ frame_exclusion ]
+//     GROUPS BETWEEN frame_start AND frame_end [ frame_exclusion ]
+// The optional frame_exclusion specifier is not yet supported.
+// The frame_start and frame_end parameters allow us to specify which rows the window
+// frame starts and ends with. They accept the following values:
+//    - UNBOUNDED PRECEDING: Start with the first row of the partition. Possible only in frame_start.
+//    - offset PRECEDING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups before the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups before the current group.
+//    - CURRENT ROW: When used in frame_start, it refers to the first row of the group containing
+//                   the current row. When used in frame_end, it refers to the last row of the group
+//                   containing the current row.
+//    - offset FOLLOWING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups after the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups after the current group.
+//    - UNBOUNDED FOLLOWING: End with the last row of the partition. Possible only in frame_end.
+
+/// This structure encapsulates all the state information we require as we
+/// scan groups of data while processing window frames.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateGroups {
+    /// Index of the group containing the current row; incremented on every group change.
+    current_group_idx: u64,
+    /// Tracked groups as (group values, index of the group's first row) pairs. The front entry
+    /// is the group the frame starts with, the back entry is the group discovered last.
+    group_start_indices: VecDeque<(Vec<ScalarValue>, usize)>,
+    /// Values of the row that opened the current group; `None` before the first row is seen.
+    previous_row_values: Option<Vec<ScalarValue>>,
+    /// Set once no further group can be found in the data.
+    reached_end: bool,
+    /// Group index up to which the frame has been extended; it is set to the start index plus
+    /// one when the first group is stored and is incremented for every group added afterwards.
+    window_frame_end_idx: u64,
+    /// Group index of the frame start.
+    window_frame_start_idx: u64,
+}
+
+impl WindowFrameStateGroups {
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    ///
+    /// The start bound is resolved before the end bound; both steps go through
+    /// `calculate_index_of_group`, which updates the internal grouping state. UNBOUNDED
+    /// PRECEDING as the start yields 0 and UNBOUNDED FOLLOWING as the end yields `length`
+    /// without touching that state. CURRENT ROW is handled as an offset of zero groups.
+    ///
+    /// # Errors
+    ///
+    /// Returns an execution error when `range_columns` is empty (GROUPS mode needs an ORDER BY
+    /// clause), and an internal error when the frame starts with UNBOUNDED FOLLOWING, ends with
+    /// UNBOUNDED PRECEDING, or carries an offset that is not a `UInt64`.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        _sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        if range_columns.is_empty() {
+            return Err(DataFusionError::Execution(
+                "GROUPS mode requires an ORDER BY clause".to_string(),
+            ));
+        }
+        let start = match window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<true, true>(range_columns, idx, n, length)?,
+            WindowFrameBound::CurrentRow => self.calculate_index_of_group::<true, true>(
+                range_columns,
+                idx,
+                0,
+                length,
+            )?,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<true, false>(range_columns, idx, n, length)?,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                    window_frame
+                )))
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal(
+                    "Groups should be Uint".to_string(),
+                ))
+            }
+        };
+        let end = match window_frame.end_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<false, true>(range_columns, idx, n, length)?,
+            WindowFrameBound::CurrentRow => self
+                .calculate_index_of_group::<false, false>(
+                    range_columns,
+                    idx,
+                    0,
+                    length,
+                )?,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<false, false>(
+                    range_columns,
+                    idx,
+                    n,
+                    length,
+                )?,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal(
+                    "Groups should be Uint".to_string(),
+                ))
+            }
+        };
+        Ok((start, end))
+    }
+
+    /// This function does the heavy lifting when finding group boundaries. It is meant to be
+    /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+    /// supplied as true and false, respectively).
+    ///
+    /// `SEARCH_SIDE` tells on which side of the current group the boundary lies: true counts
+    /// `delta` groups backwards (PRECEDING), false counts `delta` groups forwards (FOLLOWING).
+    /// `idx` is the index of the current row, and `length` is the value reported as the boundary
+    /// once the end of the data has been reached.
+    ///
+    /// Returns a row index: the start index of the oldest tracked group when `BISECT_SIDE` is
+    /// true, and the start index of the newest tracked group (or `length` once the end has been
+    /// reached) when it is false. When no group is tracked at all, the result is `length` if the
+    /// end has been reached and 0 otherwise.
+    fn calculate_index_of_group<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        idx: usize,
+        delta: u64,
+        length: usize,
+    ) -> Result<usize> {
+        let current_row_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, idx))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+
+        if BISECT_SIDE {
+            // When we call this function to get the window frame start index, it tries to initialize
+            // the internal grouping state if this is not already done before. For a PRECEDING bound,
+            // this initialization takes place only once the current group index is at least `delta`
+            // (i.e. the frame start no longer lies before the first group). In this case, the first
+            // group of the frame is stored in group_start_indices, with its row values as the group
+            // identifier and the index of its first row as the start index of the group.
+            if !self.initialized() {
+                self.initialize::<SEARCH_SIDE>(delta, range_columns)?;
+            }
+        } else if !self.reached_end {
+            // When we call this function to get the window frame end index, it extends the window
+            // frame one by one until the current row's window frame end index is reached by finding
+            // the next group.
+            self.extend_window_frame_if_necessary::<SEARCH_SIDE>(range_columns, delta)?;
+        }
+        // We keep track of previous row values, so that a group change can be identified.
+        // If there is a group change, the window frame is advanced and shifted by one group.
+        let group_change = match &self.previous_row_values {
+            None => false,
+            Some(values) => &current_row_values != values,
+        };
+        // The stored values are only replaced for the very first row and on a group change.
+        if self.previous_row_values.is_none() || group_change {
+            self.previous_row_values = Some(current_row_values);
+        }
+        if group_change {
+            self.current_group_idx += 1;
+            self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
+            self.shift_one_group::<SEARCH_SIDE>(delta);
+        }
+        Ok(if self.group_start_indices.is_empty() {
+            // No group is tracked: either the data ran out of groups, or the frame has not been
+            // initialized yet.
+            if self.reached_end {
+                length
+            } else {
+                0
+            }
+        } else if BISECT_SIDE {
+            // Frame start: first row of the oldest tracked group.
+            match self.group_start_indices.get(0) {
+                Some(&(_, idx)) => idx,
+                None => 0,
+            }
+        } else {
+            // Frame end: first row of the newest tracked group, or `length` once the end of the
+            // data has been reached.
+            match (self.reached_end, self.group_start_indices.back()) {
+                (false, Some(&(_, idx))) => idx,
+                _ => length,
+            }
+        })
+    }
+
+    /// Extends the tracked window frame until it covers the end group required by the current
+    /// row, discovering one new group at a time.
+    ///
+    /// The target is a group index computed from `current_group_idx` and `delta`: forwards when
+    /// `SEARCH_SIDE` is false (FOLLOWING), backwards when it is true (PRECEDING). A target of 0
+    /// means the frame end still lies before the first group, in which case nothing is done.
+    ///
+    /// Errors from reading the range columns or from locating a group boundary are propagated.
+    fn extend_window_frame_if_necessary<const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        delta: u64,
+    ) -> Result<()> {
+        let current_window_frame_end_idx = if !SEARCH_SIDE {
+            // Saturate instead of overflowing: a huge FOLLOWING offset simply means "extend
+            // until the data runs out of groups".
+            self.current_group_idx.saturating_add(delta).saturating_add(1)
+        } else if self.current_group_idx >= delta {
+            self.current_group_idx - delta + 1
+        } else {
+            0
+        };
+        if current_window_frame_end_idx == 0 {
+            // the end index of the window frame is still before the first index
+            return Ok(());
+        }
+        if self.group_start_indices.is_empty() {
+            self.initialize_window_frame_start(range_columns)?;
+        }
+        while !self.reached_end
+            && self.window_frame_end_idx <= current_window_frame_end_idx
+        {
+            self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
+        }
+        Ok(())
+    }
+
+    fn initialize<const SEARCH_SIDE: bool>(
+        &mut self,
+        delta: u64,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        if !SEARCH_SIDE {
+            self.window_frame_start_idx = self.current_group_idx + delta;
+            self.initialize_window_frame_start(range_columns)
+        } else if self.current_group_idx >= delta {
+            self.window_frame_start_idx = self.current_group_idx - delta;
+            self.initialize_window_frame_start(range_columns)
+        } else {
+            Ok(())
+        }
+    }
+
+    fn initialize_window_frame_start(
+        &mut self,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        let mut group_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, 0))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        let mut start_idx: usize = 0;
+        for _ in 0..self.window_frame_start_idx {
+            let next_group_and_start_index =
+                WindowFrameStateGroups::find_next_group_and_start_index(
+                    range_columns,
+                    &group_values,
+                    start_idx,
+                )?;
+            if let Some(entry) = next_group_and_start_index {
+                (group_values, start_idx) = entry;
+            } else {
+                // not enough groups to generate a window frame
+                self.window_frame_end_idx = self.window_frame_start_idx;
+                self.reached_end = true;
+                return Ok(());
+            }
+        }
+        self.group_start_indices
+            .push_back((group_values, start_idx));
+        self.window_frame_end_idx = self.window_frame_start_idx + 1;
+        Ok(())
+    }
+
+    fn initialized(&self) -> bool {
+        self.reached_end || !self.group_start_indices.is_empty()
+    }
+
+    /// This function advances the window frame by one group.
+    fn advance_one_group<const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        let last_group_values = self.group_start_indices.back();
+        let last_group_values = if let Some(values) = last_group_values {
+            values
+        } else {
+            return Ok(());
+        };
+        let next_group_and_start_index =
+            WindowFrameStateGroups::find_next_group_and_start_index(
+                range_columns,
+                &last_group_values.0,
+                last_group_values.1,
+            )?;
+        if let Some(entry) = next_group_and_start_index {
+            self.group_start_indices.push_back(entry);
+            self.window_frame_end_idx += 1;
+        } else {
+            // not enough groups to proceed
+            self.reached_end = true;
+        }
+        Ok(())
+    }
+
+    /// This function drops the oldest group from the window frame.
+    fn shift_one_group<const SEARCH_SIDE: bool>(&mut self, delta: u64) {
+        let current_window_frame_start_idx = if !SEARCH_SIDE {
+            self.current_group_idx + delta
+        } else if self.current_group_idx >= delta {
+            self.current_group_idx - delta
+        } else {
+            0
+        };
+        if current_window_frame_start_idx > self.window_frame_start_idx {
+            self.group_start_indices.pop_front();
+            self.window_frame_start_idx += 1;
+        }
+    }
+
+    /// This function finds the next group and its start index for a given group and start index.
+    /// It utilizes an exponentially growing step size to find the group boundary.
+    // TODO: For small group sizes, proceeding one-by-one to find the group change can be more efficient.
+    // Statistics about previous group sizes can be used to choose one-by-one vs. exponentially growing,

Review Comment:
   I agree there are likely many ways to improve the performance of this algorithm. However, I think the first thing to do is to get a working implementation that is well tested and well structured (as this PR is); as you say, we can drive additional optimizations from there.



##########
datafusion/physical-expr/src/window/window_frame_state.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides utilities for window frame index calculations depending on the window frame mode:
+//! RANGE, ROWS, GROUPS.
+
+use arrow::array::ArrayRef;
+use arrow::compute::kernels::sort::SortOptions;
+use datafusion_common::bisect::{bisect, find_bisect_point};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+use std::cmp::min;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This object stores the window frame state for use in incremental calculations.
+#[derive(Debug)]
+pub enum WindowFrameContext<'a> {
+    // ROWS-frames are inherently stateless:
+    Rows(&'a Arc<WindowFrame>),
+    // RANGE-frames will soon have a stateful implementation that is more efficient than a stateless one:
+    Range {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateRange,
+    },
+    // GROUPS-frames have a stateful implementation that is more efficient than a stateless one:
+    Groups {
+        window_frame: &'a Arc<WindowFrame>,
+        state: WindowFrameStateGroups,
+    },
+    Default,
+}
+
+impl<'a> WindowFrameContext<'a> {
+    /// Create a new default state for the given window frame.
+    pub fn new(window_frame: &'a Option<Arc<WindowFrame>>) -> Self {
+        if let Some(window_frame) = window_frame {
+            match window_frame.units {
+                WindowFrameUnits::Rows => WindowFrameContext::Rows(window_frame),
+                WindowFrameUnits::Range => WindowFrameContext::Range {
+                    window_frame,
+                    state: WindowFrameStateRange::default(),
+                },
+                WindowFrameUnits::Groups => WindowFrameContext::Groups {
+                    window_frame,
+                    state: WindowFrameStateGroups::default(),
+                },
+            }
+        } else {
+            WindowFrameContext::Default
+        }
+    }
+
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    pub fn calculate_range(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        match *self {
+            WindowFrameContext::Rows(window_frame) => {
+                Self::calculate_range_rows(window_frame, length, idx)
+            }
+            WindowFrameContext::Range {
+                window_frame,
+                ref mut state,
+            } => state.calculate_range(
+                window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+            WindowFrameContext::Groups {
+                window_frame,
+                ref mut state,
+            } => state.calculate_range(
+                window_frame,
+                range_columns,
+                sort_options,
+                length,
+                idx,
+            ),
+            WindowFrameContext::Default => Ok((0, length)),
+        }
+    }
+
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range_rows(
+        window_frame: &Arc<WindowFrame>,
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                if idx >= n as usize {
+                    idx - n as usize
+                } else {
+                    0
+                }
+            }
+            WindowFrameBound::CurrentRow => idx,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                min(idx + n as usize, length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        let end = match window_frame.end_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+                if idx >= n as usize {
+                    idx - n as usize + 1
+                } else {
+                    0
+                }
+            }
+            WindowFrameBound::CurrentRow => idx + 1,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+                min(idx + n as usize + 1, length)
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+            }
+        };
+        Ok((start, end))
+    }
+}
+
+/// This structure encapsulates all the state information we require as we
+/// scan ranges of data while processing window frames. Currently we calculate
+/// things from scratch every time, but we will make this incremental in the future.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateRange {}
+
+impl WindowFrameStateRange {
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        let start = match window_frame.start_bound {
+            WindowFrameBound::Preceding(ref n) => {
+                if n.is_null() {
+                    // UNBOUNDED PRECEDING
+                    0
+                } else {
+                    self.calculate_index_of_row::<true, true>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        Some(n),
+                    )?
+                }
+            }
+            WindowFrameBound::CurrentRow => {
+                if range_columns.is_empty() {
+                    0
+                } else {
+                    self.calculate_index_of_row::<true, true>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        None,
+                    )?
+                }
+            }
+            WindowFrameBound::Following(ref n) => self
+                .calculate_index_of_row::<true, false>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+        };
+        let end = match window_frame.end_bound {
+            WindowFrameBound::Preceding(ref n) => self
+                .calculate_index_of_row::<false, true>(
+                    range_columns,
+                    sort_options,
+                    idx,
+                    Some(n),
+                )?,
+            WindowFrameBound::CurrentRow => {
+                if range_columns.is_empty() {
+                    length
+                } else {
+                    self.calculate_index_of_row::<false, false>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        None,
+                    )?
+                }
+            }
+            WindowFrameBound::Following(ref n) => {
+                if n.is_null() {
+                    // UNBOUNDED FOLLOWING
+                    length
+                } else {
+                    self.calculate_index_of_row::<false, false>(
+                        range_columns,
+                        sort_options,
+                        idx,
+                        Some(n),
+                    )?
+                }
+            }
+        };
+        Ok((start, end))
+    }
+
+    /// This function does the heavy lifting when finding range boundaries. It is meant to be
+    /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+    /// supplied as true and false, respectively).
+    fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        sort_options: &[SortOptions],
+        idx: usize,
+        delta: Option<&ScalarValue>,
+    ) -> Result<usize> {
+        let current_row_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, idx))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        let end_range = if let Some(delta) = delta {
+            let is_descending: bool = sort_options
+                .first()
+                .ok_or_else(|| DataFusionError::Internal("Array is empty".to_string()))?
+                .descending;
+
+            current_row_values
+                .iter()
+                .map(|value| {
+                    if value.is_null() {
+                        return Ok(value.clone());
+                    }
+                    if SEARCH_SIDE == is_descending {
+                        // TODO: Handle positive overflows
+                        value.add(delta)
+                    } else if value.is_unsigned() && value < delta {
+                        // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
+                        //       If we decide to implement a "default" construction mechanism for ScalarValue,
+                        //       change the following statement to use that.
+                        value.sub(value)
+                    } else {
+                        // TODO: Handle negative overflows
+                        value.sub(delta)
+                    }
+                })
+                .collect::<Result<Vec<ScalarValue>>>()?
+        } else {
+            current_row_values
+        };
+        // `BISECT_SIDE` true means bisect_left, false means bisect_right
+        bisect::<BISECT_SIDE>(range_columns, &end_range, sort_options)
+    }
+}
+
+// In GROUPS mode, rows with duplicate sorting values are grouped together.
+// Therefore, there must be an ORDER BY clause in the window definition to use GROUPS mode.
+// The syntax is as follows:
+//     GROUPS frame_start [ frame_exclusion ]
+//     GROUPS BETWEEN frame_start AND frame_end [ frame_exclusion ]
+// The optional frame_exclusion specifier is not yet supported.
+// The frame_start and frame_end parameters allow us to specify which rows the window
+// frame starts and ends with. They accept the following values:
+//    - UNBOUNDED PRECEDING: Start with the first row of the partition. Possible only in frame_start.
+//    - offset PRECEDING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups before the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups before the current group.
+//    - CURRENT ROW: When used in frame_start, it refers to the first row of the group containing
+//                   the current row. When used in frame_end, it refers to the last row of the group
+//                   containing the current row.
+//    - offset FOLLOWING: When used in frame_start, it refers to the first row of the group
+//                        that comes "offset" groups after the current group (i.e. the group
+//                        containing the current row). When used in frame_end, it refers to the
+//                        last row of the group that comes "offset" groups after the current group.
+//    - UNBOUNDED FOLLOWING: End with the last row of the partition. Possible only in frame_end.
+
+// This structure encapsulates all the state information we require as we
+// scan groups of data while processing window frames.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateGroups {
+    current_group_idx: u64,
+    group_start_indices: VecDeque<(Vec<ScalarValue>, usize)>,
+    previous_row_values: Option<Vec<ScalarValue>>,
+    reached_end: bool,
+    window_frame_end_idx: u64,
+    window_frame_start_idx: u64,
+}
+
+impl WindowFrameStateGroups {
+    /// This function calculates beginning/ending indices for the frame of the current row.
+    fn calculate_range(
+        &mut self,
+        window_frame: &Arc<WindowFrame>,
+        range_columns: &[ArrayRef],
+        _sort_options: &[SortOptions],
+        length: usize,
+        idx: usize,
+    ) -> Result<(usize, usize)> {
+        if range_columns.is_empty() {
+            return Err(DataFusionError::Execution(
+                "GROUPS mode requires an ORDER BY clause".to_string(),
+            ));
+        }
+        let start = match window_frame.start_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<true, true>(range_columns, idx, n, length)?,
+            WindowFrameBound::CurrentRow => self.calculate_index_of_group::<true, true>(
+                range_columns,
+                idx,
+                0,
+                length,
+            )?,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<true, false>(range_columns, idx, n, length)?,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+                    window_frame
+                )))
+            }
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal(
+                    "Groups should be Uint".to_string(),
+                ))
+            }
+        };
+        let end = match window_frame.end_bound {
+            // UNBOUNDED PRECEDING
+            WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+                return Err(DataFusionError::Internal(format!(
+                    "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+                    window_frame
+                )))
+            }
+            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<false, true>(range_columns, idx, n, length)?,
+            WindowFrameBound::CurrentRow => self
+                .calculate_index_of_group::<false, false>(
+                    range_columns,
+                    idx,
+                    0,
+                    length,
+                )?,
+            WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
+                .calculate_index_of_group::<false, false>(
+                    range_columns,
+                    idx,
+                    n,
+                    length,
+                )?,
+            // UNBOUNDED FOLLOWING
+            WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+            // ERRONEOUS FRAMES
+            WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+                return Err(DataFusionError::Internal(
+                    "Groups should be Uint".to_string(),
+                ))
+            }
+        };
+        Ok((start, end))
+    }
+
+    /// This function does the heavy lifting when finding group boundaries. It is meant to be
+    /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+    /// supplied as true and false, respectively).
+    fn calculate_index_of_group<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        idx: usize,
+        delta: u64,
+        length: usize,
+    ) -> Result<usize> {
+        let current_row_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, idx))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+
+        if BISECT_SIDE {
+            // When we call this function to get the window frame start index, it tries to initialize
+            // the internal grouping state if this has not been done already. This initialization takes
+            // place only when the window frame start index is greater than or equal to zero. In this
+            // case, the first row of the group where the frame starts is stored in group_start_indices,
+            // with its row values as the group identifier and its row index as the group's start index.
+            if !self.initialized() {
+                self.initialize::<SEARCH_SIDE>(delta, range_columns)?;
+            }
+        } else if !self.reached_end {
+            // When we call this function to get the window frame end index, it extends the window
+            // frame one by one until the current row's window frame end index is reached by finding
+            // the next group.
+            self.extend_window_frame_if_necessary::<SEARCH_SIDE>(range_columns, delta)?;
+        }
+        // We keep track of previous row values, so that a group change can be identified.
+        // If there is a group change, the window frame is advanced and shifted by one group.
+        let group_change = match &self.previous_row_values {
+            None => false,
+            Some(values) => &current_row_values != values,
+        };
+        if self.previous_row_values.is_none() || group_change {
+            self.previous_row_values = Some(current_row_values);
+        }
+        if group_change {
+            self.current_group_idx += 1;
+            self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
+            self.shift_one_group::<SEARCH_SIDE>(delta);
+        }
+        Ok(if self.group_start_indices.is_empty() {
+            if self.reached_end {
+                length
+            } else {
+                0
+            }
+        } else if BISECT_SIDE {
+            match self.group_start_indices.get(0) {
+                Some(&(_, idx)) => idx,
+                None => 0,
+            }
+        } else {
+            match (self.reached_end, self.group_start_indices.back()) {
+                (false, Some(&(_, idx))) => idx,
+                _ => length,
+            }
+        })
+    }
+
+    fn extend_window_frame_if_necessary<const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+        delta: u64,
+    ) -> Result<()> {
+        let current_window_frame_end_idx = if !SEARCH_SIDE {
+            self.current_group_idx + delta + 1
+        } else if self.current_group_idx >= delta {
+            self.current_group_idx - delta + 1
+        } else {
+            0
+        };
+        if current_window_frame_end_idx == 0 {
+            // the end index of the window frame is still before the first index
+            return Ok(());
+        }
+        if self.group_start_indices.is_empty() {
+            self.initialize_window_frame_start(range_columns)?;
+        }
+        while !self.reached_end
+            && self.window_frame_end_idx <= current_window_frame_end_idx
+        {
+            self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
+        }
+        Ok(())
+    }
+
+    fn initialize<const SEARCH_SIDE: bool>(
+        &mut self,
+        delta: u64,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        if !SEARCH_SIDE {
+            self.window_frame_start_idx = self.current_group_idx + delta;
+            self.initialize_window_frame_start(range_columns)
+        } else if self.current_group_idx >= delta {
+            self.window_frame_start_idx = self.current_group_idx - delta;
+            self.initialize_window_frame_start(range_columns)
+        } else {
+            Ok(())
+        }
+    }
+
+    fn initialize_window_frame_start(
+        &mut self,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        let mut group_values = range_columns
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, 0))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        let mut start_idx: usize = 0;
+        for _ in 0..self.window_frame_start_idx {
+            let next_group_and_start_index =
+                WindowFrameStateGroups::find_next_group_and_start_index(
+                    range_columns,
+                    &group_values,
+                    start_idx,
+                )?;
+            if let Some(entry) = next_group_and_start_index {
+                (group_values, start_idx) = entry;
+            } else {
+                // not enough groups to generate a window frame
+                self.window_frame_end_idx = self.window_frame_start_idx;
+                self.reached_end = true;
+                return Ok(());
+            }
+        }
+        self.group_start_indices
+            .push_back((group_values, start_idx));
+        self.window_frame_end_idx = self.window_frame_start_idx + 1;
+        Ok(())
+    }
+
+    fn initialized(&self) -> bool {
+        self.reached_end || !self.group_start_indices.is_empty()
+    }
+
+    /// This function advances the window frame by one group.
+    fn advance_one_group<const SEARCH_SIDE: bool>(
+        &mut self,
+        range_columns: &[ArrayRef],
+    ) -> Result<()> {
+        let last_group_values = self.group_start_indices.back();
+        let last_group_values = if let Some(values) = last_group_values {
+            values
+        } else {
+            return Ok(());
+        };
+        let next_group_and_start_index =
+            WindowFrameStateGroups::find_next_group_and_start_index(
+                range_columns,
+                &last_group_values.0,
+                last_group_values.1,
+            )?;
+        if let Some(entry) = next_group_and_start_index {
+            self.group_start_indices.push_back(entry);
+            self.window_frame_end_idx += 1;
+        } else {
+            // not enough groups to proceed
+            self.reached_end = true;
+        }
+        Ok(())
+    }
+
+    /// This function drops the oldest group from the window frame.
+    fn shift_one_group<const SEARCH_SIDE: bool>(&mut self, delta: u64) {
+        let current_window_frame_start_idx = if !SEARCH_SIDE {
+            self.current_group_idx + delta
+        } else if self.current_group_idx >= delta {
+            self.current_group_idx - delta
+        } else {
+            0
+        };
+        if current_window_frame_start_idx > self.window_frame_start_idx {
+            self.group_start_indices.pop_front();
+            self.window_frame_start_idx += 1;
+        }
+    }
+
+    /// This function finds the next group and its start index for a given group and start index.
+    /// It utilizes an exponentially growing step size to find the group boundary.
+    // TODO: For small group sizes, proceeding one-by-one to find the group change can be more efficient.
+    // Statistics about previous group sizes can be used to choose one-by-one vs. exponentially growing,
+    // or even to set the base step_size when exponentially growing. We can also create a benchmark
+    // implementation to get insights about the crossover point.
+    fn find_next_group_and_start_index(
+        range_columns: &[ArrayRef],
+        current_row_values: &[ScalarValue],
+        idx: usize,
+    ) -> Result<Option<(Vec<ScalarValue>, usize)>> {
+        let mut step_size: usize = 1;
+        let data_size: usize = range_columns
+            .get(0)
+            .ok_or_else(|| {
+                DataFusionError::Internal("Column array shouldn't be empty".to_string())
+            })?
+            .len();
+        let mut low = idx;
+        let mut high = idx + step_size;
+        while high < data_size {
+            let val = range_columns
+                .iter()
+                .map(|arr| ScalarValue::try_from_array(arr, high))
+                .collect::<Result<Vec<ScalarValue>>>()?;
+            if val == current_row_values {
+                low = high;
+                step_size *= 2;
+                high += step_size;
+            } else {
+                break;
+            }
+        }
+        low = find_bisect_point(
+            range_columns,
+            current_row_values,
+            |current, to_compare| Ok(current == to_compare),
+            low,
+            min(high, data_size),
+        )?;
+        if low == data_size {
+            return Ok(None);
+        }
+        let val = range_columns
+            .iter()
+            .map(|arr| ScalarValue::try_from_array(arr, low))
+            .collect::<Result<Vec<ScalarValue>>>()?;
+        Ok(Some((val, low)))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::Float64Array;
+    use datafusion_common::ScalarValue;
+    use std::sync::Arc;
+
+    use crate::from_slice::FromSlice;
+
+    use super::*;
+
+    // Checks `WindowFrameStateGroups::find_next_group_and_start_index` on a
+    // small multi-column fixture: for every row except the last, the call must
+    // return the values and index listed in `NEXT_INDICES`; for the last row
+    // it must return `None`.
+    #[test]
+    fn test_find_next_group_and_start_index() {
+        const NUM_ROWS: usize = 6;
+        // NEXT_INDICES[i] is the index expected back when the search starts at row i.
+        // Rows 2 and 3 hold identical values in every column, so both expect index 4.
+        const NEXT_INDICES: [usize; 5] = [1, 2, 4, 4, 5];
+
+        // Four Float64 columns of NUM_ROWS rows each; a "row" is the tuple of
+        // values found at the same index across all four columns.
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 8., 9., 10.])),
+            Arc::new(Float64Array::from_slice([2.0, 3.0, 3.0, 3., 4.0, 5.0])),
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 8., 10., 11.0])),
+            Arc::new(Float64Array::from_slice([15.0, 13.0, 8.0, 8., 5., 0.0])),
+        ];
+        for (current_idx, next_idx) in NEXT_INDICES.iter().enumerate() {
+            // Values of the row the search starts from.
+            let current_row_values = arrays
+                .iter()
+                .map(|col| ScalarValue::try_from_array(col, current_idx))
+                .collect::<Result<Vec<ScalarValue>>>()
+                .unwrap();
+            // Values of the row at the expected result index.
+            let next_row_values = arrays
+                .iter()
+                .map(|col| ScalarValue::try_from_array(col, *next_idx))
+                .collect::<Result<Vec<ScalarValue>>>()
+                .unwrap();
+            let res = WindowFrameStateGroups::find_next_group_and_start_index(
+                &arrays,
+                &current_row_values,
+                current_idx,
+            )
+            .unwrap();
+            assert_eq!(res, Some((next_row_values, *next_idx)));
+        }
+        // Starting from the final row, the expected result is `None`.
+        let current_idx = NUM_ROWS - 1;
+        let current_row_values = arrays
+            .iter()
+            .map(|col| ScalarValue::try_from_array(col, current_idx))
+            .collect::<Result<Vec<ScalarValue>>>()
+            .unwrap();
+        let res = WindowFrameStateGroups::find_next_group_and_start_index(
+            &arrays,
+            &current_row_values,
+            current_idx,
+        )
+        .unwrap();
+        assert_eq!(res, None);
+    }
+
+    #[test]
+    fn test_window_frame_groups_preceding_huge_delta() {
+        const START: bool = true;
+        const END: bool = false;
+        const PRECEDING: bool = true;
+        const DELTA: u64 = 10;
+        const NUM_ROWS: usize = 5;
+
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 9., 10.])),
+            Arc::new(Float64Array::from_slice([2.0, 3.0, 3.0, 4.0, 5.0])),
+            Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 10., 11.0])),
+            Arc::new(Float64Array::from_slice([15.0, 13.0, 8.0, 5., 0.0])),
+        ];

Review Comment:
   Stylistically, you can probably avoid a non-trivial amount of repetition in these tests by factoring the construction of the test arrays out into a shared helper function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org