You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/16 15:44:01 UTC

[arrow-datafusion] branch main updated: Replace supports_bounded_execution with supports_retract_batch (#6695)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8da5f26a43 Replace supports_bounded_execution with supports_retract_batch (#6695)
8da5f26a43 is described below

commit 8da5f26a43e044a46e22bb3e43384aac79582d9c
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Fri Jun 16 18:43:56 2023 +0300

    Replace supports_bounded_execution with supports_retract_batch (#6695)
    
    * feat: support sliding window accumulators
    
    Rationale:
    
    The default implementation of the `Accumulator` trait returns an error
    for the `retract_batch` API.
    
    * Allow AggregateUDF to define retractable batch
    
    * Replace supports_bounded_execution with supports_retract_batch
    
    * simplifications
    
    * simplifications
    
    * Rename evaluate_with_rank_all to evaluate_all_with_rank
    
    ---------
    
    Co-authored-by: Stuart Carnie <st...@gmail.com>
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/physical-expr/src/aggregate/average.rs        |  7 +++----
 datafusion/physical-expr/src/aggregate/count.rs          |  8 ++++----
 datafusion/physical-expr/src/aggregate/min_max.rs        | 16 ++++++++--------
 datafusion/physical-expr/src/aggregate/mod.rs            |  6 ------
 datafusion/physical-expr/src/aggregate/sum.rs            |  8 ++++----
 datafusion/physical-expr/src/window/aggregate.rs         |  3 +--
 datafusion/physical-expr/src/window/built_in.rs          |  2 +-
 datafusion/physical-expr/src/window/cume_dist.rs         |  4 ++--
 .../physical-expr/src/window/partition_evaluator.rs      |  8 ++++----
 datafusion/physical-expr/src/window/rank.rs              |  6 +++---
 datafusion/physical-expr/src/window/sliding_aggregate.rs |  3 +--
 11 files changed, 31 insertions(+), 40 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 6075728622..3c76da51a9 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -134,10 +134,6 @@ impl AggregateExpr for Avg {
         is_row_accumulator_support_dtype(&self.sum_data_type)
     }
 
-    fn supports_bounded_execution(&self) -> bool {
-        true
-    }
-
     fn create_row_accumulator(
         &self,
         start_index: usize,
@@ -263,6 +259,9 @@ impl Accumulator for AvgAccumulator {
             )),
         }
     }
