You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/14 20:05:46 UTC
[arrow-datafusion] branch main updated: Combine evaluate_stateful and evaluate_inside_range (#6665)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4184a7f72c Combine evaluate_stateful and evaluate_inside_range (#6665)
4184a7f72c is described below
commit 4184a7f72c7b6172f9f0ff2ce1d466ac0f42e0dd
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Wed Jun 14 23:05:39 2023 +0300
Combine evaluate_stateful and evaluate_inside_range (#6665)
* Combine evaluate_stateful and evaluate_inside_range
* Move flags to partition_evaluator trait
* Update comments
* Update PartitionEvaluator comment
* Move include_rank to partition_evaluator
* Add a default implementation of get_range for when the window frame is not used.
---
datafusion/physical-expr/src/window/built_in.rs | 29 +++---
.../src/window/built_in_window_function_expr.rs | 25 -----
datafusion/physical-expr/src/window/cume_dist.rs | 12 +--
datafusion/physical-expr/src/window/lead_lag.rs | 22 +++--
datafusion/physical-expr/src/window/nth_value.rs | 70 +++++++-------
datafusion/physical-expr/src/window/ntile.rs | 6 +-
.../src/window/partition_evaluator.rs | 104 ++++++++++++++-------
datafusion/physical-expr/src/window/rank.rs | 34 ++++---
datafusion/physical-expr/src/window/row_number.rs | 30 +++---
9 files changed, 179 insertions(+), 153 deletions(-)
diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs
index 02cd2d163a..a03267c035 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -95,9 +95,9 @@ impl WindowExpr for BuiltInWindowExpr {
}
fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
- let evaluator = self.expr.create_evaluator()?;
+ let mut evaluator = self.expr.create_evaluator()?;
let num_rows = batch.num_rows();
- if self.expr.uses_window_frame() {
+ if evaluator.uses_window_frame() {
let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();
let mut row_wise_results = vec![];
@@ -114,18 +114,18 @@ impl WindowExpr for BuiltInWindowExpr {
num_rows,
idx,
)?;
- let value = evaluator.evaluate_inside_range(&values, &range)?;
+ let value = evaluator.evaluate(&values, &range)?;
row_wise_results.push(value);
last_range = range;
}
ScalarValue::iter_to_array(row_wise_results.into_iter())
- } else if self.expr.include_rank() {
+ } else if evaluator.include_rank() {
let columns = self.sort_columns(batch)?;
let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
- evaluator.evaluate_with_rank(num_rows, &sort_partition_points)
+ evaluator.evaluate_with_rank_all(num_rows, &sort_partition_points)
} else {
let (values, _) = self.get_values_orderbys(batch)?;
- evaluator.evaluate(&values, num_rows)
+ evaluator.evaluate_all(&values, num_rows)
}
}
@@ -164,7 +164,7 @@ impl WindowExpr for BuiltInWindowExpr {
// We iterate on each row to perform a running calculation.
let record_batch = &partition_batch_state.record_batch;
let num_rows = record_batch.num_rows();
- let sort_partition_points = if self.expr.include_rank() {
+ let sort_partition_points = if evaluator.include_rank() {
let columns = self.sort_columns(record_batch)?;
evaluate_partition_ranges(num_rows, &columns)?
} else {
@@ -172,7 +172,7 @@ impl WindowExpr for BuiltInWindowExpr {
};
let mut row_wise_results: Vec<ScalarValue> = vec![];
for idx in state.last_calculated_index..num_rows {
- let frame_range = if self.expr.uses_window_frame() {
+ let frame_range = if evaluator.uses_window_frame() {
state
.window_frame_ctx
.get_or_insert_with(|| {
@@ -199,7 +199,8 @@ impl WindowExpr for BuiltInWindowExpr {
// Update last range
state.window_frame_range = frame_range;
evaluator.update_state(state, idx, &order_bys, &sort_partition_points)?;
- row_wise_results.push(evaluator.evaluate_stateful(&values)?);
+ row_wise_results
+ .push(evaluator.evaluate(&values, &state.window_frame_range)?);
}
let out_col = if row_wise_results.is_empty() {
new_empty_array(out_type)
@@ -231,8 +232,12 @@ impl WindowExpr for BuiltInWindowExpr {
}
fn uses_bounded_memory(&self) -> bool {
- self.expr.supports_bounded_execution()
- && (!self.expr.uses_window_frame()
- || !self.window_frame.end_bound.is_unbounded())
+ if let Ok(evaluator) = self.expr.create_evaluator() {
+ evaluator.supports_bounded_execution()
+ && (!evaluator.uses_window_frame()
+ || !self.window_frame.end_bound.is_unbounded())
+ } else {
+ false
+ }
}
}
diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
index 763bcfc2b1..432bf78368 100644
--- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
+++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
@@ -85,29 +85,4 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
///
/// The default implementation does nothing
fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {}
-
- /// Can the window function be incrementally computed using
- /// bounded memory?
- ///
- /// If this function returns true, [`Self::create_evaluator`] must
- /// implement [`PartitionEvaluator::evaluate_stateful`]
- fn supports_bounded_execution(&self) -> bool {
- false
- }
-
- /// Does the window function use the values from its window frame?
- ///
- /// If this function returns true, [`Self::create_evaluator`] must
- /// implement [`PartitionEvaluator::evaluate_inside_range`]
- fn uses_window_frame(&self) -> bool {
- false
- }
-
- /// Can this function be evaluated with (only) rank
- ///
- /// If `include_rank` is true, then [`Self::create_evaluator`] must
- /// implement [`PartitionEvaluator::evaluate_with_rank`]
- fn include_rank(&self) -> bool {
- false
- }
}
diff --git a/datafusion/physical-expr/src/window/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs
index 2214555dbe..47f2e4208d 100644
--- a/datafusion/physical-expr/src/window/cume_dist.rs
+++ b/datafusion/physical-expr/src/window/cume_dist.rs
@@ -64,17 +64,13 @@ impl BuiltInWindowFunctionExpr for CumeDist {
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CumeDistEvaluator {}))
}
-
- fn include_rank(&self) -> bool {
- true
- }
}
#[derive(Debug)]
pub(crate) struct CumeDistEvaluator;
impl PartitionEvaluator for CumeDistEvaluator {
- fn evaluate_with_rank(
+ fn evaluate_with_rank_all(
&self,
num_rows: usize,
ranks_in_partition: &[Range<usize>],
@@ -94,6 +90,10 @@ impl PartitionEvaluator for CumeDistEvaluator {
);
Ok(Arc::new(result))
}
+
+ fn include_rank(&self) -> bool {
+ true
+ }
}
#[cfg(test)]
@@ -109,7 +109,7 @@ mod tests {
) -> Result<()> {
let result = expr
.create_evaluator()?
- .evaluate_with_rank(num_rows, &ranks)?;
+ .evaluate_with_rank_all(num_rows, &ranks)?;
let result = as_float64_array(&result)?;
let result = result.values();
assert_eq!(expected, *result);
diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs
index bae5098edf..24248f989e 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/physical-expr/src/window/lead_lag.rs
@@ -110,10 +110,6 @@ impl BuiltInWindowFunctionExpr for WindowShift {
}))
}
- fn supports_bounded_execution(&self) -> bool {
- true
- }
-
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
Some(Arc::new(Self {
name: self.name.clone(),
@@ -206,7 +202,11 @@ impl PartitionEvaluator for WindowShiftEvaluator {
}
}
- fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result<ScalarValue> {
+ fn evaluate(
+ &mut self,
+ values: &[ArrayRef],
+ _range: &Range<usize>,
+ ) -> Result<ScalarValue> {
let array = &values[0];
let dtype = array.data_type();
let idx = self.state.idx as i64 - self.shift_offset;
@@ -217,11 +217,19 @@ impl PartitionEvaluator for WindowShiftEvaluator {
}
}
- fn evaluate(&self, values: &[ArrayRef], _num_rows: usize) -> Result<ArrayRef> {
+ fn evaluate_all(
+ &mut self,
+ values: &[ArrayRef],
+ _num_rows: usize,
+ ) -> Result<ArrayRef> {
// LEAD, LAG window functions take single column, values will have size 1
let value = &values[0];
shift_with_default_value(value, self.shift_offset, self.default_value.as_ref())
}
+
+ fn supports_bounded_execution(&self) -> bool {
+ true
+ }
}
fn get_default_value(
@@ -258,7 +266,7 @@ mod tests {
let values = expr.evaluate_args(&batch)?;
let result = expr
.create_evaluator()?
- .evaluate(&values, batch.num_rows())?;
+ .evaluate_all(&values, batch.num_rows())?;
let result = as_int32_array(&result)?;
assert_eq!(expected, *result);
Ok(())
diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs
index aa5fe77df0..e6dbeba834 100644
--- a/datafusion/physical-expr/src/window/nth_value.rs
+++ b/datafusion/physical-expr/src/window/nth_value.rs
@@ -122,14 +122,6 @@ impl BuiltInWindowFunctionExpr for NthValue {
Ok(Box::new(NthValueEvaluator { state }))
}
- fn supports_bounded_execution(&self) -> bool {
- true
- }
-
- fn uses_window_frame(&self) -> bool {
- true
- }
-
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
let reversed_kind = match self.kind {
NthValueKind::First => NthValueKind::Last,
@@ -197,40 +189,44 @@ impl PartitionEvaluator for NthValueEvaluator {
Ok(())
}
- fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result<ScalarValue> {
- if let Some(ref result) = self.state.finalized_result {
- Ok(result.clone())
- } else {
- self.evaluate_inside_range(values, &self.state.range)
- }
- }
-
- fn evaluate_inside_range(
- &self,
+ fn evaluate(
+ &mut self,
values: &[ArrayRef],
range: &Range<usize>,
) -> Result<ScalarValue> {
- // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1.
- let arr = &values[0];
- let n_range = range.end - range.start;
- if n_range == 0 {
- // We produce None if the window is empty.
- return ScalarValue::try_from(arr.data_type());
- }
- match self.state.kind {
- NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
- NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1),
- NthValueKind::Nth(n) => {
- // We are certain that n > 0.
- let index = (n as usize) - 1;
- if index >= n_range {
- ScalarValue::try_from(arr.data_type())
- } else {
- ScalarValue::try_from_array(arr, range.start + index)
+ if let Some(ref result) = self.state.finalized_result {
+ Ok(result.clone())
+ } else {
+ // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1.
+ let arr = &values[0];
+ let n_range = range.end - range.start;
+ if n_range == 0 {
+ // We produce None if the window is empty.
+ return ScalarValue::try_from(arr.data_type());
+ }
+ match self.state.kind {
+ NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
+ NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1),
+ NthValueKind::Nth(n) => {
+ // We are certain that n > 0.
+ let index = (n as usize) - 1;
+ if index >= n_range {
+ ScalarValue::try_from(arr.data_type())
+ } else {
+ ScalarValue::try_from_array(arr, range.start + index)
+ }
}
}
}
}
+
+ fn supports_bounded_execution(&self) -> bool {
+ true
+ }
+
+ fn uses_window_frame(&self) -> bool {
+ true
+ }
}
#[cfg(test)]
@@ -254,11 +250,11 @@ mod tests {
end: i + 1,
})
}
- let evaluator = expr.create_evaluator()?;
+ let mut evaluator = expr.create_evaluator()?;
let values = expr.evaluate_args(&batch)?;
let result = ranges
.iter()
- .map(|range| evaluator.evaluate_inside_range(&values, range))
+ .map(|range| evaluator.evaluate(&values, range))
.collect::<Result<Vec<ScalarValue>>>()?;
let result = ScalarValue::iter_to_array(result.into_iter())?;
let result = as_int32_array(&result)?;
diff --git a/datafusion/physical-expr/src/window/ntile.rs b/datafusion/physical-expr/src/window/ntile.rs
index 479fa26333..2feab9956a 100644
--- a/datafusion/physical-expr/src/window/ntile.rs
+++ b/datafusion/physical-expr/src/window/ntile.rs
@@ -70,7 +70,11 @@ pub(crate) struct NtileEvaluator {
}
impl PartitionEvaluator for NtileEvaluator {
- fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
+ fn evaluate_all(
+ &mut self,
+ _values: &[ArrayRef],
+ num_rows: usize,
+ ) -> Result<ArrayRef> {
let num_rows = num_rows as u64;
let mut vec: Vec<u64> = Vec::new();
for i in 0..num_rows {
diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs b/datafusion/physical-expr/src/window/partition_evaluator.rs
index 9e665c5677..553f631790 100644
--- a/datafusion/physical-expr/src/window/partition_evaluator.rs
+++ b/datafusion/physical-expr/src/window/partition_evaluator.rs
@@ -69,18 +69,17 @@ use std::ops::Range;
///
/// # Stateless `PartitionEvaluator`
///
-/// In this case, [`Self::evaluate`], [`Self::evaluate_with_rank`] or
-/// [`Self::evaluate_inside_range`] is called with values for the
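+/// In this case, if [`Self::uses_window_frame`] returns `true`, [`Self::evaluate`]
+/// is called once per row with that row's window frame range; otherwise, either
+/// [`Self::evaluate_all`] or [`Self::evaluate_with_rank_all`] is called with values for the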
/// entire partition.
///
/// # Stateful `PartitionEvaluator`
///
-/// In this case, [`Self::evaluate_stateful`] is called to calculate
+/// In this case, [`Self::evaluate`] is called to calculate
/// the results of the window function incrementally for each new
/// batch.
///
/// For example, when computing `ROW_NUMBER` incrementally,
-/// [`Self::evaluate_stateful`] will be called multiple times with
+/// [`Self::evaluate`] will be called multiple times with
/// different batches. For all batches after the first, the output
/// `row_number` must start from last `row_number` produced for the
/// previous batch. The previous row number is saved and restored as
@@ -88,6 +87,15 @@ use std::ops::Range;
///
/// [`BuiltInWindowFunctionExpr`]: crate::window::BuiltInWindowFunctionExpr
/// [`BuiltInWindowFunctionExpr::create_evaluator`]: crate::window::BuiltInWindowFunctionExpr::create_evaluator
+///
+/// When implementing a new `PartitionEvaluator`, the `uses_window_frame` and
+/// `supports_bounded_execution` flags determine which evaluation method will be
+/// called at runtime. Implement the corresponding method according to the table below.
+///
+/// |uses_window_frame|supports_bounded_execution|function_to_implement|
+/// |---|---|----|
+/// |false|false|`evaluate_all`. `PERCENT_RANK` would fall in this quadrant, since no result can be produced without seeing all of the data.|
+/// |false|true|`evaluate` (optionally also `evaluate_all`, for a more efficient implementation; otherwise a default implementation that calls `evaluate` once per row is used, which may be suboptimal). `ROW_NUMBER` falls in this quadrant; the `OddRowNumber` example showcases this use case.|
+/// |true|false|`evaluate`. Presumably this combination cannot occur: whenever `uses_window_frame` is `true`, `supports_bounded_execution` should also be `true`. No example of this quadrant is known.|
+/// |true|true|`evaluate`. `FIRST_VALUE` falls in this quadrant.|
+///
+/// Additionally, when `include_rank` returns `true` and `uses_window_frame` is `false`,
+/// `evaluate_with_rank_all` is called instead of `evaluate_all` when a whole partition
+/// is evaluated at once.
pub trait PartitionEvaluator: Debug + Send {
/// Updates the internal state for window function
///
@@ -120,35 +128,58 @@ pub trait PartitionEvaluator: Debug + Send {
Ok(())
}
- /// Gets the range where the window function result is calculated.
- ///
- /// `idx`: is the index of last row for which result is calculated.
- /// `n_rows`: is the number of rows of the input record batch (Used during bounds check)
- fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<Range<usize>> {
- Err(DataFusionError::NotImplemented(
- "get_range is not implemented for this window function".to_string(),
- ))
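+ /// When the `uses_window_frame` flag is `false`, this method calculates the range of rows
+ /// required to compute the window function result for row `idx`. Generally no wider range
+ /// is required, so by default this returns the smallest possible range: the current row
+ /// alone (e.g. seeing the current row is enough to calculate `ROW_NUMBER`, `RANK`, etc.).
+ /// When `uses_window_frame` is `true`, the range must be derived from the window frame
+ /// instead, and this default implementation returns an error.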
+ fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
+ if self.uses_window_frame() {
+ Err(DataFusionError::Execution(
+ "Range should be calculated from window frame".to_string(),
+ ))
+ } else {
+ Ok(Range {
+ start: idx,
+ end: idx + 1,
+ })
+ }
}
/// Called for window functions that *do not use* values from the
/// the window frame, such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`,
- /// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`).
- fn evaluate(&self, _values: &[ArrayRef], _num_rows: usize) -> Result<ArrayRef> {
- Err(DataFusionError::NotImplemented(
- "evaluate is not implemented by default".into(),
- ))
+ /// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`. It produces the results
+ /// for all rows in a single pass and expects to receive the data of the
+ /// entire partition as a single batch.
+ fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
+ // When window frame boundaries are not used and the evaluator supports bounded
+ // execution, the result can be calculated by calling `self.evaluate` once per row
+ // (`num_rows` times). This default may be suboptimal; implementors wanting a more
+ // efficient version should override this method (as `NumRowsEvaluator` does).
+ if !self.uses_window_frame() && self.supports_bounded_execution() {
+ let res = (0..num_rows)
+ .map(|idx| self.evaluate(values, &self.get_range(idx, num_rows)?))
+ .collect::<Result<Vec<_>>>()?;
+ ScalarValue::iter_to_array(res.into_iter())
+ } else {
+ Err(DataFusionError::NotImplemented(
+ "evaluate_all is not implemented by default".into(),
+ ))
+ }
}
/// Evaluate window function result inside given range.
///
/// Only used for stateful evaluation
- fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result<ScalarValue> {
+ fn evaluate(
+ &mut self,
+ _values: &[ArrayRef],
+ _range: &Range<usize>,
+ ) -> Result<ScalarValue> {
Err(DataFusionError::NotImplemented(
- "evaluate_stateful is not implemented by default".into(),
+ "evaluate is not implemented by default".into(),
))
}
- /// [`PartitionEvaluator::evaluate_with_rank`] is called for window
+ /// [`PartitionEvaluator::evaluate_with_rank_all`] is called for window
/// functions that only need the rank of a row within its window
/// frame.
///
@@ -175,7 +206,7 @@ pub trait PartitionEvaluator: Debug + Send {
/// (3,4),
/// ]
/// ```
- fn evaluate_with_rank(
+ fn evaluate_with_rank_all(
&self,
_num_rows: usize,
_ranks_in_partition: &[Range<usize>],
@@ -185,18 +216,25 @@ pub trait PartitionEvaluator: Debug + Send {
))
}
- /// Called for window functions that use values from window frame,
- /// such as `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE` and produce a
- /// single value for every row in the partition.
+ /// Can the window function be incrementally computed using
+ /// bounded memory?
///
- /// Returns a [`ScalarValue`] that is the value of the window function for the entire partition
- fn evaluate_inside_range(
- &self,
- _values: &[ArrayRef],
- _range: &Range<usize>,
- ) -> Result<ScalarValue> {
- Err(DataFusionError::NotImplemented(
- "evaluate_inside_range is not implemented by default".into(),
- ))
+ /// If this function returns true, implement [`PartitionEvaluator::evaluate`]
+ fn supports_bounded_execution(&self) -> bool {
+ false
+ }
+
+ /// Does the window function use the values from its window frame?
+ ///
+ /// If this function returns true, implement [`PartitionEvaluator::evaluate`]
+ fn uses_window_frame(&self) -> bool {
+ false
+ }
+
+ /// Can this function be evaluated with (only) rank
+ ///
+ /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_with_rank_all`]
+ fn include_rank(&self) -> bool {
+ false
}
}
diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs
index 918fa89f0e..be184ca891 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -100,14 +100,6 @@ impl BuiltInWindowFunctionExpr for Rank {
&self.name
}
- fn supports_bounded_execution(&self) -> bool {
- matches!(self.rank_type, RankType::Basic | RankType::Dense)
- }
-
- fn include_rank(&self) -> bool {
- true
- }
-
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(RankEvaluator {
state: RankState::default(),
@@ -123,12 +115,6 @@ pub(crate) struct RankEvaluator {
}
impl PartitionEvaluator for RankEvaluator {
- fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
- let start = idx;
- let end = idx + 1;
- Ok(Range { start, end })
- }
-
fn update_state(
&mut self,
state: &WindowAggState,
@@ -157,7 +143,11 @@ impl PartitionEvaluator for RankEvaluator {
}
/// evaluate window function result inside given range
- fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result<ScalarValue> {
+ fn evaluate(
+ &mut self,
+ _values: &[ArrayRef],
+ _range: &Range<usize>,
+ ) -> Result<ScalarValue> {
match self.rank_type {
RankType::Basic => Ok(ScalarValue::UInt64(Some(
self.state.last_rank_boundary as u64 + 1,
@@ -169,7 +159,7 @@ impl PartitionEvaluator for RankEvaluator {
}
}
- fn evaluate_with_rank(
+ fn evaluate_with_rank_all(
&self,
num_rows: usize,
ranks_in_partition: &[Range<usize>],
@@ -215,6 +205,14 @@ impl PartitionEvaluator for RankEvaluator {
};
Ok(result)
}
+
+ fn supports_bounded_execution(&self) -> bool {
+ matches!(self.rank_type, RankType::Basic | RankType::Dense)
+ }
+
+ fn include_rank(&self) -> bool {
+ true
+ }
}
#[cfg(test)]
@@ -238,7 +236,7 @@ mod tests {
) -> Result<()> {
let result = expr
.create_evaluator()?
- .evaluate_with_rank(num_rows, &ranks)?;
+ .evaluate_with_rank_all(num_rows, &ranks)?;
let result = as_float64_array(&result)?;
let result = result.values();
assert_eq!(expected, *result);
@@ -250,7 +248,7 @@ mod tests {
ranks: Vec<Range<usize>>,
expected: Vec<u64>,
) -> Result<()> {
- let result = expr.create_evaluator()?.evaluate_with_rank(8, &ranks)?;
+ let result = expr.create_evaluator()?.evaluate_with_rank_all(8, &ranks)?;
let result = as_uint64_array(&result)?;
let result = result.values();
assert_eq!(expected, *result);
diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs
index 0abc982897..3c1f0583b4 100644
--- a/datafusion/physical-expr/src/window/row_number.rs
+++ b/datafusion/physical-expr/src/window/row_number.rs
@@ -85,10 +85,6 @@ impl BuiltInWindowFunctionExpr for RowNumber {
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::<NumRowsEvaluator>::default())
}
-
- fn supports_bounded_execution(&self) -> bool {
- true
- }
}
#[derive(Default, Debug)]
@@ -97,23 +93,29 @@ pub(crate) struct NumRowsEvaluator {
}
impl PartitionEvaluator for NumRowsEvaluator {
- fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
- let start = idx;
- let end = idx + 1;
- Ok(Range { start, end })
- }
-
/// evaluate window function result inside given range
- fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result<ScalarValue> {
+ fn evaluate(
+ &mut self,
+ _values: &[ArrayRef],
+ _range: &Range<usize>,
+ ) -> Result<ScalarValue> {
self.state.n_rows += 1;
Ok(ScalarValue::UInt64(Some(self.state.n_rows as u64)))
}
- fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
+ fn evaluate_all(
+ &mut self,
+ _values: &[ArrayRef],
+ num_rows: usize,
+ ) -> Result<ArrayRef> {
Ok(Arc::new(UInt64Array::from_iter_values(
1..(num_rows as u64) + 1,
)))
}
+
+ fn supports_bounded_execution(&self) -> bool {
+ true
+ }
}
#[cfg(test)]
@@ -134,7 +136,7 @@ mod tests {
let values = row_number.evaluate_args(&batch)?;
let result = row_number
.create_evaluator()?
- .evaluate(&values, batch.num_rows())?;
+ .evaluate_all(&values, batch.num_rows())?;
let result = as_uint64_array(&result)?;
let result = result.values();
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *result);
@@ -152,7 +154,7 @@ mod tests {
let values = row_number.evaluate_args(&batch)?;
let result = row_number
.create_evaluator()?
- .evaluate(&values, batch.num_rows())?;
+ .evaluate_all(&values, batch.num_rows())?;
let result = as_uint64_array(&result)?;
let result = result.values();
assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *result);