You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/11 20:59:38 UTC
[arrow-datafusion] branch master updated: Window frame GROUPS mode support (#4155)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 129654cd8 Window frame GROUPS mode support (#4155)
129654cd8 is described below
commit 129654cd8466e7634bf186782c3d271541f0671e
Author: zembunia <ze...@gmail.com>
AuthorDate: Fri Nov 11 23:59:30 2022 +0300
Window frame GROUPS mode support (#4155)
* Implementation of GROUPS mode in window frame
* Break down/flatten some functions, move comments inline
* Change find_next_group_and_start_index to use an exponentially growing search algorithm
* Removed the TODO after verification of correct implementation
* Window frame state capturing the state of the window frame calculations
* Do not use unnecessary traits and structs for window frame states
* Refactor to avoid carrying the window frame object around
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
datafusion/common/src/bisect.rs | 29 +-
datafusion/core/src/physical_plan/planner.rs | 8 +-
datafusion/core/tests/sql/window.rs | 167 +++-
datafusion/physical-expr/src/window/aggregate.rs | 6 +-
datafusion/physical-expr/src/window/built_in.rs | 5 +-
datafusion/physical-expr/src/window/mod.rs | 1 +
datafusion/physical-expr/src/window/window_expr.rs | 211 +---
.../physical-expr/src/window/window_frame_state.rs | 1003 ++++++++++++++++++++
integration-tests/sqls/simple_window_groups.sql | 71 ++
integration-tests/test_psql_parity.py | 3 +-
10 files changed, 1266 insertions(+), 238 deletions(-)
diff --git a/datafusion/common/src/bisect.rs b/datafusion/common/src/bisect.rs
index 5060bf013..796598be2 100644
--- a/datafusion/common/src/bisect.rs
+++ b/datafusion/common/src/bisect.rs
@@ -60,22 +60,41 @@ pub fn bisect<const SIDE: bool>(
target: &[ScalarValue],
sort_options: &[SortOptions],
) -> Result<usize> {
- let mut low: usize = 0;
- let mut high: usize = item_columns
+ let low: usize = 0;
+ let high: usize = item_columns
.get(0)
.ok_or_else(|| {
DataFusionError::Internal("Column array shouldn't be empty".to_string())
})?
.len();
+ let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
+ let cmp = compare(current, target, sort_options)?;
+ Ok(if SIDE { cmp.is_lt() } else { cmp.is_le() })
+ };
+ find_bisect_point(item_columns, target, compare_fn, low, high)
+}
+
+/// This function searches for a tuple of target values among the given rows using the bisection algorithm.
+/// At each step, the boolean-valued function `compare_fn` compares the current (midpoint) row with the target
+/// and decides which half is searched next: the left half when it returns `false`, or the right half when it
+/// returns `true`. The search is restricted to the row indices in `[low, high)`.
+pub fn find_bisect_point<F>(
+ item_columns: &[ArrayRef],
+ target: &[ScalarValue],
+ compare_fn: F,
+ mut low: usize,
+ mut high: usize,
+) -> Result<usize>
+where
+ F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>,
+{
while low < high {
let mid = ((high - low) / 2) + low;
let val = item_columns
.iter()
.map(|arr| ScalarValue::try_from_array(arr, mid))
.collect::<Result<Vec<ScalarValue>>>()?;
- let cmp = compare(&val, target, sort_options)?;
- let flag = if SIDE { cmp.is_lt() } else { cmp.is_le() };
- if flag {
+ if compare_fn(&val, target)? {
low = mid + 1;
} else {
high = mid;
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index d2c148c3f..757ab7205 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -64,7 +64,7 @@ use datafusion_expr::expr::{
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::utils::expand_wildcard;
-use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+use datafusion_expr::{WindowFrame, WindowFrameBound};
use datafusion_optimizer::utils::unalias;
use datafusion_physical_expr::expressions::Literal;
use datafusion_sql::utils::window_expr_common_partition_keys;
@@ -1457,12 +1457,6 @@ pub fn create_window_expr_with_name(
})
.collect::<Result<Vec<_>>>()?;
if let Some(ref window_frame) = window_frame {
- if window_frame.units == WindowFrameUnits::Groups {
- return Err(DataFusionError::NotImplemented(
- "Window frame definitions involving GROUPS are not supported yet"
- .to_string(),
- ));
- }
if !is_window_valid(window_frame) {
return Err(DataFusionError::Execution(format!(
"Invalid window frame: start bound ({}) cannot be larger than end bound ({})",
diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs
index a36d90c2a..9367ee63b 100644
--- a/datafusion/core/tests/sql/window.rs
+++ b/datafusion/core/tests/sql/window.rs
@@ -1189,24 +1189,171 @@ async fn window_frame_ranges_unbounded_preceding_err() -> Result<()> {
}
#[tokio::test]
-async fn window_frame_groups_query() -> Result<()> {
+async fn window_frame_groups_preceding_following_desc() -> Result<()> {
+ let ctx = SessionContext::new();
+ register_aggregate_csv(&ctx).await?;
+ let sql = "SELECT
+ SUM(c4) OVER(ORDER BY c2 DESC GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
+ SUM(c3) OVER(ORDER BY c2 DESC GROUPS BETWEEN 10000 PRECEDING AND 10000 FOLLOWING),
+ COUNT(*) OVER(ORDER BY c2 DESC GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+ FROM aggregate_test_100
+ ORDER BY c9
+ LIMIT 5";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+----------------------------+----------------------------+-----------------+",
+ "| SUM(aggregate_test_100.c4) | SUM(aggregate_test_100.c3) | COUNT(UInt8(1)) |",
+ "+----------------------------+----------------------------+-----------------+",
+ "| 52276 | 781 | 56 |",
+ "| 260620 | 781 | 63 |",
+ "| -28623 | 781 | 37 |",
+ "| 260620 | 781 | 63 |",
+ "| 260620 | 781 | 63 |",
+ "+----------------------------+----------------------------+-----------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups_order_by_null_desc() -> Result<()> {
+ let ctx = SessionContext::new();
+ register_aggregate_null_cases_csv(&ctx).await?;
+ let sql = "SELECT
+ COUNT(c2) OVER (ORDER BY c1 DESC GROUPS BETWEEN 5 PRECEDING AND 3 FOLLOWING)
+ FROM null_cases
+ LIMIT 5";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+----------------------+",
+ "| COUNT(null_cases.c2) |",
+ "+----------------------+",
+ "| 12 |",
+ "| 12 |",
+ "| 12 |",
+ "| 12 |",
+ "| 12 |",
+ "+----------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups() -> Result<()> {
+ let ctx = SessionContext::new();
+ register_aggregate_null_cases_csv(&ctx).await?;
+ let sql = "SELECT
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as a,
+ SUM(c1) OVER (ORDER BY c3 DESC GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as b,
+ SUM(c1) OVER (ORDER BY c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as c,
+ SUM(c1) OVER (ORDER BY c3 DESC NULLS last GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as d,
+ SUM(c1) OVER (ORDER BY c3 DESC NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as e,
+ SUM(c1) OVER (ORDER BY c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as f,
+ SUM(c1) OVER (ORDER BY c3 GROUPS current row) as a1,
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 9 PRECEDING AND 5 PRECEDING) as a2,
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND 5 PRECEDING) as a3,
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a4,
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND current row) as a5,
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as a6,
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as a7,
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 3 FOLLOWING AND UNBOUNDED FOLLOWING) as a8,
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN current row AND UNBOUNDED FOLLOWING) as a9,
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN current row AND 3 FOLLOWING) as a10,
+ SUM(c1) OVER (ORDER BY c3 GROUPS BETWEEN 5 FOLLOWING AND 7 FOLLOWING) as a11,
+ SUM(c1) OVER (ORDER BY c3 DESC GROUPS current row) as a21,
+ SUM(c1) OVER (ORDER BY c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 5 PRECEDING) as a22,
+ SUM(c1) OVER (ORDER BY c3 DESC NULLS last GROUPS BETWEEN UNBOUNDED PRECEDING AND 5 PRECEDING) as a23,
+ SUM(c1) OVER (ORDER BY c3 NULLS last GROUPS BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a24,
+ SUM(c1) OVER (ORDER BY c3 DESC NULLS first GROUPS BETWEEN UNBOUNDED PRECEDING AND current row) as a25
+ FROM null_cases
+ ORDER BY c3
+ LIMIT 10";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+------+------+------+------+-----+-----+-----+-----+------+-----+------+",
+ "| a | b | c | d | e | f | a1 | a2 | a3 | a4 | a5 | a6 | a7 | a8 | a9 | a10 | a11 | a21 | a22 | a23 | a24 | a25 |",
+ "+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+------+------+------+------+-----+-----+-----+-----+------+-----+------+",
+ "| 412 | 307 | 412 | 307 | 307 | 412 | | | | 412 | | 4627 | 4627 | 4531 | 4627 | 115 | 85 | | | 4487 | 412 | 4627 |",
+ "| 488 | 339 | 488 | 339 | 339 | 488 | 72 | | | 488 | 72 | 4627 | 4627 | 4512 | 4627 | 140 | 153 | 72 | | 4473 | 488 | 4627 |",
+ "| 543 | 412 | 543 | 412 | 412 | 543 | 24 | | | 543 | 96 | 4627 | 4627 | 4487 | 4555 | 82 | 122 | 24 | | 4442 | 543 | 4555 |",
+ "| 553 | 488 | 553 | 488 | 488 | 553 | 19 | | | 553 | 115 | 4627 | 4555 | 4473 | 4531 | 89 | 114 | 19 | | 4402 | 553 | 4531 |",
+ "| 553 | 543 | 553 | 543 | 543 | 553 | 25 | | | 553 | 140 | 4627 | 4531 | 4442 | 4512 | 110 | 105 | 25 | | 4320 | 553 | 4512 |",
+ "| 591 | 553 | 591 | 553 | 553 | 591 | 14 | | | 591 | 154 | 4627 | 4512 | 4402 | 4487 | 167 | 181 | 14 | | 4320 | 591 | 4487 |",
+ "| 651 | 553 | 651 | 553 | 553 | 651 | 31 | 72 | 72 | 651 | 185 | 4627 | 4487 | 4320 | 4473 | 153 | 204 | 31 | 72 | 4288 | 651 | 4473 |",
+ "| 662 | 591 | 662 | 591 | 591 | 662 | 40 | 96 | 96 | 662 | 225 | 4627 | 4473 | 4320 | 4442 | 154 | 141 | 40 | 96 | 4215 | 662 | 4442 |",
+ "| 697 | 651 | 697 | 651 | 651 | 697 | 82 | 115 | 115 | 697 | 307 | 4627 | 4442 | 4288 | 4402 | 187 | 65 | 82 | 115 | 4139 | 697 | 4402 |",
+ "| 758 | 662 | 758 | 662 | 662 | 758 | | 140 | 140 | 758 | 307 | 4627 | 4402 | 4215 | 4320 | 181 | 48 | | 140 | 4084 | 758 | 4320 |",
+ "+-----+-----+-----+-----+-----+-----+----+-----+-----+-----+-----+------+------+------+------+-----+-----+-----+-----+------+-----+------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups_multiple_order_columns() -> Result<()> {
+ let ctx = SessionContext::new();
+ register_aggregate_null_cases_csv(&ctx).await?;
+ let sql = "SELECT
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as a,
+ SUM(c1) OVER (ORDER BY c2, c3 DESC GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as b,
+ SUM(c1) OVER (ORDER BY c2, c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as c,
+ SUM(c1) OVER (ORDER BY c2, c3 DESC NULLS last GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as d,
+ SUM(c1) OVER (ORDER BY c2, c3 DESC NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as e,
+ SUM(c1) OVER (ORDER BY c2, c3 NULLS first GROUPS BETWEEN 9 PRECEDING AND 11 FOLLOWING) as f,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS current row) as a1,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN 9 PRECEDING AND 5 PRECEDING) as a2,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND 5 PRECEDING) as a3,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a4,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND current row) as a5,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as a6,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as a7,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN 3 FOLLOWING AND UNBOUNDED FOLLOWING) as a8,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN current row AND UNBOUNDED FOLLOWING) as a9,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN current row AND 3 FOLLOWING) as a10,
+ SUM(c1) OVER (ORDER BY c2, c3 GROUPS BETWEEN 5 FOLLOWING AND 7 FOLLOWING) as a11
+ FROM null_cases
+ ORDER BY c3
+ LIMIT 10";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let expected = vec![
+ "+------+-----+------+-----+-----+------+----+-----+------+------+------+------+------+------+------+-----+-----+",
+ "| a | b | c | d | e | f | a1 | a2 | a3 | a4 | a5 | a6 | a7 | a8 | a9 | a10 | a11 |",
+ "+------+-----+------+-----+-----+------+----+-----+------+------+------+------+------+------+------+-----+-----+",
+ "| 818 | 910 | 818 | 910 | 910 | 818 | | 249 | 249 | 818 | 432 | 4627 | 4234 | 4157 | 4195 | 98 | 82 |",
+ "| 537 | 979 | 537 | 979 | 979 | 537 | 72 | | | 537 | 210 | 4627 | 4569 | 4378 | 4489 | 169 | 55 |",
+ "| 811 | 838 | 811 | 838 | 838 | 811 | 24 | 221 | 3075 | 3665 | 3311 | 4627 | 1390 | 1276 | 1340 | 117 | 144 |",
+ "| 763 | 464 | 763 | 464 | 464 | 763 | 19 | 168 | 3572 | 4167 | 3684 | 4627 | 962 | 829 | 962 | 194 | 80 |",
+ "| 552 | 964 | 552 | 964 | 964 | 552 | 25 | | | 552 | 235 | 4627 | 4489 | 4320 | 4417 | 167 | 39 |",
+ "| 963 | 930 | 963 | 930 | 930 | 963 | 14 | 201 | 818 | 1580 | 1098 | 4627 | 3638 | 3455 | 3543 | 177 | 224 |",
+ "| 1113 | 814 | 1113 | 814 | 814 | 1113 | 31 | 415 | 2653 | 3351 | 2885 | 4627 | 1798 | 1694 | 1773 | 165 | 162 |",
+ "| 780 | 868 | 780 | 868 | 868 | 780 | 40 | 258 | 3143 | 3665 | 3351 | 4627 | 1340 | 1223 | 1316 | 117 | 102 |",
+ "| 740 | 466 | 740 | 466 | 466 | 740 | 82 | 164 | 3592 | 4168 | 3766 | 4627 | 962 | 768 | 943 | 244 | 122 |",
+ "| 772 | 832 | 772 | 832 | 832 | 772 | | 277 | 3189 | 3684 | 3351 | 4627 | 1316 | 1199 | 1276 | 119 | 64 |",
+ "+------+-----+------+-----+-----+------+----+-----+------+------+------+------+------+------+------+-----+-----+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ Ok(())
+}
+
+#[tokio::test]
+async fn window_frame_groups_without_order_by() -> Result<()> {
let ctx = SessionContext::new();
register_aggregate_csv(&ctx).await?;
// execute the query
let df = ctx
.sql(
"SELECT
- COUNT(c1) OVER (ORDER BY c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
- FROM aggregate_test_100;",
+ SUM(c4) OVER(PARTITION BY c2 GROUPS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+ FROM aggregate_test_100
+ ORDER BY c9;",
)
.await?;
- let results = df.collect().await;
- assert!(results
- .as_ref()
- .err()
- .unwrap()
- .to_string()
- .contains("Window frame definitions involving GROUPS are not supported yet"));
+ let err = df.collect().await.unwrap_err();
+ assert_contains!(
+ err.to_string(),
+ "Execution error: GROUPS mode requires an ORDER BY clause".to_owned()
+ );
Ok(())
}
diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs
index e6c754387..69b0812c1 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -33,6 +33,8 @@ use datafusion_expr::WindowFrame;
use crate::{expressions::PhysicalSortExpr, PhysicalExpr};
use crate::{window::WindowExpr, AggregateExpr};
+use super::window_frame_state::WindowFrameContext;
+
/// A window expr that takes the form of an aggregate function
#[derive(Debug)]
pub struct AggregateWindowExpr {
@@ -114,13 +116,13 @@ impl WindowExpr for AggregateWindowExpr {
.map(|v| v.slice(partition_range.start, length))
.collect::<Vec<_>>();
+ let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
let mut last_range: (usize, usize) = (0, 0);
// We iterate on each row to perform a running calculation.
// First, cur_range is calculated, then it is compared with last_range.
for i in 0..length {
- let cur_range = self.calculate_range(
- &window_frame,
+ let cur_range = window_frame_ctx.calculate_range(
&slice_order_bys,
&sort_options,
length,
diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs
index e4e377175..da551d534 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -17,6 +17,7 @@
//! Physical exec for built-in window function expressions.
+use super::window_frame_state::WindowFrameContext;
use super::BuiltInWindowFunctionExpr;
use super::WindowExpr;
use crate::{expressions::PhysicalSortExpr, PhysicalExpr};
@@ -113,10 +114,10 @@ impl WindowExpr for BuiltInWindowExpr {
.iter()
.map(|v| v.slice(partition_range.start, length))
.collect::<Vec<_>>();
+ let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
// We iterate on each row to calculate window frame range and and window function result
for idx in 0..length {
- let range = self.calculate_range(
- &window_frame,
+ let range = window_frame_ctx.calculate_range(
&slice_order_bys,
&sort_options,
num_rows,
diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs
index 059f8b886..40ed658ee 100644
--- a/datafusion/physical-expr/src/window/mod.rs
+++ b/datafusion/physical-expr/src/window/mod.rs
@@ -25,6 +25,7 @@ pub(crate) mod partition_evaluator;
pub(crate) mod rank;
pub(crate) mod row_number;
mod window_expr;
+mod window_frame_state;
pub use aggregate::AggregateWindowExpr;
pub use built_in::BuiltInWindowExpr;
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index 9c4b1b179..67caba51d 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -20,17 +20,12 @@ use arrow::compute::kernels::partition::lexicographical_partition_ranges;
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
-use datafusion_common::bisect::bisect;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{DataFusionError, Result};
use std::any::Any;
-use std::cmp::min;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
-use datafusion_expr::WindowFrameBound;
-use datafusion_expr::{WindowFrame, WindowFrameUnits};
-
/// A window expression that:
/// * knows its resulting field
pub trait WindowExpr: Send + Sync + Debug {
@@ -115,208 +110,4 @@ pub trait WindowExpr: Send + Sync + Debug {
sort_columns.extend(order_by_columns);
Ok(sort_columns)
}
-
- /// We use start and end bounds to calculate current row's starting and ending range.
- /// This function supports different modes, but we currently do not support window calculation for GROUPS inside window frames.
- fn calculate_range(
- &self,
- window_frame: &Option<Arc<WindowFrame>>,
- range_columns: &[ArrayRef],
- sort_options: &[SortOptions],
- length: usize,
- idx: usize,
- ) -> Result<(usize, usize)> {
- if let Some(window_frame) = window_frame {
- match window_frame.units {
- WindowFrameUnits::Range => {
- let start = match &window_frame.start_bound {
- // UNBOUNDED PRECEDING
- WindowFrameBound::Preceding(n) => {
- if n.is_null() {
- 0
- } else {
- calculate_index_of_row::<true, true>(
- range_columns,
- sort_options,
- idx,
- Some(n),
- )?
- }
- }
- WindowFrameBound::CurrentRow => {
- if range_columns.is_empty() {
- 0
- } else {
- calculate_index_of_row::<true, true>(
- range_columns,
- sort_options,
- idx,
- None,
- )?
- }
- }
- WindowFrameBound::Following(n) => {
- calculate_index_of_row::<true, false>(
- range_columns,
- sort_options,
- idx,
- Some(n),
- )?
- }
- };
- let end = match &window_frame.end_bound {
- WindowFrameBound::Preceding(n) => {
- calculate_index_of_row::<false, true>(
- range_columns,
- sort_options,
- idx,
- Some(n),
- )?
- }
- WindowFrameBound::CurrentRow => {
- if range_columns.is_empty() {
- length
- } else {
- calculate_index_of_row::<false, false>(
- range_columns,
- sort_options,
- idx,
- None,
- )?
- }
- }
- WindowFrameBound::Following(n) => {
- if n.is_null() {
- // UNBOUNDED FOLLOWING
- length
- } else {
- calculate_index_of_row::<false, false>(
- range_columns,
- sort_options,
- idx,
- Some(n),
- )?
- }
- }
- };
- Ok((start, end))
- }
- WindowFrameUnits::Rows => {
- let start = match window_frame.start_bound {
- // UNBOUNDED PRECEDING
- WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
- WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
- if idx >= n as usize {
- idx - n as usize
- } else {
- 0
- }
- }
- WindowFrameBound::Preceding(_) => {
- return Err(DataFusionError::Internal(
- "Rows should be Uint".to_string(),
- ))
- }
- WindowFrameBound::CurrentRow => idx,
- // UNBOUNDED FOLLOWING
- WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
- return Err(DataFusionError::Internal(format!(
- "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
- window_frame
- )))
- }
- WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
- min(idx + n as usize, length)
- }
- WindowFrameBound::Following(_) => {
- return Err(DataFusionError::Internal(
- "Rows should be Uint".to_string(),
- ))
- }
- };
- let end = match window_frame.end_bound {
- // UNBOUNDED PRECEDING
- WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
- return Err(DataFusionError::Internal(format!(
- "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
- window_frame
- )))
- }
- WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
- if idx >= n as usize {
- idx - n as usize + 1
- } else {
- 0
- }
- }
- WindowFrameBound::Preceding(_) => {
- return Err(DataFusionError::Internal(
- "Rows should be Uint".to_string(),
- ))
- }
- WindowFrameBound::CurrentRow => idx + 1,
- // UNBOUNDED FOLLOWING
- WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
- WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
- min(idx + n as usize + 1, length)
- }
- WindowFrameBound::Following(_) => {
- return Err(DataFusionError::Internal(
- "Rows should be Uint".to_string(),
- ))
- }
- };
- Ok((start, end))
- }
- WindowFrameUnits::Groups => Err(DataFusionError::NotImplemented(
- "Window frame for groups is not implemented".to_string(),
- )),
- }
- } else {
- Ok((0, length))
- }
- }
-}
-
-fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
- range_columns: &[ArrayRef],
- sort_options: &[SortOptions],
- idx: usize,
- delta: Option<&ScalarValue>,
-) -> Result<usize> {
- let current_row_values = range_columns
- .iter()
- .map(|col| ScalarValue::try_from_array(col, idx))
- .collect::<Result<Vec<ScalarValue>>>()?;
- let end_range = if let Some(delta) = delta {
- let is_descending: bool = sort_options
- .first()
- .ok_or_else(|| DataFusionError::Internal("Array is empty".to_string()))?
- .descending;
-
- current_row_values
- .iter()
- .map(|value| {
- if value.is_null() {
- return Ok(value.clone());
- }
- if SEARCH_SIDE == is_descending {
- // TODO: Handle positive overflows
- value.add(delta)
- } else if value.is_unsigned() && value < delta {
- // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
- // If we decide to implement a "default" construction mechanism for ScalarValue,
- // change the following statement to use that.
- value.sub(value)
- } else {
- // TODO: Handle negative overflows
- value.sub(delta)
- }
- })
- .collect::<Result<Vec<ScalarValue>>>()?
- } else {
- current_row_values
- };
- // `BISECT_SIDE` true means bisect_left, false means bisect_right
- bisect::<BISECT_SIDE>(range_columns, &end_range, sort_options)
}
diff --git a/datafusion/physical-expr/src/window/window_frame_state.rs b/datafusion/physical-expr/src/window/window_frame_state.rs
new file mode 100644
index 000000000..fc8d7d03a
--- /dev/null
+++ b/datafusion/physical-expr/src/window/window_frame_state.rs
@@ -0,0 +1,1003 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides utilities for window frame index calculations depending on the window frame mode:
+//! RANGE, ROWS, GROUPS.
+
+use arrow::array::ArrayRef;
+use arrow::compute::kernels::sort::SortOptions;
+use datafusion_common::bisect::{bisect, find_bisect_point};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+use std::cmp::min;
+use std::collections::VecDeque;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// This object stores the window frame state for use in incremental calculations.
+#[derive(Debug)]
+pub enum WindowFrameContext<'a> {
+ // ROWS-frames are inherently stateless:
+ Rows(&'a Arc<WindowFrame>),
+ // RANGE-frames will soon have a stateful implementation that is more efficient than a stateless one:
+ Range {
+ window_frame: &'a Arc<WindowFrame>,
+ state: WindowFrameStateRange,
+ },
+ // GROUPS-frames have a stateful implementation that is more efficient than a stateless one:
+ Groups {
+ window_frame: &'a Arc<WindowFrame>,
+ state: WindowFrameStateGroups,
+ },
+ Default,
+}
+
+impl<'a> WindowFrameContext<'a> {
+ /// Create a new default state for the given window frame.
+ pub fn new(window_frame: &'a Option<Arc<WindowFrame>>) -> Self {
+ if let Some(window_frame) = window_frame {
+ match window_frame.units {
+ WindowFrameUnits::Rows => WindowFrameContext::Rows(window_frame),
+ WindowFrameUnits::Range => WindowFrameContext::Range {
+ window_frame,
+ state: WindowFrameStateRange::default(),
+ },
+ WindowFrameUnits::Groups => WindowFrameContext::Groups {
+ window_frame,
+ state: WindowFrameStateGroups::default(),
+ },
+ }
+ } else {
+ WindowFrameContext::Default
+ }
+ }
+
+ /// This function calculates beginning/ending indices for the frame of the current row.
+ pub fn calculate_range(
+ &mut self,
+ range_columns: &[ArrayRef],
+ sort_options: &[SortOptions],
+ length: usize,
+ idx: usize,
+ ) -> Result<(usize, usize)> {
+ match *self {
+ WindowFrameContext::Rows(window_frame) => {
+ Self::calculate_range_rows(window_frame, length, idx)
+ }
+ // sort_options is used in RANGE mode calculations because the ordering and the position of the nulls
+ // have an impact on the range calculations and the comparison of the rows.
+ WindowFrameContext::Range {
+ window_frame,
+ ref mut state,
+ } => state.calculate_range(
+ window_frame,
+ range_columns,
+ sort_options,
+ length,
+ idx,
+ ),
+ // sort_options is not used in GROUPS mode calculations as the inequality of two rows is the indicator
+ // of a group change, and the ordering and the position of the nulls do not have an impact on inequality.
+ WindowFrameContext::Groups {
+ window_frame,
+ ref mut state,
+ } => state.calculate_range(window_frame, range_columns, length, idx),
+ WindowFrameContext::Default => Ok((0, length)),
+ }
+ }
+
+ /// This function calculates beginning/ending indices for the frame of the current row.
+ fn calculate_range_rows(
+ window_frame: &Arc<WindowFrame>,
+ length: usize,
+ idx: usize,
+ ) -> Result<(usize, usize)> {
+ let start = match window_frame.start_bound {
+ // UNBOUNDED PRECEDING
+ WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+ WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+ if idx >= n as usize {
+ idx - n as usize
+ } else {
+ 0
+ }
+ }
+ WindowFrameBound::CurrentRow => idx,
+ // UNBOUNDED FOLLOWING
+ WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+ return Err(DataFusionError::Internal(format!(
+ "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+ window_frame
+ )))
+ }
+ WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+ min(idx + n as usize, length)
+ }
+ // ERRONEOUS FRAMES
+ WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+ return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+ }
+ };
+ let end = match window_frame.end_bound {
+ // UNBOUNDED PRECEDING
+ WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+ return Err(DataFusionError::Internal(format!(
+ "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+ window_frame
+ )))
+ }
+ WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => {
+ if idx >= n as usize {
+ idx - n as usize + 1
+ } else {
+ 0
+ }
+ }
+ WindowFrameBound::CurrentRow => idx + 1,
+ // UNBOUNDED FOLLOWING
+ WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+ WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
+ min(idx + n as usize + 1, length)
+ }
+ // ERRONEOUS FRAMES
+ WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+ return Err(DataFusionError::Internal("Rows should be Uint".to_string()))
+ }
+ };
+ Ok((start, end))
+ }
+}
+
+/// This structure encapsulates all the state information we require as we
+/// scan ranges of data while processing window frames. Currently we calculate
+/// things from scratch every time, but we will make this incremental in the future.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateRange {}
+
+impl WindowFrameStateRange {
+ /// This function calculates beginning/ending indices for the frame of the current row.
+ fn calculate_range(
+ &mut self,
+ window_frame: &Arc<WindowFrame>,
+ range_columns: &[ArrayRef],
+ sort_options: &[SortOptions],
+ length: usize,
+ idx: usize,
+ ) -> Result<(usize, usize)> {
+ let start = match window_frame.start_bound {
+ WindowFrameBound::Preceding(ref n) => {
+ if n.is_null() {
+ // UNBOUNDED PRECEDING
+ 0
+ } else {
+ self.calculate_index_of_row::<true, true>(
+ range_columns,
+ sort_options,
+ idx,
+ Some(n),
+ )?
+ }
+ }
+ WindowFrameBound::CurrentRow => {
+ if range_columns.is_empty() {
+ 0
+ } else {
+ self.calculate_index_of_row::<true, true>(
+ range_columns,
+ sort_options,
+ idx,
+ None,
+ )?
+ }
+ }
+ WindowFrameBound::Following(ref n) => self
+ .calculate_index_of_row::<true, false>(
+ range_columns,
+ sort_options,
+ idx,
+ Some(n),
+ )?,
+ };
+ let end = match window_frame.end_bound {
+ WindowFrameBound::Preceding(ref n) => self
+ .calculate_index_of_row::<false, true>(
+ range_columns,
+ sort_options,
+ idx,
+ Some(n),
+ )?,
+ WindowFrameBound::CurrentRow => {
+ if range_columns.is_empty() {
+ length
+ } else {
+ self.calculate_index_of_row::<false, false>(
+ range_columns,
+ sort_options,
+ idx,
+ None,
+ )?
+ }
+ }
+ WindowFrameBound::Following(ref n) => {
+ if n.is_null() {
+ // UNBOUNDED FOLLOWING
+ length
+ } else {
+ self.calculate_index_of_row::<false, false>(
+ range_columns,
+ sort_options,
+ idx,
+ Some(n),
+ )?
+ }
+ }
+ };
+ Ok((start, end))
+ }
+
+ /// This function does the heavy lifting when finding range boundaries. It is meant to be
+ /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+ /// supplied as true and false, respectively).
+ fn calculate_index_of_row<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+ &mut self,
+ range_columns: &[ArrayRef],
+ sort_options: &[SortOptions],
+ idx: usize,
+ delta: Option<&ScalarValue>,
+ ) -> Result<usize> {
+ let current_row_values = range_columns
+ .iter()
+ .map(|col| ScalarValue::try_from_array(col, idx))
+ .collect::<Result<Vec<ScalarValue>>>()?;
+ let end_range = if let Some(delta) = delta {
+ let is_descending: bool = sort_options
+ .first()
+ .ok_or_else(|| DataFusionError::Internal("Array is empty".to_string()))?
+ .descending;
+
+ current_row_values
+ .iter()
+ .map(|value| {
+ if value.is_null() {
+ return Ok(value.clone());
+ }
+ if SEARCH_SIDE == is_descending {
+ // TODO: Handle positive overflows
+ value.add(delta)
+ } else if value.is_unsigned() && value < delta {
+ // NOTE: This gets a polymorphic zero without having long coercion code for ScalarValue.
+ // If we decide to implement a "default" construction mechanism for ScalarValue,
+ // change the following statement to use that.
+ value.sub(value)
+ } else {
+ // TODO: Handle negative overflows
+ value.sub(delta)
+ }
+ })
+ .collect::<Result<Vec<ScalarValue>>>()?
+ } else {
+ current_row_values
+ };
+ // `BISECT_SIDE` true means bisect_left, false means bisect_right
+ bisect::<BISECT_SIDE>(range_columns, &end_range, sort_options)
+ }
+}
+
+// In GROUPS mode, rows with duplicate sorting values are grouped together.
+// Therefore, there must be an ORDER BY clause in the window definition to use GROUPS mode.
+// The syntax is as follows:
+// GROUPS frame_start [ frame_exclusion ]
+// GROUPS BETWEEN frame_start AND frame_end [ frame_exclusion ]
+// The optional frame_exclusion specifier is not yet supported.
+// The frame_start and frame_end parameters allow us to specify which rows the window
+// frame starts and ends with. They accept the following values:
+// - UNBOUNDED PRECEDING: Start with the first row of the partition. Possible only in frame_start.
+// - offset PRECEDING: When used in frame_start, it refers to the first row of the group
+// that comes "offset" groups before the current group (i.e. the group
+// containing the current row). When used in frame_end, it refers to the
+// last row of the group that comes "offset" groups before the current group.
+// - CURRENT ROW: When used in frame_start, it refers to the first row of the group containing
+// the current row. When used in frame_end, it refers to the last row of the group
+// containing the current row.
+// - offset FOLLOWING: When used in frame_start, it refers to the first row of the group
+// that comes "offset" groups after the current group (i.e. the group
+// containing the current row). When used in frame_end, it refers to the
+// last row of the group that comes "offset" groups after the current group.
+// - UNBOUNDED FOLLOWING: End with the last row of the partition. Possible only in frame_end.
+
+// This structure encapsulates all the state information we require as we
+// scan groups of data while processing window frames.
+#[derive(Debug, Default)]
+pub struct WindowFrameStateGroups {
+ current_group_idx: u64,
+ group_start_indices: VecDeque<(Vec<ScalarValue>, usize)>,
+ previous_row_values: Option<Vec<ScalarValue>>,
+ reached_end: bool,
+ window_frame_end_idx: u64,
+ window_frame_start_idx: u64,
+}
+
+impl WindowFrameStateGroups {
+ /// This function calculates beginning/ending indices for the frame of the current row.
+ fn calculate_range(
+ &mut self,
+ window_frame: &Arc<WindowFrame>,
+ range_columns: &[ArrayRef],
+ length: usize,
+ idx: usize,
+ ) -> Result<(usize, usize)> {
+ if range_columns.is_empty() {
+ return Err(DataFusionError::Execution(
+ "GROUPS mode requires an ORDER BY clause".to_string(),
+ ));
+ }
+ let start = match window_frame.start_bound {
+ // UNBOUNDED PRECEDING
+ WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => 0,
+ WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
+ .calculate_index_of_group::<true, true>(range_columns, idx, n, length)?,
+ WindowFrameBound::CurrentRow => self.calculate_index_of_group::<true, true>(
+ range_columns,
+ idx,
+ 0,
+ length,
+ )?,
+ WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
+ .calculate_index_of_group::<true, false>(range_columns, idx, n, length)?,
+ // UNBOUNDED FOLLOWING
+ WindowFrameBound::Following(ScalarValue::UInt64(None)) => {
+ return Err(DataFusionError::Internal(format!(
+ "Frame start cannot be UNBOUNDED FOLLOWING '{:?}'",
+ window_frame
+ )))
+ }
+ // ERRONEOUS FRAMES
+ WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+ return Err(DataFusionError::Internal(
+ "Groups should be Uint".to_string(),
+ ))
+ }
+ };
+ let end = match window_frame.end_bound {
+ // UNBOUNDED PRECEDING
+ WindowFrameBound::Preceding(ScalarValue::UInt64(None)) => {
+ return Err(DataFusionError::Internal(format!(
+ "Frame end cannot be UNBOUNDED PRECEDING '{:?}'",
+ window_frame
+ )))
+ }
+ WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n))) => self
+ .calculate_index_of_group::<false, true>(range_columns, idx, n, length)?,
+ WindowFrameBound::CurrentRow => self
+ .calculate_index_of_group::<false, false>(
+ range_columns,
+ idx,
+ 0,
+ length,
+ )?,
+ WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => self
+ .calculate_index_of_group::<false, false>(
+ range_columns,
+ idx,
+ n,
+ length,
+ )?,
+ // UNBOUNDED FOLLOWING
+ WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
+ // ERRONEOUS FRAMES
+ WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
+ return Err(DataFusionError::Internal(
+ "Groups should be Uint".to_string(),
+ ))
+ }
+ };
+ Ok((start, end))
+ }
+
+ /// This function does the heavy lifting when finding group boundaries. It is meant to be
+ /// called twice, in succession, to get window frame start and end indices (with `BISECT_SIDE`
+ /// supplied as true and false, respectively).
+ fn calculate_index_of_group<const BISECT_SIDE: bool, const SEARCH_SIDE: bool>(
+ &mut self,
+ range_columns: &[ArrayRef],
+ idx: usize,
+ delta: u64,
+ length: usize,
+ ) -> Result<usize> {
+ let current_row_values = range_columns
+ .iter()
+ .map(|col| ScalarValue::try_from_array(col, idx))
+ .collect::<Result<Vec<ScalarValue>>>()?;
+
+ if BISECT_SIDE {
+ // When we call this function to get the window frame start index, it tries to initialize
+ // the internal grouping state if this has not already been done. This initialization takes
+ // place only when the window frame start index is greater than or equal to zero. In this
+ // case, the first row of the frame's starting group is stored in group_start_indices, with
+ // its row values as the group identifier and its row index as the start index of the group.
+ if !self.initialized() {
+ self.initialize::<SEARCH_SIDE>(delta, range_columns)?;
+ }
+ } else if !self.reached_end {
+ // When we call this function to get the window frame end index, it extends the window
+ // frame one by one until the current row's window frame end index is reached by finding
+ // the next group.
+ self.extend_window_frame_if_necessary::<SEARCH_SIDE>(range_columns, delta)?;
+ }
+ // We keep track of previous row values, so that a group change can be identified.
+ // If there is a group change, the window frame is advanced and shifted by one group.
+ let group_change = match &self.previous_row_values {
+ None => false,
+ Some(values) => &current_row_values != values,
+ };
+ if self.previous_row_values.is_none() || group_change {
+ self.previous_row_values = Some(current_row_values);
+ }
+ if group_change {
+ self.current_group_idx += 1;
+ self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
+ self.shift_one_group::<SEARCH_SIDE>(delta);
+ }
+ Ok(if self.group_start_indices.is_empty() {
+ if self.reached_end {
+ length
+ } else {
+ 0
+ }
+ } else if BISECT_SIDE {
+ match self.group_start_indices.get(0) {
+ Some(&(_, idx)) => idx,
+ None => 0,
+ }
+ } else {
+ match (self.reached_end, self.group_start_indices.back()) {
+ (false, Some(&(_, idx))) => idx,
+ _ => length,
+ }
+ })
+ }
+
+ fn extend_window_frame_if_necessary<const SEARCH_SIDE: bool>(
+ &mut self,
+ range_columns: &[ArrayRef],
+ delta: u64,
+ ) -> Result<()> {
+ let current_window_frame_end_idx = if !SEARCH_SIDE {
+ self.current_group_idx + delta + 1
+ } else if self.current_group_idx >= delta {
+ self.current_group_idx - delta + 1
+ } else {
+ 0
+ };
+ if current_window_frame_end_idx == 0 {
+ // the end index of the window frame is still before the first index
+ return Ok(());
+ }
+ if self.group_start_indices.is_empty() {
+ self.initialize_window_frame_start(range_columns)?;
+ }
+ while !self.reached_end
+ && self.window_frame_end_idx <= current_window_frame_end_idx
+ {
+ self.advance_one_group::<SEARCH_SIDE>(range_columns)?;
+ }
+ Ok(())
+ }
+
+ fn initialize<const SEARCH_SIDE: bool>(
+ &mut self,
+ delta: u64,
+ range_columns: &[ArrayRef],
+ ) -> Result<()> {
+ if !SEARCH_SIDE {
+ self.window_frame_start_idx = self.current_group_idx + delta;
+ self.initialize_window_frame_start(range_columns)
+ } else if self.current_group_idx >= delta {
+ self.window_frame_start_idx = self.current_group_idx - delta;
+ self.initialize_window_frame_start(range_columns)
+ } else {
+ Ok(())
+ }
+ }
+
+ fn initialize_window_frame_start(
+ &mut self,
+ range_columns: &[ArrayRef],
+ ) -> Result<()> {
+ let mut group_values = range_columns
+ .iter()
+ .map(|col| ScalarValue::try_from_array(col, 0))
+ .collect::<Result<Vec<ScalarValue>>>()?;
+ let mut start_idx: usize = 0;
+ for _ in 0..self.window_frame_start_idx {
+ let next_group_and_start_index =
+ WindowFrameStateGroups::find_next_group_and_start_index(
+ range_columns,
+ &group_values,
+ start_idx,
+ )?;
+ if let Some(entry) = next_group_and_start_index {
+ (group_values, start_idx) = entry;
+ } else {
+ // not enough groups to generate a window frame
+ self.window_frame_end_idx = self.window_frame_start_idx;
+ self.reached_end = true;
+ return Ok(());
+ }
+ }
+ self.group_start_indices
+ .push_back((group_values, start_idx));
+ self.window_frame_end_idx = self.window_frame_start_idx + 1;
+ Ok(())
+ }
+
+ fn initialized(&self) -> bool {
+ self.reached_end || !self.group_start_indices.is_empty()
+ }
+
+ /// This function advances the window frame by one group.
+ fn advance_one_group<const SEARCH_SIDE: bool>(
+ &mut self,
+ range_columns: &[ArrayRef],
+ ) -> Result<()> {
+ let last_group_values = self.group_start_indices.back();
+ let last_group_values = if let Some(values) = last_group_values {
+ values
+ } else {
+ return Ok(());
+ };
+ let next_group_and_start_index =
+ WindowFrameStateGroups::find_next_group_and_start_index(
+ range_columns,
+ &last_group_values.0,
+ last_group_values.1,
+ )?;
+ if let Some(entry) = next_group_and_start_index {
+ self.group_start_indices.push_back(entry);
+ self.window_frame_end_idx += 1;
+ } else {
+ // not enough groups to proceed
+ self.reached_end = true;
+ }
+ Ok(())
+ }
+
+ /// This function drops the oldest group from the window frame.
+ fn shift_one_group<const SEARCH_SIDE: bool>(&mut self, delta: u64) {
+ let current_window_frame_start_idx = if !SEARCH_SIDE {
+ self.current_group_idx + delta
+ } else if self.current_group_idx >= delta {
+ self.current_group_idx - delta
+ } else {
+ 0
+ };
+ if current_window_frame_start_idx > self.window_frame_start_idx {
+ self.group_start_indices.pop_front();
+ self.window_frame_start_idx += 1;
+ }
+ }
+
+ /// This function finds the next group and its start index for a given group and start index.
+ /// It utilizes an exponentially growing step size to find the group boundary.
+ // TODO: For small group sizes, proceeding one-by-one to find the group change can be more efficient.
+ // Statistics about previous group sizes can be used to choose one-by-one vs. exponentially growing,
+ // or even to set the base step_size when exponentially growing. We can also create a benchmark
+ // implementation to get insights about the crossover point.
+ fn find_next_group_and_start_index(
+ range_columns: &[ArrayRef],
+ current_row_values: &[ScalarValue],
+ idx: usize,
+ ) -> Result<Option<(Vec<ScalarValue>, usize)>> {
+ let mut step_size: usize = 1;
+ let data_size: usize = range_columns
+ .get(0)
+ .ok_or_else(|| {
+ DataFusionError::Internal("Column array shouldn't be empty".to_string())
+ })?
+ .len();
+ let mut low = idx;
+ let mut high = idx + step_size;
+ while high < data_size {
+ let val = range_columns
+ .iter()
+ .map(|arr| ScalarValue::try_from_array(arr, high))
+ .collect::<Result<Vec<ScalarValue>>>()?;
+ if val == current_row_values {
+ low = high;
+ step_size *= 2;
+ high += step_size;
+ } else {
+ break;
+ }
+ }
+ low = find_bisect_point(
+ range_columns,
+ current_row_values,
+ |current, to_compare| Ok(current == to_compare),
+ low,
+ min(high, data_size),
+ )?;
+ if low == data_size {
+ return Ok(None);
+ }
+ let val = range_columns
+ .iter()
+ .map(|arr| ScalarValue::try_from_array(arr, low))
+ .collect::<Result<Vec<ScalarValue>>>()?;
+ Ok(Some((val, low)))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use arrow::array::Float64Array;
+ use datafusion_common::ScalarValue;
+ use std::sync::Arc;
+
+ use crate::from_slice::FromSlice;
+
+ use super::*;
+
+ struct TestData {
+ arrays: Vec<ArrayRef>,
+ group_indices: [usize; 6],
+ num_groups: usize,
+ num_rows: usize,
+ next_group_indices: [usize; 5],
+ }
+
+ fn test_data() -> TestData {
+ let num_groups: usize = 5;
+ let num_rows: usize = 6;
+ let group_indices = [0, 1, 2, 2, 4, 5];
+ let next_group_indices = [1, 2, 4, 4, 5];
+
+ let arrays: Vec<ArrayRef> = vec![
+ Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 8., 9., 10.])),
+ Arc::new(Float64Array::from_slice([2.0, 3.0, 3.0, 3., 4.0, 5.0])),
+ Arc::new(Float64Array::from_slice([5.0, 7.0, 8.0, 8., 10., 11.0])),
+ Arc::new(Float64Array::from_slice([15.0, 13.0, 8.0, 8., 5., 0.0])),
+ ];
+ TestData {
+ arrays,
+ group_indices,
+ num_groups,
+ num_rows,
+ next_group_indices,
+ }
+ }
+
+ #[test]
+ fn test_find_next_group_and_start_index() {
+ let test_data = test_data();
+ for (current_idx, next_idx) in test_data.next_group_indices.iter().enumerate() {
+ let current_row_values = test_data
+ .arrays
+ .iter()
+ .map(|col| ScalarValue::try_from_array(col, current_idx))
+ .collect::<Result<Vec<ScalarValue>>>()
+ .unwrap();
+ let next_row_values = test_data
+ .arrays
+ .iter()
+ .map(|col| ScalarValue::try_from_array(col, *next_idx))
+ .collect::<Result<Vec<ScalarValue>>>()
+ .unwrap();
+ let res = WindowFrameStateGroups::find_next_group_and_start_index(
+ &test_data.arrays,
+ &current_row_values,
+ current_idx,
+ )
+ .unwrap();
+ assert_eq!(res, Some((next_row_values, *next_idx)));
+ }
+ let current_idx = test_data.num_rows - 1;
+ let current_row_values = test_data
+ .arrays
+ .iter()
+ .map(|col| ScalarValue::try_from_array(col, current_idx))
+ .collect::<Result<Vec<ScalarValue>>>()
+ .unwrap();
+ let res = WindowFrameStateGroups::find_next_group_and_start_index(
+ &test_data.arrays,
+ &current_row_values,
+ current_idx,
+ )
+ .unwrap();
+ assert_eq!(res, None);
+ }
+
+ #[test]
+ fn test_window_frame_groups_preceding_delta_greater_than_partition_size() {
+ const START: bool = true;
+ const END: bool = false;
+ const PRECEDING: bool = true;
+ const DELTA: u64 = 10;
+
+ let test_data = test_data();
+ let mut window_frame_groups = WindowFrameStateGroups::default();
+ window_frame_groups
+ .initialize::<PRECEDING>(DELTA, &test_data.arrays)
+ .unwrap();
+ assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+ assert_eq!(window_frame_groups.window_frame_end_idx, 0);
+ assert!(!window_frame_groups.reached_end);
+ assert_eq!(window_frame_groups.group_start_indices.len(), 0);
+
+ window_frame_groups
+ .extend_window_frame_if_necessary::<PRECEDING>(&test_data.arrays, DELTA)
+ .unwrap();
+ assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+ assert_eq!(window_frame_groups.window_frame_end_idx, 0);
+ assert!(!window_frame_groups.reached_end);
+ assert_eq!(window_frame_groups.group_start_indices.len(), 0);
+
+ for idx in 0..test_data.num_rows {
+ let start = window_frame_groups
+ .calculate_index_of_group::<START, PRECEDING>(
+ &test_data.arrays,
+ idx,
+ DELTA,
+ test_data.num_rows,
+ )
+ .unwrap();
+ assert_eq!(start, 0);
+ let end = window_frame_groups
+ .calculate_index_of_group::<END, PRECEDING>(
+ &test_data.arrays,
+ idx,
+ DELTA,
+ test_data.num_rows,
+ )
+ .unwrap();
+ assert_eq!(end, 0);
+ }
+ }
+
+ #[test]
+ fn test_window_frame_groups_following_delta_greater_than_partition_size() {
+ const START: bool = true;
+ const END: bool = false;
+ const FOLLOWING: bool = false;
+ const DELTA: u64 = 10;
+
+ let test_data = test_data();
+ let mut window_frame_groups = WindowFrameStateGroups::default();
+ window_frame_groups
+ .initialize::<FOLLOWING>(DELTA, &test_data.arrays)
+ .unwrap();
+ assert_eq!(window_frame_groups.window_frame_start_idx, DELTA);
+ assert_eq!(window_frame_groups.window_frame_end_idx, DELTA);
+ assert!(window_frame_groups.reached_end);
+ assert_eq!(window_frame_groups.group_start_indices.len(), 0);
+
+ window_frame_groups
+ .extend_window_frame_if_necessary::<FOLLOWING>(&test_data.arrays, DELTA)
+ .unwrap();
+ assert_eq!(window_frame_groups.window_frame_start_idx, DELTA);
+ assert_eq!(window_frame_groups.window_frame_end_idx, DELTA);
+ assert!(window_frame_groups.reached_end);
+ assert_eq!(window_frame_groups.group_start_indices.len(), 0);
+
+ for idx in 0..test_data.num_rows {
+ let start = window_frame_groups
+ .calculate_index_of_group::<START, FOLLOWING>(
+ &test_data.arrays,
+ idx,
+ DELTA,
+ test_data.num_rows,
+ )
+ .unwrap();
+ assert_eq!(start, test_data.num_rows);
+ let end = window_frame_groups
+ .calculate_index_of_group::<END, FOLLOWING>(
+ &test_data.arrays,
+ idx,
+ DELTA,
+ test_data.num_rows,
+ )
+ .unwrap();
+ assert_eq!(end, test_data.num_rows);
+ }
+ }
+
+ #[test]
+ fn test_window_frame_groups_preceding_and_following_delta_greater_than_partition_size(
+ ) {
+ const START: bool = true;
+ const END: bool = false;
+ const FOLLOWING: bool = false;
+ const PRECEDING: bool = true;
+ const DELTA: u64 = 10;
+
+ let test_data = test_data();
+ let mut window_frame_groups = WindowFrameStateGroups::default();
+ window_frame_groups
+ .initialize::<PRECEDING>(DELTA, &test_data.arrays)
+ .unwrap();
+ assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+ assert_eq!(window_frame_groups.window_frame_end_idx, 0);
+ assert!(!window_frame_groups.reached_end);
+ assert_eq!(window_frame_groups.group_start_indices.len(), 0);
+
+ window_frame_groups
+ .extend_window_frame_if_necessary::<FOLLOWING>(&test_data.arrays, DELTA)
+ .unwrap();
+ assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+ assert_eq!(
+ window_frame_groups.window_frame_end_idx,
+ test_data.num_groups as u64
+ );
+ assert!(window_frame_groups.reached_end);
+ assert_eq!(
+ window_frame_groups.group_start_indices.len(),
+ test_data.num_groups
+ );
+
+ for idx in 0..test_data.num_rows {
+ let start = window_frame_groups
+ .calculate_index_of_group::<START, PRECEDING>(
+ &test_data.arrays,
+ idx,
+ DELTA,
+ test_data.num_rows,
+ )
+ .unwrap();
+ assert_eq!(start, 0);
+ let end = window_frame_groups
+ .calculate_index_of_group::<END, FOLLOWING>(
+ &test_data.arrays,
+ idx,
+ DELTA,
+ test_data.num_rows,
+ )
+ .unwrap();
+ assert_eq!(end, test_data.num_rows);
+ }
+ }
+
+ #[test]
+ fn test_window_frame_groups_preceding_and_following_1() {
+ const START: bool = true;
+ const END: bool = false;
+ const FOLLOWING: bool = false;
+ const PRECEDING: bool = true;
+ const DELTA: u64 = 1;
+
+ let test_data = test_data();
+ let mut window_frame_groups = WindowFrameStateGroups::default();
+ window_frame_groups
+ .initialize::<PRECEDING>(DELTA, &test_data.arrays)
+ .unwrap();
+ assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+ assert_eq!(window_frame_groups.window_frame_end_idx, 0);
+ assert!(!window_frame_groups.reached_end);
+ assert_eq!(window_frame_groups.group_start_indices.len(), 0);
+
+ window_frame_groups
+ .extend_window_frame_if_necessary::<FOLLOWING>(&test_data.arrays, DELTA)
+ .unwrap();
+ assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+ assert_eq!(window_frame_groups.window_frame_end_idx, 2 * DELTA + 1);
+ assert!(!window_frame_groups.reached_end);
+ assert_eq!(
+ window_frame_groups.group_start_indices.len(),
+ 2 * DELTA as usize + 1
+ );
+
+ for idx in 0..test_data.num_rows {
+ let start_idx = if idx < DELTA as usize {
+ 0
+ } else {
+ test_data.group_indices[idx] - DELTA as usize
+ };
+ let start = window_frame_groups
+ .calculate_index_of_group::<START, PRECEDING>(
+ &test_data.arrays,
+ idx,
+ DELTA,
+ test_data.num_rows,
+ )
+ .unwrap();
+ assert_eq!(start, test_data.group_indices[start_idx]);
+ let mut end_idx = if idx >= test_data.num_groups {
+ test_data.num_rows
+ } else {
+ test_data.next_group_indices[idx]
+ };
+ for _ in 0..DELTA {
+ end_idx = if end_idx >= test_data.num_groups {
+ test_data.num_rows
+ } else {
+ test_data.next_group_indices[end_idx]
+ };
+ }
+ let end = window_frame_groups
+ .calculate_index_of_group::<END, FOLLOWING>(
+ &test_data.arrays,
+ idx,
+ DELTA,
+ test_data.num_rows,
+ )
+ .unwrap();
+ assert_eq!(end, end_idx);
+ }
+ }
+
+ #[test]
+ fn test_window_frame_groups_preceding_1_and_unbounded_following() {
+ const START: bool = true;
+ const PRECEDING: bool = true;
+ const DELTA: u64 = 1;
+
+ let test_data = test_data();
+ let mut window_frame_groups = WindowFrameStateGroups::default();
+ window_frame_groups
+ .initialize::<PRECEDING>(DELTA, &test_data.arrays)
+ .unwrap();
+ assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+ assert_eq!(window_frame_groups.window_frame_end_idx, 0);
+ assert!(!window_frame_groups.reached_end);
+ assert_eq!(window_frame_groups.group_start_indices.len(), 0);
+
+ for idx in 0..test_data.num_rows {
+ let start_idx = if idx < DELTA as usize {
+ 0
+ } else {
+ test_data.group_indices[idx] - DELTA as usize
+ };
+ let start = window_frame_groups
+ .calculate_index_of_group::<START, PRECEDING>(
+ &test_data.arrays,
+ idx,
+ DELTA,
+ test_data.num_rows,
+ )
+ .unwrap();
+ assert_eq!(start, test_data.group_indices[start_idx]);
+ }
+ }
+
+ #[test]
+ fn test_window_frame_groups_current_and_unbounded_following() {
+ const START: bool = true;
+ const PRECEDING: bool = true;
+ const DELTA: u64 = 0;
+
+ let test_data = test_data();
+ let mut window_frame_groups = WindowFrameStateGroups::default();
+ window_frame_groups
+ .initialize::<PRECEDING>(DELTA, &test_data.arrays)
+ .unwrap();
+ assert_eq!(window_frame_groups.window_frame_start_idx, 0);
+ assert_eq!(window_frame_groups.window_frame_end_idx, 1);
+ assert!(!window_frame_groups.reached_end);
+ assert_eq!(window_frame_groups.group_start_indices.len(), 1);
+
+ for idx in 0..test_data.num_rows {
+ let start = window_frame_groups
+ .calculate_index_of_group::<START, PRECEDING>(
+ &test_data.arrays,
+ idx,
+ DELTA,
+ test_data.num_rows,
+ )
+ .unwrap();
+ assert_eq!(start, test_data.group_indices[idx]);
+ }
+ }
+}
diff --git a/integration-tests/sqls/simple_window_groups.sql b/integration-tests/sqls/simple_window_groups.sql
new file mode 100644
index 000000000..dd369c28c
--- /dev/null
+++ b/integration-tests/sqls/simple_window_groups.sql
@@ -0,0 +1,71 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+
+-- http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+SELECT
+ SUM(c4) OVER(ORDER BY c3 DESC GROUPS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation1,
+ SUM(c4) OVER(ORDER BY c3 DESC GROUPS BETWEEN 3 PRECEDING AND 2 PRECEDING) as summation2,
+ SUM(c5) OVER(ORDER BY c4 DESC GROUPS BETWEEN 2 PRECEDING AND 2 PRECEDING) as summation3,
+ SUM(c4) OVER(ORDER BY c3 DESC GROUPS BETWEEN 1 FOLLOWING AND 3 FOLLOWING) as summation4,
+ SUM(c3) OVER(ORDER BY c4 DESC GROUPS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as summation5,
+ SUM(c5) OVER(ORDER BY c3 DESC GROUPS 2 PRECEDING) as summation6,
+ SUM(c5) OVER(ORDER BY c3 DESC GROUPS BETWEEN CURRENT ROW AND 3 FOLLOWING) as summation7,
+ SUM(c5) OVER(ORDER BY c4 DESC GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation8,
+ SUM(c4) OVER(ORDER BY c4 DESC GROUPS UNBOUNDED PRECEDING) as summation9,
+ SUM(c5) OVER(ORDER BY c4 DESC GROUPS CURRENT ROW) as summation10,
+ SUM(c5) OVER(ORDER BY c4 DESC GROUPS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as summation11,
+ SUM(c5) OVER(ORDER BY c4 DESC GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as summation12,
+ SUM(c5) OVER(ORDER BY c4 DESC GROUPS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) as summation13,
+ SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation21,
+ SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 3 PRECEDING AND 2 PRECEDING) as summation22,
+ SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN 2 PRECEDING AND 2 PRECEDING) as summation23,
+ SUM(c4) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN 1 FOLLOWING AND 3 FOLLOWING) as summation24,
+ SUM(c3) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as summation25,
+ SUM(c5) OVER(PARTITION BY c4 ORDER BY c3 GROUPS 2 PRECEDING) as summation26,
+ SUM(c5) OVER(PARTITION BY c4 ORDER BY c3 GROUPS BETWEEN CURRENT ROW AND 3 FOLLOWING) as summation27,
+ SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation28,
+ SUM(c4) OVER(PARTITION BY c4 ORDER BY c4 GROUPS UNBOUNDED PRECEDING) as summation29,
+ SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS CURRENT ROW) as summation30,
+ SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as summation31,
+ SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as summation32,
+ SUM(c5) OVER(PARTITION BY c4 ORDER BY c4 GROUPS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) as summation33,
+ SUM(c4) OVER(ORDER BY c5, c3 GROUPS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation41,
+ SUM(c4) OVER(ORDER BY c5, c3 GROUPS BETWEEN 3 PRECEDING AND 2 PRECEDING) as summation42,
+ SUM(c5) OVER(ORDER BY c5, c4 GROUPS BETWEEN 2 PRECEDING AND 2 PRECEDING) as summation43,
+ SUM(c4) OVER(ORDER BY c5, c3 GROUPS BETWEEN 1 FOLLOWING AND 3 FOLLOWING) as summation44,
+ SUM(c3) OVER(ORDER BY c5, c4 GROUPS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as summation45,
+ SUM(c5) OVER(ORDER BY c5, c3 GROUPS 2 PRECEDING) as summation46,
+ SUM(c5) OVER(ORDER BY c5, c3 GROUPS BETWEEN CURRENT ROW AND 3 FOLLOWING) as summation47,
+ SUM(c5) OVER(ORDER BY c5, c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation48,
+ SUM(c4) OVER(ORDER BY c5, c4 GROUPS UNBOUNDED PRECEDING) as summation49,
+ SUM(c5) OVER(ORDER BY c5, c4 GROUPS CURRENT ROW) as summation50,
+ SUM(c5) OVER(ORDER BY c5, c4 GROUPS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as summation51,
+ SUM(c5) OVER(ORDER BY c5, c4 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as summation52,
+ SUM(c5) OVER(ORDER BY c5, c4 GROUPS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) as summation53,
+ SUM(c4) OVER(ORDER BY c3 GROUPS BETWEEN 3 PRECEDING AND 1 FOLLOWING) as summation61,
+ SUM(c4) OVER(ORDER BY c3 GROUPS BETWEEN 3 PRECEDING AND 2 PRECEDING) as summation62,
+ SUM(c5) OVER(ORDER BY c4 GROUPS BETWEEN 2 PRECEDING AND 2 PRECEDING) as summation63,
+ SUM(c4) OVER(ORDER BY c3 GROUPS BETWEEN 1 FOLLOWING AND 3 FOLLOWING) as summation64,
+ SUM(c3) OVER(ORDER BY c4 GROUPS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) as summation65,
+ SUM(c5) OVER(ORDER BY c3 GROUPS 2 PRECEDING) as summation66,
+ SUM(c5) OVER(ORDER BY c3 GROUPS BETWEEN CURRENT ROW AND 3 FOLLOWING) as summation67,
+ SUM(c5) OVER(ORDER BY c4 GROUPS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as summation68,
+ SUM(c4) OVER(ORDER BY c4 GROUPS UNBOUNDED PRECEDING) as summation69,
+ SUM(c5) OVER(ORDER BY c4 GROUPS CURRENT ROW) as summation70,
+ SUM(c5) OVER(ORDER BY c4 GROUPS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as summation71,
+ SUM(c5) OVER(ORDER BY c4 GROUPS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as summation72,
+ SUM(c5) OVER(ORDER BY c4 GROUPS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) as summation73
+FROM test
+ORDER BY c9;
diff --git a/integration-tests/test_psql_parity.py b/integration-tests/test_psql_parity.py
index be197eb61..e4c2bb17e 100644
--- a/integration-tests/test_psql_parity.py
+++ b/integration-tests/test_psql_parity.py
@@ -82,11 +82,10 @@ test_files = set(root.glob("*.sql"))
class TestPsqlParity:
def test_tests_count(self):
- assert len(test_files) == 25, "tests are missed"
+ assert len(test_files) == 26, "tests are missed"
@pytest.mark.parametrize("fname", test_files, ids=str)
def test_sql_file(self, fname):
datafusion_output = pd.read_csv(io.BytesIO(generate_csv_from_datafusion(fname)))
psql_output = pd.read_csv(io.BytesIO(generate_csv_from_psql(fname)))
np.testing.assert_allclose(datafusion_output, psql_output, equal_nan=True, verbose=True)
-