You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this plain-text version.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/18 18:26:27 UTC
[arrow-datafusion] branch master updated: Minor: Reduce redundancy creating window_agg in sort_enforcement tests (#4945)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 64fa312ec Minor: Reduce redundancy creating window_agg in sort_enforcement tests (#4945)
64fa312ec is described below
commit 64fa312ecc5f32294e70fd7389e18cb41f25e732
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Jan 18 19:26:21 2023 +0100
Minor: Reduce redundancy creating window_agg in sort_enforcement tests (#4945)
---
.../src/physical_optimizer/sort_enforcement.rs | 65 +++++++++++-----------
1 file changed, 33 insertions(+), 32 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 703a13a1c..c17192cad 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -671,32 +671,18 @@ mod tests {
)];
let sort = sort_exec(sort_exprs.clone(), source);
- let window_agg_exec = Arc::new(WindowAggExec::try_new(
- vec![create_window_expr(
- &WindowFunction::AggregateFunction(AggregateFunction::Count),
- "count".to_owned(),
- &[col("non_nullable_col", &schema)?],
- &[],
- &sort_exprs,
- Arc::new(WindowFrame::new(true)),
- schema.as_ref(),
- )?],
- sort.clone(),
- sort.schema(),
- vec![],
- Some(sort_exprs),
- )?) as Arc<dyn ExecutionPlan>;
+ let window_agg = window_exec("non_nullable_col", sort_exprs, sort);
let sort_exprs = vec![sort_expr_options(
"non_nullable_col",
- &window_agg_exec.schema(),
+ &window_agg.schema(),
SortOptions {
descending: false,
nulls_first: false,
},
)];
- let sort = sort_exec(sort_exprs.clone(), window_agg_exec);
+ let sort = sort_exec(sort_exprs.clone(), window_agg);
// Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before
let filter = filter_exec(
@@ -707,21 +693,7 @@ mod tests {
);
// let filter_exec = sort_exec;
- let physical_plan = Arc::new(WindowAggExec::try_new(
- vec![create_window_expr(
- &WindowFunction::AggregateFunction(AggregateFunction::Count),
- "count".to_owned(),
- &[col("non_nullable_col", &schema)?],
- &[],
- &sort_exprs,
- Arc::new(WindowFrame::new(true)),
- schema.as_ref(),
- )?],
- filter.clone(),
- filter.schema(),
- vec![],
- Some(sort_exprs),
- )?) as Arc<dyn ExecutionPlan>;
+ let physical_plan = window_exec("non_nullable_col", sort_exprs, filter);
let expected_input = vec![
"WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
@@ -888,6 +860,35 @@ mod tests {
Arc::new(FilterExec::try_new(predicate, input).unwrap())
}
+ fn window_exec( // Test helper: wraps `input` in a WindowAggExec holding a single COUNT(`col_name`) window expression named "count"; panics on any construction error.
+ col_name: &str, // column passed to COUNT; resolved against `input`'s schema
+ sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>, // ordering handed both to create_window_expr and (as Some(..)) to WindowAggExec::try_new
+ input: Arc<dyn ExecutionPlan>,
+ ) -> Arc<dyn ExecutionPlan> {
+ let sort_exprs: Vec<_> = sort_exprs.into_iter().collect(); // materialized once: borrowed by create_window_expr, then moved into the exec
+ let schema = input.schema();
+
+ Arc::new(
+ WindowAggExec::try_new(
+ vec![create_window_expr(
+ &WindowFunction::AggregateFunction(AggregateFunction::Count),
+ "count".to_owned(),
+ &[col(col_name, &schema).unwrap()], // panics if `col_name` is not a column of the input schema
+ &[], // NOTE(review): presumably the (empty) PARTITION BY list — confirm against create_window_expr's signature
+ &sort_exprs,
+ Arc::new(WindowFrame::new(true)), // NOTE(review): meaning of the `true` flag is not visible here — see WindowFrame::new
+ schema.as_ref(),
+ )
+ .unwrap()],
+ input.clone(), // Arc clone (refcount bump): `input` is still needed for `input.schema()` in the next argument
+ input.schema(),
+ vec![], // NOTE(review): presumably no partition keys — confirm against WindowAggExec::try_new
+ Some(sort_exprs),
+ )
+ .unwrap(),
+ )
+ }
+
/// Create a non sorted parquet exec
fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(