You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/10 14:58:43 UTC

[arrow-datafusion] branch main updated: Clean up SortExec creation and add doc comments (#5889)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new db2fa44b77 Clean up SortExec creation and add doc comments (#5889)
db2fa44b77 is described below

commit db2fa44b779a5274bc06d71364cd20e0e4b666bb
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Apr 10 10:58:36 2023 -0400

    Clean up SortExec creation and add doc comments (#5889)
    
    * Clean up SortExec creation and add doc comments
    
    * Reduce API surface
    
    * restore sort bench
    
    * fix benchmark
    
    * Add with_fetch
---
 benchmarks/src/bin/parquet.rs                      |   2 +-
 datafusion/core/benches/sort.rs                    |   6 +-
 .../physical_optimizer/global_sort_selection.rs    |  10 +-
 .../core/src/physical_optimizer/repartition.rs     |   9 +-
 .../src/physical_optimizer/sort_enforcement.rs     |  39 ++----
 datafusion/core/src/physical_optimizer/utils.rs    |   6 +-
 datafusion/core/src/physical_plan/common.rs        |   2 +-
 datafusion/core/src/physical_plan/planner.rs       |   4 +-
 datafusion/core/src/physical_plan/sorts/sort.rs    | 143 +++++++++++++--------
 .../physical_plan/sorts/sort_preserving_merge.rs   |  10 +-
 datafusion/core/tests/order_spill_fuzz.rs          |   2 +-
 datafusion/proto/src/physical_plan/mod.rs          |  30 ++---
 12 files changed, 137 insertions(+), 126 deletions(-)

diff --git a/benchmarks/src/bin/parquet.rs b/benchmarks/src/bin/parquet.rs
index 7ddc18c07a..589f967a6d 100644
--- a/benchmarks/src/bin/parquet.rs
+++ b/benchmarks/src/bin/parquet.rs
@@ -325,7 +325,7 @@ async fn exec_sort(
 ) -> Result<(usize, std::time::Duration)> {
     let start = Instant::now();
     let scan = test_file.create_scan(None).await?;
-    let exec = Arc::new(SortExec::try_new(expr.to_owned(), scan, None)?);
+    let exec = Arc::new(SortExec::new(expr.to_owned(), scan));
     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
     let elapsed = start.elapsed();
diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 907392bc3e..4045702d63 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -189,7 +189,7 @@ impl BenchCase {
 
         let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
         let exec =
-            SortExec::new_with_partitioning(sort.clone(), Arc::new(exec), true, None);
+            SortExec::new(sort.clone(), Arc::new(exec)).with_preserve_partitioning(true);
         let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
 
         Self {
@@ -211,7 +211,7 @@ impl BenchCase {
 
         let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
         let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
-        let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap());
+        let plan = Arc::new(SortExec::new(sort, exec));
 
         Self {
             runtime,
@@ -231,7 +231,7 @@ impl BenchCase {
         let sort = make_sort_exprs(schema.as_ref());
 
         let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
-        let exec = SortExec::new_with_partitioning(sort, Arc::new(exec), true, None);
+        let exec = SortExec::new(sort, Arc::new(exec)).with_preserve_partitioning(true);
         let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
 
         Self {
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
index e29735e741..9466297d24 100644
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -60,12 +60,12 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
                         && !sort_exec.preserve_partitioning()
                         && (sort_exec.fetch().is_some() ||  config.optimizer.repartition_sorts)
                     {
-                            let sort = SortExec::new_with_partitioning(
+                            let sort = SortExec::new(
                                 sort_exec.expr().to_vec(),
-                                sort_exec.input().clone(),
-                                true,
-                                sort_exec.fetch(),
-                            );
+                                sort_exec.input().clone()
+                            )
+                            .with_fetch(sort_exec.fetch())
+                            .with_preserve_partitioning(true);
                             let global_sort: Arc<dyn ExecutionPlan> =
                                 Arc::new(SortPreservingMergeExec::new(
                                     sort_exec.expr().to_vec(),
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 35592519a4..3bb21b12be 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -460,12 +460,9 @@ mod tests {
             expr: col("c1", &schema()).unwrap(),
             options: SortOptions::default(),
         }];
-        Arc::new(SortExec::new_with_partitioning(
-            sort_exprs,
-            input,
-            preserve_partitioning,
-            None,
-        ))
+        let new_sort = SortExec::new(sort_exprs, input)
+            .with_preserve_partitioning(preserve_partitioning);
+        Arc::new(new_sort)
     }
 
     fn projection_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 26299e4683..b1a4da65e0 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -1210,12 +1210,10 @@ mod tests {
             sort_expr("non_nullable_col", &schema),
         ];
         let repartition_exec = repartition_exec(spm);
-        let sort2 = Arc::new(SortExec::new_with_partitioning(
-            sort_exprs.clone(),
-            repartition_exec,
-            true,
-            None,
-        )) as _;
+        let sort2 = Arc::new(
+            SortExec::new(sort_exprs.clone(), repartition_exec)
+                .with_preserve_partitioning(true),
+        ) as _;
         let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
 
         let physical_plan = aggregate_exec(spm2);
@@ -1253,12 +1251,9 @@ mod tests {
 
         let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
         // let sort = sort_exec(sort_exprs.clone(), union);
-        let sort = Arc::new(SortExec::new_with_partitioning(
-            sort_exprs.clone(),
-            union,
-            true,
-            None,
-        )) as _;
+        let sort = Arc::new(
+            SortExec::new(sort_exprs.clone(), union).with_preserve_partitioning(true),
+        ) as _;
         let spm = sort_preserving_merge_exec(sort_exprs, sort);
 
         let filter = filter_exec(
@@ -2264,12 +2259,8 @@ mod tests {
         let window = bounded_window_exec("nullable_col", sort_exprs.clone(), memory_exec);
         let repartition = repartition_exec(window);
 
-        let orig_plan = Arc::new(SortExec::new_with_partitioning(
-            sort_exprs,
-            repartition,
-            false,
-            None,
-        )) as Arc<dyn ExecutionPlan>;
+        let orig_plan =
+            Arc::new(SortExec::new(sort_exprs, repartition)) as Arc<dyn ExecutionPlan>;
 
         let mut plan = orig_plan.clone();
         let rules = vec![
@@ -2305,12 +2296,10 @@ mod tests {
         let repartition = repartition_exec(coalesce_partitions);
         let sort_exprs = vec![sort_expr("nullable_col", &schema)];
         // Add local sort
-        let sort = Arc::new(SortExec::new_with_partitioning(
-            sort_exprs.clone(),
-            repartition,
-            true,
-            None,
-        )) as _;
+        let sort = Arc::new(
+            SortExec::new(sort_exprs.clone(), repartition)
+                .with_preserve_partitioning(true),
+        ) as _;
         let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort);
         let sort = sort_exec(sort_exprs, spm);
 
@@ -2362,7 +2351,7 @@ mod tests {
         input: Arc<dyn ExecutionPlan>,
     ) -> Arc<dyn ExecutionPlan> {
         let sort_exprs = sort_exprs.into_iter().collect();
-        Arc::new(SortExec::try_new(sort_exprs, input, None).unwrap())
+        Arc::new(SortExec::new(sort_exprs, input))
     }
 
     fn sort_preserving_merge_exec(
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index 2fa833bb7e..06bef0fbda 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -66,10 +66,12 @@ pub fn add_sort_above(
     if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
         node.equivalence_properties()
     }) {
+        let new_sort = SortExec::new(sort_expr, node.clone());
+
         *node = Arc::new(if node.output_partitioning().partition_count() > 1 {
-            SortExec::new_with_partitioning(sort_expr, node.clone(), true, None)
+            new_sort.with_preserve_partitioning(true)
         } else {
-            SortExec::try_new(sort_expr, node.clone(), None)?
+            new_sort
         }) as _
     }
     Ok(())
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 7fb67d758f..42cd8fada9 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -485,7 +485,7 @@ mod tests {
             options: SortOptions::default(),
         }];
         let memory_exec = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as _;
-        let sort_exec = Arc::new(SortExec::try_new(sort_expr.clone(), memory_exec, None)?)
+        let sort_exec = Arc::new(SortExec::new(sort_expr.clone(), memory_exec))
             as Arc<dyn ExecutionPlan>;
         let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _;
         // memory_exec2 doesn't have output ordering
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 8c3f61b01e..01efa2f796 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -821,7 +821,9 @@ impl DefaultPhysicalPlanner {
                             )),
                         })
                         .collect::<Result<Vec<_>>>()?;
-                    Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
+                    let new_sort = SortExec::new(sort_expr, physical_input)
+                        .with_fetch(*fetch);
+                    Ok(Arc::new(new_sort))
                 }
                 LogicalPlan::Join(Join {
                     left,
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 1eceb02c2d..6037bc5c8b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -621,7 +621,10 @@ fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
     Ok(())
 }
 
-/// External Sort execution plan
+/// Sort execution plan.
+///
+/// This operator supports sorting datasets that are larger than the
+/// memory allotted by the memory manager, by spilling to disk.
 #[derive(Debug)]
 pub struct SortExec {
     /// Input schema
@@ -630,7 +633,8 @@ pub struct SortExec {
     expr: Vec<PhysicalSortExpr>,
     /// Containing all metrics set created during sort
     metrics_set: CompositeMetricsSet,
-    /// Preserve partitions of input plan
+    /// Preserve partitions of input plan. If false, the input partitions
+    /// will be sorted and merged into a single output partition.
     preserve_partitioning: bool,
     /// Fetch highest/lowest n results
     fetch: Option<usize>,
@@ -638,34 +642,65 @@ pub struct SortExec {
 
 impl SortExec {
     /// Create a new sort execution plan
+    #[deprecated(since = "22.0.0", note = "use `new` and `with_fetch`")]
     pub fn try_new(
         expr: Vec<PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
         fetch: Option<usize>,
     ) -> Result<Self> {
-        Ok(Self::new_with_partitioning(expr, input, false, fetch))
+        Ok(Self::new(expr, input).with_fetch(fetch))
     }
 
-    /// Whether this `SortExec` preserves partitioning of the children
-    pub fn preserve_partitioning(&self) -> bool {
-        self.preserve_partitioning
+    /// Create a new sort execution plan that produces a single,
+    /// sorted output partition.
+    pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
+        Self {
+            expr,
+            input,
+            metrics_set: CompositeMetricsSet::new(),
+            preserve_partitioning: false,
+            fetch: None,
+        }
     }
 
     /// Create a new sort execution plan with the option to preserve
     /// the partitioning of the input plan
+    #[deprecated(
+        since = "22.0.0",
+        note = "use `new`, `with_fetch` and `with_preserve_partitioning` instead"
+    )]
     pub fn new_with_partitioning(
         expr: Vec<PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
         preserve_partitioning: bool,
         fetch: Option<usize>,
     ) -> Self {
-        Self {
-            expr,
-            input,
-            metrics_set: CompositeMetricsSet::new(),
-            preserve_partitioning,
-            fetch,
-        }
+        Self::new(expr, input)
+            .with_fetch(fetch)
+            .with_preserve_partitioning(preserve_partitioning)
+    }
+
+    /// Whether this `SortExec` preserves partitioning of the children
+    pub fn preserve_partitioning(&self) -> bool {
+        self.preserve_partitioning
+    }
+
+    /// Specify the partitioning behavior of this sort exec
+    ///
+    /// If `preserve_partitioning` is true, sorts each partition
+    /// individually, producing one sorted stream for each input partition.
+    ///
+    /// If `preserve_partitioning` is false, sorts and merges all
+    /// input partitions producing a single, sorted partition.
+    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
+        self.preserve_partitioning = preserve_partitioning;
+        self
+    }
+
+    /// Specify the optional limit on the number of sorted rows to output (fetch the highest/lowest n results)
+    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+        self.fetch = fetch;
+        self
     }
 
     /// Input schema
@@ -704,7 +739,7 @@ impl ExecutionPlan for SortExec {
 
     /// Specifies whether this plan generates an infinite stream of records.
     /// If the plan does not support pipelining, but it its input(s) are
-    /// infinite, returns an error to indicate this.    
+    /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {
             Err(DataFusionError::Plan(
@@ -745,12 +780,11 @@ impl ExecutionPlan for SortExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(SortExec::new_with_partitioning(
-            self.expr.clone(),
-            children[0].clone(),
-            self.preserve_partitioning,
-            self.fetch,
-        )))
+        let new_sort = SortExec::new(self.expr.clone(), children[0].clone())
+            .with_fetch(self.fetch)
+            .with_preserve_partitioning(self.preserve_partitioning);
+
+        Ok(Arc::new(new_sort))
     }
 
     fn execute(
@@ -935,7 +969,7 @@ mod tests {
         let csv = test::scan_partitioned_csv(partitions)?;
         let schema = csv.schema();
 
-        let sort_exec = Arc::new(SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::new(
             vec![
                 // c1 string column
                 PhysicalSortExpr {
@@ -954,8 +988,7 @@ mod tests {
                 },
             ],
             Arc::new(CoalescePartitionsExec::new(csv)),
-            None,
-        )?);
+        ));
 
         let result = collect(sort_exec, task_ctx).await?;
 
@@ -995,7 +1028,7 @@ mod tests {
         let csv = test::scan_partitioned_csv(partitions)?;
         let schema = csv.schema();
 
-        let sort_exec = Arc::new(SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::new(
             vec![
                 // c1 string column
                 PhysicalSortExpr {
@@ -1014,8 +1047,7 @@ mod tests {
                 },
             ],
             Arc::new(CoalescePartitionsExec::new(csv)),
-            None,
-        )?);
+        ));
 
         let task_ctx = session_ctx.task_ctx();
         let result = collect(sort_exec.clone(), task_ctx).await?;
@@ -1079,27 +1111,29 @@ mod tests {
             let csv = test::scan_partitioned_csv(partitions)?;
             let schema = csv.schema();
 
-            let sort_exec = Arc::new(SortExec::try_new(
-                vec![
-                    // c1 string column
-                    PhysicalSortExpr {
-                        expr: col("c1", &schema)?,
-                        options: SortOptions::default(),
-                    },
-                    // c2 uin32 column
-                    PhysicalSortExpr {
-                        expr: col("c2", &schema)?,
-                        options: SortOptions::default(),
-                    },
-                    // c7 uin8 column
-                    PhysicalSortExpr {
-                        expr: col("c7", &schema)?,
-                        options: SortOptions::default(),
-                    },
-                ],
-                Arc::new(CoalescePartitionsExec::new(csv)),
-                fetch,
-            )?);
+            let sort_exec = Arc::new(
+                SortExec::new(
+                    vec![
+                        // c1 string column
+                        PhysicalSortExpr {
+                            expr: col("c1", &schema)?,
+                            options: SortOptions::default(),
+                        },
+                        // c2 uint32 column
+                        PhysicalSortExpr {
+                            expr: col("c2", &schema)?,
+                            options: SortOptions::default(),
+                        },
+                        // c7 uint8 column
+                        PhysicalSortExpr {
+                            expr: col("c7", &schema)?,
+                            options: SortOptions::default(),
+                        },
+                    ],
+                    Arc::new(CoalescePartitionsExec::new(csv)),
+                )
+                .with_fetch(fetch),
+            );
 
             let task_ctx = session_ctx.task_ctx();
             let result = collect(sort_exec.clone(), task_ctx).await?;
@@ -1137,14 +1171,13 @@ mod tests {
         let input =
             Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap());
 
-        let sort_exec = Arc::new(SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::new(
             vec![PhysicalSortExpr {
                 expr: col("field_name", &schema)?,
                 options: SortOptions::default(),
             }],
             input,
-            None,
-        )?);
+        ));
 
         let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
 
@@ -1199,7 +1232,7 @@ mod tests {
             ],
         )?;
 
-        let sort_exec = Arc::new(SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::new(
             vec![
                 PhysicalSortExpr {
                     expr: col("a", &schema)?,
@@ -1217,8 +1250,7 @@ mod tests {
                 },
             ],
             Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?),
-            None,
-        )?);
+        ));
 
         assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
         assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
@@ -1279,14 +1311,13 @@ mod tests {
 
         let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
         let refs = blocking_exec.refs();
-        let sort_exec = Arc::new(SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::new(
             vec![PhysicalSortExpr {
                 expr: col("a", &schema)?,
                 options: SortOptions::default(),
             }],
             blocking_exec,
-            None,
-        )?);
+        ));
 
         let fut = collect(sort_exec, task_ctx);
         let mut fut = fut.boxed();
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 2c1c3bbfcd..e96be05f4a 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -522,12 +522,8 @@ mod tests {
         sort: Vec<PhysicalSortExpr>,
         context: Arc<TaskContext>,
     ) -> RecordBatch {
-        let sort_exec = Arc::new(SortExec::new_with_partitioning(
-            sort.clone(),
-            input,
-            true,
-            None,
-        ));
+        let sort_exec =
+            Arc::new(SortExec::new(sort.clone(), input).with_preserve_partitioning(true));
         sorted_merge(sort_exec, sort, context).await
     }
 
@@ -537,7 +533,7 @@ mod tests {
         context: Arc<TaskContext>,
     ) -> RecordBatch {
         let merge = Arc::new(CoalescePartitionsExec::new(src));
-        let sort_exec = Arc::new(SortExec::try_new(sort, merge, None).unwrap());
+        let sort_exec = Arc::new(SortExec::new(sort, merge));
         let mut result = collect(sort_exec, context).await.unwrap();
         assert_eq!(result.len(), 1);
         result.remove(0)
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index 42a7d58d27..fdf69ec0d1 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -74,7 +74,7 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
         }];
 
         let exec = MemoryExec::try_new(&input, schema, None).unwrap();
-        let sort = Arc::new(SortExec::try_new(sort, Arc::new(exec), None).unwrap());
+        let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));
 
         let runtime_config = RuntimeConfig::new()
             .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 7711b3c961..5bf82e423c 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -613,12 +613,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 } else {
                     Some(sort.fetch as usize)
                 };
-                Ok(Arc::new(SortExec::new_with_partitioning(
-                    exprs,
-                    input,
-                    sort.preserve_partitioning,
-                    fetch,
-                )))
+                let new_sort = SortExec::new(exprs, input)
+                    .with_fetch(fetch)
+                    .with_preserve_partitioning(sort.preserve_partitioning);
+
+                Ok(Arc::new(new_sort))
             }
             PhysicalPlanType::SortPreservingMerge(sort) => {
                 let input: Arc<dyn ExecutionPlan> =
@@ -1438,11 +1437,10 @@ mod roundtrip_tests {
                 },
             },
         ];
-        roundtrip_test(Arc::new(SortExec::try_new(
+        roundtrip_test(Arc::new(SortExec::new(
             sort_exprs,
             Arc::new(EmptyExec::new(false, schema)),
-            None,
-        )?))
+        )))
     }
 
     #[test]
@@ -1467,19 +1465,15 @@ mod roundtrip_tests {
             },
         ];
 
-        roundtrip_test(Arc::new(SortExec::new_with_partitioning(
+        roundtrip_test(Arc::new(SortExec::new(
             sort_exprs.clone(),
             Arc::new(EmptyExec::new(false, schema.clone())),
-            false,
-            None,
         )))?;
 
-        roundtrip_test(Arc::new(SortExec::new_with_partitioning(
-            sort_exprs,
-            Arc::new(EmptyExec::new(false, schema)),
-            true,
-            None,
-        )))
+        roundtrip_test(Arc::new(
+            SortExec::new(sort_exprs, Arc::new(EmptyExec::new(false, schema)))
+                .with_preserve_partitioning(true),
+        ))
     }
 
     #[test]