You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/18 18:26:27 UTC

[arrow-datafusion] branch master updated: Minor: Reduce redundancy creating window_agg in sort_enforcement tests (#4945)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 64fa312ec Minor: Reduce redundancy creating window_agg in sort_enforcement tests (#4945)
64fa312ec is described below

commit 64fa312ecc5f32294e70fd7389e18cb41f25e732
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Jan 18 19:26:21 2023 +0100

    Minor: Reduce redundancy creating window_agg in sort_enforcement tests (#4945)
---
 .../src/physical_optimizer/sort_enforcement.rs     | 65 +++++++++++-----------
 1 file changed, 33 insertions(+), 32 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 703a13a1c..c17192cad 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -671,32 +671,18 @@ mod tests {
         )];
         let sort = sort_exec(sort_exprs.clone(), source);
 
-        let window_agg_exec = Arc::new(WindowAggExec::try_new(
-            vec![create_window_expr(
-                &WindowFunction::AggregateFunction(AggregateFunction::Count),
-                "count".to_owned(),
-                &[col("non_nullable_col", &schema)?],
-                &[],
-                &sort_exprs,
-                Arc::new(WindowFrame::new(true)),
-                schema.as_ref(),
-            )?],
-            sort.clone(),
-            sort.schema(),
-            vec![],
-            Some(sort_exprs),
-        )?) as Arc<dyn ExecutionPlan>;
+        let window_agg = window_exec("non_nullable_col", sort_exprs, sort);
 
         let sort_exprs = vec![sort_expr_options(
             "non_nullable_col",
-            &window_agg_exec.schema(),
+            &window_agg.schema(),
             SortOptions {
                 descending: false,
                 nulls_first: false,
             },
         )];
 
-        let sort = sort_exec(sort_exprs.clone(), window_agg_exec);
+        let sort = sort_exec(sort_exprs.clone(), window_agg);
 
         // Add dummy layer propagating Sort above, to test whether sort can be removed from multi layer before
         let filter = filter_exec(
@@ -707,21 +693,7 @@ mod tests {
         );
 
         // let filter_exec = sort_exec;
-        let physical_plan = Arc::new(WindowAggExec::try_new(
-            vec![create_window_expr(
-                &WindowFunction::AggregateFunction(AggregateFunction::Count),
-                "count".to_owned(),
-                &[col("non_nullable_col", &schema)?],
-                &[],
-                &sort_exprs,
-                Arc::new(WindowFrame::new(true)),
-                schema.as_ref(),
-            )?],
-            filter.clone(),
-            filter.schema(),
-            vec![],
-            Some(sort_exprs),
-        )?) as Arc<dyn ExecutionPlan>;
+        let physical_plan = window_exec("non_nullable_col", sort_exprs, filter);
 
         let expected_input = vec![
             "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
@@ -888,6 +860,35 @@ mod tests {
         Arc::new(FilterExec::try_new(predicate, input).unwrap())
     }
 
+    /// Test helper: wraps `input` in a `WindowAggExec` with a single
+    /// `COUNT(col_name)` window expression named "count".
+    ///
+    /// `sort_exprs` is passed as the window expression's ordering, and again as
+    /// the final `Some(..)` argument to `WindowAggExec::try_new`. There are no
+    /// partition-by expressions (`&[]`) and no partition keys (`vec![]`). The
+    /// frame is whatever `WindowFrame::new(true)` produces.
+    ///
+    /// Panics (via `unwrap`) if `col_name` cannot be resolved in `input`'s
+    /// schema, or if building the window expression or the exec fails.
+    fn window_exec(
+        col_name: &str,
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Arc<dyn ExecutionPlan> {
+        // Materialize once: the expressions are borrowed by `create_window_expr`
+        // and then moved into `WindowAggExec::try_new`.
+        let sort_exprs: Vec<_> = sort_exprs.into_iter().collect();
+        // Columns are resolved against `input`'s own schema, so `col_name` must
+        // exist in `input` itself, not only in some upstream source.
+        let schema = input.schema();
+
+        Arc::new(
+            WindowAggExec::try_new(
+                vec![create_window_expr(
+                    &WindowFunction::AggregateFunction(AggregateFunction::Count),
+                    "count".to_owned(),
+                    &[col(col_name, &schema).unwrap()],
+                    &[],
+                    &sort_exprs,
+                    Arc::new(WindowFrame::new(true)),
+                    schema.as_ref(),
+                )
+                .unwrap()],
+                input.clone(),
+                // NOTE(review): equals `schema` above; it is fetched again here.
+                input.schema(),
+                vec![],
+                Some(sort_exprs),
+            )
+            .unwrap(),
+        )
+    }
+
     /// Create a non sorted parquet exec
     fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
         Arc::new(ParquetExec::new(