+    fn supports_retract_batch(&self) -> bool {
+        true
+    }
 
     fn size(&self) -> usize {
         std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index 15df28b4e3..22cb2512fc 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -133,10 +133,6 @@ impl AggregateExpr for Count {
         true
     }
 
-    fn supports_bounded_execution(&self) -> bool {
-        true
-    }
-
     fn create_row_accumulator(
         &self,
         start_index: usize,
@@ -214,6 +210,10 @@ impl Accumulator for CountAccumulator {
         Ok(ScalarValue::Int64(Some(self.count)))
     }
 
+    fn supports_retract_batch(&self) -> bool {
+        true
+    }
+
     fn size(&self) -> usize {
         std::mem::size_of_val(self)
     }
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index f811dae7b5..e3c061dc13 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -125,10 +125,6 @@ impl AggregateExpr for Max {
         is_row_accumulator_support_dtype(&self.data_type)
     }
 
-    fn supports_bounded_execution(&self) -> bool {
-        true
-    }
-
     fn create_row_accumulator(
         &self,
         start_index: usize,
@@ -699,6 +695,10 @@ impl Accumulator for SlidingMaxAccumulator {
         Ok(self.max.clone())
     }
 
+    fn supports_retract_batch(&self) -> bool {
+        true
+    }
+
     fn size(&self) -> usize {
         std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size()
     }
@@ -825,10 +825,6 @@ impl AggregateExpr for Min {
         is_row_accumulator_support_dtype(&self.data_type)
     }
 
-    fn supports_bounded_execution(&self) -> bool {
-        true
-    }
-
     fn create_row_accumulator(
         &self,
         start_index: usize,
@@ -958,6 +954,10 @@ impl Accumulator for SlidingMinAccumulator {
         Ok(self.min.clone())
     }
 
+    fn supports_retract_batch(&self) -> bool {
+        true
+    }
+
     fn size(&self) -> usize {
         std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size()
     }
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 09fd9bcfc5..7d2316c532 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -96,12 +96,6 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
         false
     }
 
-    /// Specifies whether this aggregate function can run using bounded memory.
-    /// Any accumulator returning "true" needs to implement `retract_batch`.
-    fn supports_bounded_execution(&self) -> bool {
-        false
-    }
-
     /// RowAccumulator to access/update row-based aggregation state in-place.
     /// Currently, row accumulator only supports states of fixed-sized type.
     ///
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index 1c70dc67be..efa55f0602 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -131,10 +131,6 @@ impl AggregateExpr for Sum {
         is_row_accumulator_support_dtype(&self.data_type)
     }
 
-    fn supports_bounded_execution(&self) -> bool {
-        true
-    }
-
     fn create_row_accumulator(
         &self,
         start_index: usize,
@@ -361,6 +357,10 @@ impl Accumulator for SumAccumulator {
         }
     }
 
+    fn supports_retract_batch(&self) -> bool {
+        true
+    }
+
     fn size(&self) -> usize {
         std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
     }
diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs
index c8a4797a52..5892f7f3f3 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -155,8 +155,7 @@ impl WindowExpr for PlainAggregateWindowExpr {
     }
 
     fn uses_bounded_memory(&self) -> bool {
-        self.aggregate.supports_bounded_execution()
-            && !self.window_frame.end_bound.is_unbounded()
+        !self.window_frame.end_bound.is_unbounded()
     }
 }
 
diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs
index a03267c035..828bc7218f 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -122,7 +122,7 @@ impl WindowExpr for BuiltInWindowExpr {
         } else if evaluator.include_rank() {
             let columns = self.sort_columns(batch)?;
             let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
-            evaluator.evaluate_with_rank_all(num_rows, &sort_partition_points)
+            evaluator.evaluate_all_with_rank(num_rows, &sort_partition_points)
         } else {
             let (values, _) = self.get_values_orderbys(batch)?;
             evaluator.evaluate_all(&values, num_rows)
diff --git a/datafusion/physical-expr/src/window/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs
index 47f2e4208d..9040165ac9 100644
--- a/datafusion/physical-expr/src/window/cume_dist.rs
+++ b/datafusion/physical-expr/src/window/cume_dist.rs
@@ -70,7 +70,7 @@ impl BuiltInWindowFunctionExpr for CumeDist {
 pub(crate) struct CumeDistEvaluator;
 
 impl PartitionEvaluator for CumeDistEvaluator {
-    fn evaluate_with_rank_all(
+    fn evaluate_all_with_rank(
         &self,
         num_rows: usize,
         ranks_in_partition: &[Range<usize>],
@@ -109,7 +109,7 @@ mod tests {
     ) -> Result<()> {
         let result = expr
             .create_evaluator()?
-            .evaluate_with_rank_all(num_rows, &ranks)?;
+            .evaluate_all_with_rank(num_rows, &ranks)?;
         let result = as_float64_array(&result)?;
         let result = result.values();
         assert_eq!(expected, *result);
diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs b/datafusion/physical-expr/src/window/partition_evaluator.rs
index 0dfad0e80f..e518e89a75 100644
--- a/datafusion/physical-expr/src/window/partition_evaluator.rs
+++ b/datafusion/physical-expr/src/window/partition_evaluator.rs
@@ -69,7 +69,7 @@ use std::ops::Range;
 ///
 /// # Stateless `PartitionEvaluator`
 ///
-/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_with_rank_all`] is called with values for the
+/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_all_with_rank`] is called with values for the
 /// entire partition.
 ///
 /// # Stateful `PartitionEvaluator`
@@ -221,7 +221,7 @@ pub trait PartitionEvaluator: Debug + Send {
         ))
     }
 
-    /// [`PartitionEvaluator::evaluate_with_rank_all`] is called for window
+    /// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window
     /// functions that only need the rank of a row within its window
     /// frame.
     ///
@@ -248,7 +248,7 @@ pub trait PartitionEvaluator: Debug + Send {
     ///   (3,4),
     /// ]
     /// ```
-    fn evaluate_with_rank_all(
+    fn evaluate_all_with_rank(
         &self,
         _num_rows: usize,
         _ranks_in_partition: &[Range<usize>],
@@ -278,7 +278,7 @@ pub trait PartitionEvaluator: Debug + Send {
 
     /// Can this function be evaluated with (only) rank
     ///
-    /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_with_rank_all`]
+    /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_all_with_rank`]
     fn include_rank(&self) -> bool {
         false
     }
diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs
index be184ca891..59a08358cd 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -159,7 +159,7 @@ impl PartitionEvaluator for RankEvaluator {
         }
     }
 
-    fn evaluate_with_rank_all(
+    fn evaluate_all_with_rank(
         &self,
         num_rows: usize,
         ranks_in_partition: &[Range<usize>],
@@ -236,7 +236,7 @@ mod tests {
     ) -> Result<()> {
         let result = expr
             .create_evaluator()?
-            .evaluate_with_rank_all(num_rows, &ranks)?;
+            .evaluate_all_with_rank(num_rows, &ranks)?;
         let result = as_float64_array(&result)?;
         let result = result.values();
         assert_eq!(expected, *result);
@@ -248,7 +248,7 @@ mod tests {
         ranks: Vec<Range<usize>>,
         expected: Vec<u64>,
     ) -> Result<()> {
-        let result = expr.create_evaluator()?.evaluate_with_rank_all(8, &ranks)?;
+        let result = expr.create_evaluator()?.evaluate_all_with_rank(8, &ranks)?;
         let result = as_uint64_array(&result)?;
         let result = result.values();
         assert_eq!(expected, *result);
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 709f8d23be..1494129cf8 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -139,8 +139,7 @@ impl WindowExpr for SlidingAggregateWindowExpr {
     }
 
     fn uses_bounded_memory(&self) -> bool {
-        self.aggregate.supports_bounded_execution()
-            && !self.window_frame.end_bound.is_unbounded()
+        !self.window_frame.end_bound.is_unbounded()
     }
 }