You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/14 20:05:46 UTC

[arrow-datafusion] branch main updated: Combine evaluate_stateful and evaluate_inside_range (#6665)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4184a7f72c Combine evaluate_stateful and evaluate_inside_range (#6665)
4184a7f72c is described below

commit 4184a7f72c7b6172f9f0ff2ce1d466ac0f42e0dd
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Wed Jun 14 23:05:39 2023 +0300

    Combine evaluate_stateful and evaluate_inside_range (#6665)
    
    * Combine evaluate_stateful and evaluate_inside_range
    
    * Move flags to partition_evaluator trait
    
    * Update comments
    
    * Update PartitionEvaluator comment
    
    * move include_rank to partition_evaluator
    
    * Default implement get_range when window frame is not used.
---
 datafusion/physical-expr/src/window/built_in.rs    |  29 +++---
 .../src/window/built_in_window_function_expr.rs    |  25 -----
 datafusion/physical-expr/src/window/cume_dist.rs   |  12 +--
 datafusion/physical-expr/src/window/lead_lag.rs    |  22 +++--
 datafusion/physical-expr/src/window/nth_value.rs   |  70 +++++++-------
 datafusion/physical-expr/src/window/ntile.rs       |   6 +-
 .../src/window/partition_evaluator.rs              | 104 ++++++++++++++-------
 datafusion/physical-expr/src/window/rank.rs        |  34 ++++---
 datafusion/physical-expr/src/window/row_number.rs  |  30 +++---
 9 files changed, 179 insertions(+), 153 deletions(-)

diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs
index 02cd2d163a..a03267c035 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -95,9 +95,9 @@ impl WindowExpr for BuiltInWindowExpr {
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
-        let evaluator = self.expr.create_evaluator()?;
+        let mut evaluator = self.expr.create_evaluator()?;
         let num_rows = batch.num_rows();
-        if self.expr.uses_window_frame() {
+        if evaluator.uses_window_frame() {
             let sort_options: Vec<SortOptions> =
                 self.order_by.iter().map(|o| o.options).collect();
             let mut row_wise_results = vec![];
@@ -114,18 +114,18 @@ impl WindowExpr for BuiltInWindowExpr {
                     num_rows,
                     idx,
                 )?;
-                let value = evaluator.evaluate_inside_range(&values, &range)?;
+                let value = evaluator.evaluate(&values, &range)?;
                 row_wise_results.push(value);
                 last_range = range;
             }
             ScalarValue::iter_to_array(row_wise_results.into_iter())
-        } else if self.expr.include_rank() {
+        } else if evaluator.include_rank() {
             let columns = self.sort_columns(batch)?;
             let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
-            evaluator.evaluate_with_rank(num_rows, &sort_partition_points)
+            evaluator.evaluate_with_rank_all(num_rows, &sort_partition_points)
         } else {
             let (values, _) = self.get_values_orderbys(batch)?;
-            evaluator.evaluate(&values, num_rows)
+            evaluator.evaluate_all(&values, num_rows)
         }
     }
 
@@ -164,7 +164,7 @@ impl WindowExpr for BuiltInWindowExpr {
             // We iterate on each row to perform a running calculation.
             let record_batch = &partition_batch_state.record_batch;
             let num_rows = record_batch.num_rows();
-            let sort_partition_points = if self.expr.include_rank() {
+            let sort_partition_points = if evaluator.include_rank() {
                 let columns = self.sort_columns(record_batch)?;
                 evaluate_partition_ranges(num_rows, &columns)?
             } else {
@@ -172,7 +172,7 @@ impl WindowExpr for BuiltInWindowExpr {
             };
             let mut row_wise_results: Vec<ScalarValue> = vec![];
             for idx in state.last_calculated_index..num_rows {
-                let frame_range = if self.expr.uses_window_frame() {
+                let frame_range = if evaluator.uses_window_frame() {
                     state
                         .window_frame_ctx
                         .get_or_insert_with(|| {
@@ -199,7 +199,8 @@ impl WindowExpr for BuiltInWindowExpr {
                 // Update last range
                 state.window_frame_range = frame_range;
                 evaluator.update_state(state, idx, &order_bys, &sort_partition_points)?;
-                row_wise_results.push(evaluator.evaluate_stateful(&values)?);
+                row_wise_results
+                    .push(evaluator.evaluate(&values, &state.window_frame_range)?);
             }
             let out_col = if row_wise_results.is_empty() {
                 new_empty_array(out_type)
@@ -231,8 +232,12 @@ impl WindowExpr for BuiltInWindowExpr {
     }
 
     fn uses_bounded_memory(&self) -> bool {
-        self.expr.supports_bounded_execution()
-            && (!self.expr.uses_window_frame()
-                || !self.window_frame.end_bound.is_unbounded())
+        if let Ok(evaluator) = self.expr.create_evaluator() {
+            evaluator.supports_bounded_execution()
+                && (!evaluator.uses_window_frame()
+                    || !self.window_frame.end_bound.is_unbounded())
+        } else {
+            false
+        }
     }
 }
diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
index 763bcfc2b1..432bf78368 100644
--- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
+++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs
@@ -85,29 +85,4 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
     ///
     /// The default implementation does nothing
     fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {}
-
-    /// Can the window function be incrementally computed using
-    /// bounded memory?
-    ///
-    /// If this function returns true, [`Self::create_evaluator`] must
-    /// implement [`PartitionEvaluator::evaluate_stateful`]
-    fn supports_bounded_execution(&self) -> bool {
-        false
-    }
-
-    /// Does the window function use the values from its window frame?
-    ///
-    /// If this function returns true, [`Self::create_evaluator`] must
-    /// implement [`PartitionEvaluator::evaluate_inside_range`]
-    fn uses_window_frame(&self) -> bool {
-        false
-    }
-
-    /// Can this function be evaluated with (only) rank
-    ///
-    /// If `include_rank` is true, then [`Self::create_evaluator`] must
-    /// implement [`PartitionEvaluator::evaluate_with_rank`]
-    fn include_rank(&self) -> bool {
-        false
-    }
 }
diff --git a/datafusion/physical-expr/src/window/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs
index 2214555dbe..47f2e4208d 100644
--- a/datafusion/physical-expr/src/window/cume_dist.rs
+++ b/datafusion/physical-expr/src/window/cume_dist.rs
@@ -64,17 +64,13 @@ impl BuiltInWindowFunctionExpr for CumeDist {
     fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::new(CumeDistEvaluator {}))
     }
-
-    fn include_rank(&self) -> bool {
-        true
-    }
 }
 
 #[derive(Debug)]
 pub(crate) struct CumeDistEvaluator;
 
 impl PartitionEvaluator for CumeDistEvaluator {
-    fn evaluate_with_rank(
+    fn evaluate_with_rank_all(
         &self,
         num_rows: usize,
         ranks_in_partition: &[Range<usize>],
@@ -94,6 +90,10 @@ impl PartitionEvaluator for CumeDistEvaluator {
         );
         Ok(Arc::new(result))
     }
+
+    fn include_rank(&self) -> bool {
+        true
+    }
 }
 
 #[cfg(test)]
@@ -109,7 +109,7 @@ mod tests {
     ) -> Result<()> {
         let result = expr
             .create_evaluator()?
-            .evaluate_with_rank(num_rows, &ranks)?;
+            .evaluate_with_rank_all(num_rows, &ranks)?;
         let result = as_float64_array(&result)?;
         let result = result.values();
         assert_eq!(expected, *result);
diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs
index bae5098edf..24248f989e 100644
--- a/datafusion/physical-expr/src/window/lead_lag.rs
+++ b/datafusion/physical-expr/src/window/lead_lag.rs
@@ -110,10 +110,6 @@ impl BuiltInWindowFunctionExpr for WindowShift {
         }))
     }
 
-    fn supports_bounded_execution(&self) -> bool {
-        true
-    }
-
     fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
         Some(Arc::new(Self {
             name: self.name.clone(),
@@ -206,7 +202,11 @@ impl PartitionEvaluator for WindowShiftEvaluator {
         }
     }
 
-    fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result<ScalarValue> {
+    fn evaluate(
+        &mut self,
+        values: &[ArrayRef],
+        _range: &Range<usize>,
+    ) -> Result<ScalarValue> {
         let array = &values[0];
         let dtype = array.data_type();
         let idx = self.state.idx as i64 - self.shift_offset;
@@ -217,11 +217,19 @@ impl PartitionEvaluator for WindowShiftEvaluator {
         }
     }
 
-    fn evaluate(&self, values: &[ArrayRef], _num_rows: usize) -> Result<ArrayRef> {
+    fn evaluate_all(
+        &mut self,
+        values: &[ArrayRef],
+        _num_rows: usize,
+    ) -> Result<ArrayRef> {
         // LEAD, LAG window functions take single column, values will have size 1
         let value = &values[0];
         shift_with_default_value(value, self.shift_offset, self.default_value.as_ref())
     }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
 }
 
 fn get_default_value(
@@ -258,7 +266,7 @@ mod tests {
         let values = expr.evaluate_args(&batch)?;
         let result = expr
             .create_evaluator()?
-            .evaluate(&values, batch.num_rows())?;
+            .evaluate_all(&values, batch.num_rows())?;
         let result = as_int32_array(&result)?;
         assert_eq!(expected, *result);
         Ok(())
diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs
index aa5fe77df0..e6dbeba834 100644
--- a/datafusion/physical-expr/src/window/nth_value.rs
+++ b/datafusion/physical-expr/src/window/nth_value.rs
@@ -122,14 +122,6 @@ impl BuiltInWindowFunctionExpr for NthValue {
         Ok(Box::new(NthValueEvaluator { state }))
     }
 
-    fn supports_bounded_execution(&self) -> bool {
-        true
-    }
-
-    fn uses_window_frame(&self) -> bool {
-        true
-    }
-
     fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
         let reversed_kind = match self.kind {
             NthValueKind::First => NthValueKind::Last,
@@ -197,40 +189,44 @@ impl PartitionEvaluator for NthValueEvaluator {
         Ok(())
     }
 
-    fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result<ScalarValue> {
-        if let Some(ref result) = self.state.finalized_result {
-            Ok(result.clone())
-        } else {
-            self.evaluate_inside_range(values, &self.state.range)
-        }
-    }
-
-    fn evaluate_inside_range(
-        &self,
+    fn evaluate(
+        &mut self,
         values: &[ArrayRef],
         range: &Range<usize>,
     ) -> Result<ScalarValue> {
-        // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1.
-        let arr = &values[0];
-        let n_range = range.end - range.start;
-        if n_range == 0 {
-            // We produce None if the window is empty.
-            return ScalarValue::try_from(arr.data_type());
-        }
-        match self.state.kind {
-            NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
-            NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1),
-            NthValueKind::Nth(n) => {
-                // We are certain that n > 0.
-                let index = (n as usize) - 1;
-                if index >= n_range {
-                    ScalarValue::try_from(arr.data_type())
-                } else {
-                    ScalarValue::try_from_array(arr, range.start + index)
+        if let Some(ref result) = self.state.finalized_result {
+            Ok(result.clone())
+        } else {
+            // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1.
+            let arr = &values[0];
+            let n_range = range.end - range.start;
+            if n_range == 0 {
+                // We produce None if the window is empty.
+                return ScalarValue::try_from(arr.data_type());
+            }
+            match self.state.kind {
+                NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
+                NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1),
+                NthValueKind::Nth(n) => {
+                    // We are certain that n > 0.
+                    let index = (n as usize) - 1;
+                    if index >= n_range {
+                        ScalarValue::try_from(arr.data_type())
+                    } else {
+                        ScalarValue::try_from_array(arr, range.start + index)
+                    }
                 }
             }
         }
     }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
+
+    fn uses_window_frame(&self) -> bool {
+        true
+    }
 }
 
 #[cfg(test)]
@@ -254,11 +250,11 @@ mod tests {
                 end: i + 1,
             })
         }
-        let evaluator = expr.create_evaluator()?;
+        let mut evaluator = expr.create_evaluator()?;
         let values = expr.evaluate_args(&batch)?;
         let result = ranges
             .iter()
-            .map(|range| evaluator.evaluate_inside_range(&values, range))
+            .map(|range| evaluator.evaluate(&values, range))
             .collect::<Result<Vec<ScalarValue>>>()?;
         let result = ScalarValue::iter_to_array(result.into_iter())?;
         let result = as_int32_array(&result)?;
diff --git a/datafusion/physical-expr/src/window/ntile.rs b/datafusion/physical-expr/src/window/ntile.rs
index 479fa26333..2feab9956a 100644
--- a/datafusion/physical-expr/src/window/ntile.rs
+++ b/datafusion/physical-expr/src/window/ntile.rs
@@ -70,7 +70,11 @@ pub(crate) struct NtileEvaluator {
 }
 
 impl PartitionEvaluator for NtileEvaluator {
-    fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
+    fn evaluate_all(
+        &mut self,
+        _values: &[ArrayRef],
+        num_rows: usize,
+    ) -> Result<ArrayRef> {
         let num_rows = num_rows as u64;
         let mut vec: Vec<u64> = Vec::new();
         for i in 0..num_rows {
diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs b/datafusion/physical-expr/src/window/partition_evaluator.rs
index 9e665c5677..553f631790 100644
--- a/datafusion/physical-expr/src/window/partition_evaluator.rs
+++ b/datafusion/physical-expr/src/window/partition_evaluator.rs
@@ -69,18 +69,17 @@ use std::ops::Range;
 ///
 /// # Stateless `PartitionEvaluator`
 ///
-/// In this case, [`Self::evaluate`], [`Self::evaluate_with_rank`] or
-/// [`Self::evaluate_inside_range`] is called with values for the
+/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_with_rank_all`] is called with values for the
 /// entire partition.
 ///
 /// # Stateful `PartitionEvaluator`
 ///
-/// In this case, [`Self::evaluate_stateful`] is called to calculate
+/// In this case, [`Self::evaluate`] is called to calculate
 /// the results of the window function incrementally for each new
 /// batch.
 ///
 /// For example, when computing `ROW_NUMBER` incrementally,
-/// [`Self::evaluate_stateful`] will be called multiple times with
+/// [`Self::evaluate`] will be called multiple times with
 /// different batches. For all batches after the first, the output
 /// `row_number` must start from last `row_number` produced for the
 /// previous batch. The previous row number is saved and restored as
@@ -88,6 +87,15 @@ use std::ops::Range;
 ///
 /// [`BuiltInWindowFunctionExpr`]: crate::window::BuiltInWindowFunctionExpr
 /// [`BuiltInWindowFunctionExpr::create_evaluator`]: crate::window::BuiltInWindowFunctionExpr::create_evaluator
+/// When implementing a new `PartitionEvaluator`, the
+/// `uses_window_frame` and `supports_bounded_execution` flags determine which evaluation method will be called
+/// at runtime. Implement the corresponding evaluation method according to the table below.
+/// |uses_window_frame|supports_bounded_execution|function_to_implement|
+/// |---|---|----|
+/// |false|false|`evaluate_all` (if we were to implement `PERCENT_RANK` it would end up in this quadrant, we cannot produce any result without seeing whole data)|
+/// |false|true|`evaluate` (optionally, `evaluate_all` can also be implemented for a more optimized implementation; otherwise a suboptimal default implementation is used). If we were to implement `ROW_NUMBER`, it would end up in this quadrant. The `OddRowNumber` example showcases this use case|
+/// |true|false|`evaluate` (I think as long as `uses_window_frame` is `true`. There is no way for `supports_bounded_execution` to be false). I couldn't come up with any example for this quadrant |
+/// |true|true|`evaluate`. If we were to implement `FIRST_VALUE`, it would end up in this quadrant.|
 pub trait PartitionEvaluator: Debug + Send {
     /// Updates the internal state for window function
     ///
@@ -120,35 +128,58 @@ pub trait PartitionEvaluator: Debug + Send {
         Ok(())
     }
 
-    /// Gets the range where the window function result is calculated.
-    ///
-    /// `idx`: is the index of last row for which result is calculated.
-    /// `n_rows`: is the number of rows of the input record batch (Used during bounds check)
-    fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<Range<usize>> {
-        Err(DataFusionError::NotImplemented(
-            "get_range is not implemented for this window function".to_string(),
-        ))
+    /// If the `uses_window_frame` flag is `false`, this method is used to calculate the required range for the window function.
+    /// Generally there is no required range, hence by default this returns the smallest range (the current row), i.e. seeing the current row
+    /// is enough to calculate the window result (such as row_number, rank, etc.).
+    fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
+        if self.uses_window_frame() {
+            Err(DataFusionError::Execution(
+                "Range should be calculated from window frame".to_string(),
+            ))
+        } else {
+            Ok(Range {
+                start: idx,
+                end: idx + 1,
+            })
+        }
     }
 
     /// Called for window functions that *do not use* values from the
     /// the window frame, such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`,
-    /// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`).
-    fn evaluate(&self, _values: &[ArrayRef], _num_rows: usize) -> Result<ArrayRef> {
-        Err(DataFusionError::NotImplemented(
-            "evaluate is not implemented by default".into(),
-        ))
+    /// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`). It produces the results
+    /// for all rows in a single pass. It expects to receive the whole table's data
+    /// as a single batch.
+    fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
+        // When window frame boundaries are not used and the evaluator supports bounded execution,
+        // we can calculate the result by repeatedly calling `self.evaluate` `num_rows` times.
+        // If the user wants a more efficient version, this method should be overridden.
+        // The default implementation may behave suboptimally (for instance, `NumRowsEvaluator` overrides it).
+        if !self.uses_window_frame() && self.supports_bounded_execution() {
+            let res = (0..num_rows)
+                .map(|idx| self.evaluate(values, &self.get_range(idx, num_rows)?))
+                .collect::<Result<Vec<_>>>()?;
+            ScalarValue::iter_to_array(res.into_iter())
+        } else {
+            Err(DataFusionError::NotImplemented(
+                "evaluate_all is not implemented by default".into(),
+            ))
+        }
     }
 
     /// Evaluate window function result inside given range.
     ///
     /// Only used for stateful evaluation
-    fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result<ScalarValue> {
+    fn evaluate(
+        &mut self,
+        _values: &[ArrayRef],
+        _range: &Range<usize>,
+    ) -> Result<ScalarValue> {
         Err(DataFusionError::NotImplemented(
-            "evaluate_stateful is not implemented by default".into(),
+            "evaluate is not implemented by default".into(),
         ))
     }
 
-    /// [`PartitionEvaluator::evaluate_with_rank`] is called for window
+    /// [`PartitionEvaluator::evaluate_with_rank_all`] is called for window
     /// functions that only need the rank of a row within its window
     /// frame.
     ///
@@ -175,7 +206,7 @@ pub trait PartitionEvaluator: Debug + Send {
     ///   (3,4),
     /// ]
     /// ```
-    fn evaluate_with_rank(
+    fn evaluate_with_rank_all(
         &self,
         _num_rows: usize,
         _ranks_in_partition: &[Range<usize>],
@@ -185,18 +216,25 @@ pub trait PartitionEvaluator: Debug + Send {
         ))
     }
 
-    /// Called for window functions that use values from window frame,
-    /// such as `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE` and produce a
-    /// single value for every row in the partition.
+    /// Can the window function be incrementally computed using
+    /// bounded memory?
     ///
-    /// Returns a [`ScalarValue`] that is the value of the window function for the entire partition
-    fn evaluate_inside_range(
-        &self,
-        _values: &[ArrayRef],
-        _range: &Range<usize>,
-    ) -> Result<ScalarValue> {
-        Err(DataFusionError::NotImplemented(
-            "evaluate_inside_range is not implemented by default".into(),
-        ))
+    /// If this function returns true, implement [`PartitionEvaluator::evaluate`]
+    fn supports_bounded_execution(&self) -> bool {
+        false
+    }
+
+    /// Does the window function use the values from its window frame?
+    ///
+    /// If this function returns true, implement [`PartitionEvaluator::evaluate`]
+    fn uses_window_frame(&self) -> bool {
+        false
+    }
+
+    /// Can this function be evaluated with (only) rank
+    ///
+    /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_with_rank_all`]
+    fn include_rank(&self) -> bool {
+        false
     }
 }
diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs
index 918fa89f0e..be184ca891 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -100,14 +100,6 @@ impl BuiltInWindowFunctionExpr for Rank {
         &self.name
     }
 
-    fn supports_bounded_execution(&self) -> bool {
-        matches!(self.rank_type, RankType::Basic | RankType::Dense)
-    }
-
-    fn include_rank(&self) -> bool {
-        true
-    }
-
     fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::new(RankEvaluator {
             state: RankState::default(),
@@ -123,12 +115,6 @@ pub(crate) struct RankEvaluator {
 }
 
 impl PartitionEvaluator for RankEvaluator {
-    fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
-        let start = idx;
-        let end = idx + 1;
-        Ok(Range { start, end })
-    }
-
     fn update_state(
         &mut self,
         state: &WindowAggState,
@@ -157,7 +143,11 @@ impl PartitionEvaluator for RankEvaluator {
     }
 
     /// evaluate window function result inside given range
-    fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result<ScalarValue> {
+    fn evaluate(
+        &mut self,
+        _values: &[ArrayRef],
+        _range: &Range<usize>,
+    ) -> Result<ScalarValue> {
         match self.rank_type {
             RankType::Basic => Ok(ScalarValue::UInt64(Some(
                 self.state.last_rank_boundary as u64 + 1,
@@ -169,7 +159,7 @@ impl PartitionEvaluator for RankEvaluator {
         }
     }
 
-    fn evaluate_with_rank(
+    fn evaluate_with_rank_all(
         &self,
         num_rows: usize,
         ranks_in_partition: &[Range<usize>],
@@ -215,6 +205,14 @@ impl PartitionEvaluator for RankEvaluator {
         };
         Ok(result)
     }
+
+    fn supports_bounded_execution(&self) -> bool {
+        matches!(self.rank_type, RankType::Basic | RankType::Dense)
+    }
+
+    fn include_rank(&self) -> bool {
+        true
+    }
 }
 
 #[cfg(test)]
@@ -238,7 +236,7 @@ mod tests {
     ) -> Result<()> {
         let result = expr
             .create_evaluator()?
-            .evaluate_with_rank(num_rows, &ranks)?;
+            .evaluate_with_rank_all(num_rows, &ranks)?;
         let result = as_float64_array(&result)?;
         let result = result.values();
         assert_eq!(expected, *result);
@@ -250,7 +248,7 @@ mod tests {
         ranks: Vec<Range<usize>>,
         expected: Vec<u64>,
     ) -> Result<()> {
-        let result = expr.create_evaluator()?.evaluate_with_rank(8, &ranks)?;
+        let result = expr.create_evaluator()?.evaluate_with_rank_all(8, &ranks)?;
         let result = as_uint64_array(&result)?;
         let result = result.values();
         assert_eq!(expected, *result);
diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs
index 0abc982897..3c1f0583b4 100644
--- a/datafusion/physical-expr/src/window/row_number.rs
+++ b/datafusion/physical-expr/src/window/row_number.rs
@@ -85,10 +85,6 @@ impl BuiltInWindowFunctionExpr for RowNumber {
     fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::<NumRowsEvaluator>::default())
     }
-
-    fn supports_bounded_execution(&self) -> bool {
-        true
-    }
 }
 
 #[derive(Default, Debug)]
@@ -97,23 +93,29 @@ pub(crate) struct NumRowsEvaluator {
 }
 
 impl PartitionEvaluator for NumRowsEvaluator {
-    fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
-        let start = idx;
-        let end = idx + 1;
-        Ok(Range { start, end })
-    }
-
     /// evaluate window function result inside given range
-    fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result<ScalarValue> {
+    fn evaluate(
+        &mut self,
+        _values: &[ArrayRef],
+        _range: &Range<usize>,
+    ) -> Result<ScalarValue> {
         self.state.n_rows += 1;
         Ok(ScalarValue::UInt64(Some(self.state.n_rows as u64)))
     }
 
-    fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
+    fn evaluate_all(
+        &mut self,
+        _values: &[ArrayRef],
+        num_rows: usize,
+    ) -> Result<ArrayRef> {
         Ok(Arc::new(UInt64Array::from_iter_values(
             1..(num_rows as u64) + 1,
         )))
     }
+
+    fn supports_bounded_execution(&self) -> bool {
+        true
+    }
 }
 
 #[cfg(test)]
@@ -134,7 +136,7 @@ mod tests {
         let values = row_number.evaluate_args(&batch)?;
         let result = row_number
             .create_evaluator()?
-            .evaluate(&values, batch.num_rows())?;
+            .evaluate_all(&values, batch.num_rows())?;
         let result = as_uint64_array(&result)?;
         let result = result.values();
         assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *result);
@@ -152,7 +154,7 @@ mod tests {
         let values = row_number.evaluate_args(&batch)?;
         let result = row_number
             .create_evaluator()?
-            .evaluate(&values, batch.num_rows())?;
+            .evaluate_all(&values, batch.num_rows())?;
         let result = as_uint64_array(&result)?;
         let result = result.values();
         assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *result);