You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/16 15:44:01 UTC
[arrow-datafusion] branch main updated: Replace supports_bounded_execution with supports_retract_batch (#6695)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8da5f26a43 Replace supports_bounded_execution with supports_retract_batch (#6695)
8da5f26a43 is described below
commit 8da5f26a43e044a46e22bb3e43384aac79582d9c
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Fri Jun 16 18:43:56 2023 +0300
Replace supports_bounded_execution with supports_retract_batch (#6695)
* feat: support sliding window accumulators
Rationale:
The default implementation of the `Accumulator` trait returns an error
for the `retract_batch` API, so callers need a way to ask an accumulator
whether it actually implements `retract_batch`.
* Allow AggregateUDF to define retractable batch
* Replace supports_bounded_execution with supports_retract_batch
* simplifications
* simplifications
* Rename evaluate_with_rank_all to evaluate_all_with_rank
---------
Co-authored-by: Stuart Carnie <st...@gmail.com>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/physical-expr/src/aggregate/average.rs | 7 +++----
datafusion/physical-expr/src/aggregate/count.rs | 8 ++++----
datafusion/physical-expr/src/aggregate/min_max.rs | 16 ++++++++--------
datafusion/physical-expr/src/aggregate/mod.rs | 6 ------
datafusion/physical-expr/src/aggregate/sum.rs | 8 ++++----
datafusion/physical-expr/src/window/aggregate.rs | 3 +--
datafusion/physical-expr/src/window/built_in.rs | 2 +-
datafusion/physical-expr/src/window/cume_dist.rs | 4 ++--
.../physical-expr/src/window/partition_evaluator.rs | 8 ++++----
datafusion/physical-expr/src/window/rank.rs | 6 +++---
datafusion/physical-expr/src/window/sliding_aggregate.rs | 3 +--
11 files changed, 31 insertions(+), 40 deletions(-)
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 6075728622..3c76da51a9 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -134,10 +134,6 @@ impl AggregateExpr for Avg {
is_row_accumulator_support_dtype(&self.sum_data_type)
}
- fn supports_bounded_execution(&self) -> bool {
- true
- }
-
fn create_row_accumulator(
&self,
start_index: usize,
@@ -263,6 +259,9 @@ impl Accumulator for AvgAccumulator {
)),
}
}
+ fn supports_retract_batch(&self) -> bool {
+ true
+ }
fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index 15df28b4e3..22cb2512fc 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -133,10 +133,6 @@ impl AggregateExpr for Count {
true
}
- fn supports_bounded_execution(&self) -> bool {
- true
- }
-
fn create_row_accumulator(
&self,
start_index: usize,
@@ -214,6 +210,10 @@ impl Accumulator for CountAccumulator {
Ok(ScalarValue::Int64(Some(self.count)))
}
+ fn supports_retract_batch(&self) -> bool {
+ true
+ }
+
fn size(&self) -> usize {
std::mem::size_of_val(self)
}
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index f811dae7b5..e3c061dc13 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -125,10 +125,6 @@ impl AggregateExpr for Max {
is_row_accumulator_support_dtype(&self.data_type)
}
- fn supports_bounded_execution(&self) -> bool {
- true
- }
-
fn create_row_accumulator(
&self,
start_index: usize,
@@ -699,6 +695,10 @@ impl Accumulator for SlidingMaxAccumulator {
Ok(self.max.clone())
}
+ fn supports_retract_batch(&self) -> bool {
+ true
+ }
+
fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size()
}
@@ -825,10 +825,6 @@ impl AggregateExpr for Min {
is_row_accumulator_support_dtype(&self.data_type)
}
- fn supports_bounded_execution(&self) -> bool {
- true
- }
-
fn create_row_accumulator(
&self,
start_index: usize,
@@ -958,6 +954,10 @@ impl Accumulator for SlidingMinAccumulator {
Ok(self.min.clone())
}
+ fn supports_retract_batch(&self) -> bool {
+ true
+ }
+
fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size()
}
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 09fd9bcfc5..7d2316c532 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -96,12 +96,6 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
false
}
- /// Specifies whether this aggregate function can run using bounded memory.
- /// Any accumulator returning "true" needs to implement `retract_batch`.
- fn supports_bounded_execution(&self) -> bool {
- false
- }
-
/// RowAccumulator to access/update row-based aggregation state in-place.
/// Currently, row accumulator only supports states of fixed-sized type.
///
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index 1c70dc67be..efa55f0602 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -131,10 +131,6 @@ impl AggregateExpr for Sum {
is_row_accumulator_support_dtype(&self.data_type)
}
- fn supports_bounded_execution(&self) -> bool {
- true
- }
-
fn create_row_accumulator(
&self,
start_index: usize,
@@ -361,6 +357,10 @@ impl Accumulator for SumAccumulator {
}
}
+ fn supports_retract_batch(&self) -> bool {
+ true
+ }
+
fn size(&self) -> usize {
std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
}
diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs
index c8a4797a52..5892f7f3f3 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -155,8 +155,7 @@ impl WindowExpr for PlainAggregateWindowExpr {
}
fn uses_bounded_memory(&self) -> bool {
- self.aggregate.supports_bounded_execution()
- && !self.window_frame.end_bound.is_unbounded()
+ !self.window_frame.end_bound.is_unbounded()
}
}
diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs
index a03267c035..828bc7218f 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -122,7 +122,7 @@ impl WindowExpr for BuiltInWindowExpr {
} else if evaluator.include_rank() {
let columns = self.sort_columns(batch)?;
let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
- evaluator.evaluate_with_rank_all(num_rows, &sort_partition_points)
+ evaluator.evaluate_all_with_rank(num_rows, &sort_partition_points)
} else {
let (values, _) = self.get_values_orderbys(batch)?;
evaluator.evaluate_all(&values, num_rows)
diff --git a/datafusion/physical-expr/src/window/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs
index 47f2e4208d..9040165ac9 100644
--- a/datafusion/physical-expr/src/window/cume_dist.rs
+++ b/datafusion/physical-expr/src/window/cume_dist.rs
@@ -70,7 +70,7 @@ impl BuiltInWindowFunctionExpr for CumeDist {
pub(crate) struct CumeDistEvaluator;
impl PartitionEvaluator for CumeDistEvaluator {
- fn evaluate_with_rank_all(
+ fn evaluate_all_with_rank(
&self,
num_rows: usize,
ranks_in_partition: &[Range<usize>],
@@ -109,7 +109,7 @@ mod tests {
) -> Result<()> {
let result = expr
.create_evaluator()?
- .evaluate_with_rank_all(num_rows, &ranks)?;
+ .evaluate_all_with_rank(num_rows, &ranks)?;
let result = as_float64_array(&result)?;
let result = result.values();
assert_eq!(expected, *result);
diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs b/datafusion/physical-expr/src/window/partition_evaluator.rs
index 0dfad0e80f..e518e89a75 100644
--- a/datafusion/physical-expr/src/window/partition_evaluator.rs
+++ b/datafusion/physical-expr/src/window/partition_evaluator.rs
@@ -69,7 +69,7 @@ use std::ops::Range;
///
/// # Stateless `PartitionEvaluator`
///
-/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_with_rank_all`] is called with values for the
+/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_all_with_rank`] is called with values for the
/// entire partition.
///
/// # Stateful `PartitionEvaluator`
@@ -221,7 +221,7 @@ pub trait PartitionEvaluator: Debug + Send {
))
}
- /// [`PartitionEvaluator::evaluate_with_rank_all`] is called for window
+ /// [`PartitionEvaluator::evaluate_all_with_rank`] is called for window
/// functions that only need the rank of a row within its window
/// frame.
///
@@ -248,7 +248,7 @@ pub trait PartitionEvaluator: Debug + Send {
/// (3,4),
/// ]
/// ```
- fn evaluate_with_rank_all(
+ fn evaluate_all_with_rank(
&self,
_num_rows: usize,
_ranks_in_partition: &[Range<usize>],
@@ -278,7 +278,7 @@ pub trait PartitionEvaluator: Debug + Send {
/// Can this function be evaluated with (only) rank
///
- /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_with_rank_all`]
+ /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_all_with_rank`]
fn include_rank(&self) -> bool {
false
}
diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs
index be184ca891..59a08358cd 100644
--- a/datafusion/physical-expr/src/window/rank.rs
+++ b/datafusion/physical-expr/src/window/rank.rs
@@ -159,7 +159,7 @@ impl PartitionEvaluator for RankEvaluator {
}
}
- fn evaluate_with_rank_all(
+ fn evaluate_all_with_rank(
&self,
num_rows: usize,
ranks_in_partition: &[Range<usize>],
@@ -236,7 +236,7 @@ mod tests {
) -> Result<()> {
let result = expr
.create_evaluator()?
- .evaluate_with_rank_all(num_rows, &ranks)?;
+ .evaluate_all_with_rank(num_rows, &ranks)?;
let result = as_float64_array(&result)?;
let result = result.values();
assert_eq!(expected, *result);
@@ -248,7 +248,7 @@ mod tests {
ranks: Vec<Range<usize>>,
expected: Vec<u64>,
) -> Result<()> {
- let result = expr.create_evaluator()?.evaluate_with_rank_all(8, &ranks)?;
+ let result = expr.create_evaluator()?.evaluate_all_with_rank(8, &ranks)?;
let result = as_uint64_array(&result)?;
let result = result.values();
assert_eq!(expected, *result);
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 709f8d23be..1494129cf8 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -139,8 +139,7 @@ impl WindowExpr for SlidingAggregateWindowExpr {
}
fn uses_bounded_memory(&self) -> bool {
- self.aggregate.supports_bounded_execution()
- && !self.window_frame.end_bound.is_unbounded()
+ !self.window_frame.end_bound.is_unbounded()
}
}