You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/10 14:58:43 UTC
[arrow-datafusion] branch main updated: Clean up SortExec creation and add doc comments (#5889)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new db2fa44b77 Clean up SortExec creation and add doc comments (#5889)
db2fa44b77 is described below
commit db2fa44b779a5274bc06d71364cd20e0e4b666bb
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Mon Apr 10 10:58:36 2023 -0400
Clean up SortExec creation and add doc comments (#5889)
* Clean up SortExec creation and add doc comments
* Reduce API surface
* restore sort bench
* fix benchmark
* Add with_fetch
---
benchmarks/src/bin/parquet.rs | 2 +-
datafusion/core/benches/sort.rs | 6 +-
.../physical_optimizer/global_sort_selection.rs | 10 +-
.../core/src/physical_optimizer/repartition.rs | 9 +-
.../src/physical_optimizer/sort_enforcement.rs | 39 ++----
datafusion/core/src/physical_optimizer/utils.rs | 6 +-
datafusion/core/src/physical_plan/common.rs | 2 +-
datafusion/core/src/physical_plan/planner.rs | 4 +-
datafusion/core/src/physical_plan/sorts/sort.rs | 143 +++++++++++++--------
.../physical_plan/sorts/sort_preserving_merge.rs | 10 +-
datafusion/core/tests/order_spill_fuzz.rs | 2 +-
datafusion/proto/src/physical_plan/mod.rs | 30 ++---
12 files changed, 137 insertions(+), 126 deletions(-)
diff --git a/benchmarks/src/bin/parquet.rs b/benchmarks/src/bin/parquet.rs
index 7ddc18c07a..589f967a6d 100644
--- a/benchmarks/src/bin/parquet.rs
+++ b/benchmarks/src/bin/parquet.rs
@@ -325,7 +325,7 @@ async fn exec_sort(
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(None).await?;
- let exec = Arc::new(SortExec::try_new(expr.to_owned(), scan, None)?);
+ let exec = Arc::new(SortExec::new(expr.to_owned(), scan));
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
let elapsed = start.elapsed();
diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 907392bc3e..4045702d63 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -189,7 +189,7 @@ impl BenchCase {
let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
let exec =
- SortExec::new_with_partitioning(sort.clone(), Arc::new(exec), true, None);
+ SortExec::new(sort.clone(), Arc::new(exec)).with_preserve_partitioning(true);
let plan = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
Self {
@@ -211,7 +211,7 @@ impl BenchCase {
let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
let exec = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
- let plan = Arc::new(SortExec::try_new(sort, exec, None).unwrap());
+ let plan = Arc::new(SortExec::new(sort, exec));
Self {
runtime,
@@ -231,7 +231,7 @@ impl BenchCase {
let sort = make_sort_exprs(schema.as_ref());
let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
- let exec = SortExec::new_with_partitioning(sort, Arc::new(exec), true, None);
+ let exec = SortExec::new(sort, Arc::new(exec)).with_preserve_partitioning(true);
let plan = Arc::new(CoalescePartitionsExec::new(Arc::new(exec)));
Self {
diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
index e29735e741..9466297d24 100644
--- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs
+++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs
@@ -60,12 +60,12 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
&& !sort_exec.preserve_partitioning()
&& (sort_exec.fetch().is_some() || config.optimizer.repartition_sorts)
{
- let sort = SortExec::new_with_partitioning(
+ let sort = SortExec::new(
sort_exec.expr().to_vec(),
- sort_exec.input().clone(),
- true,
- sort_exec.fetch(),
- );
+ sort_exec.input().clone()
+ )
+ .with_fetch(sort_exec.fetch())
+ .with_preserve_partitioning(true);
let global_sort: Arc<dyn ExecutionPlan> =
Arc::new(SortPreservingMergeExec::new(
sort_exec.expr().to_vec(),
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 35592519a4..3bb21b12be 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -460,12 +460,9 @@ mod tests {
expr: col("c1", &schema()).unwrap(),
options: SortOptions::default(),
}];
- Arc::new(SortExec::new_with_partitioning(
- sort_exprs,
- input,
- preserve_partitioning,
- None,
- ))
+ let new_sort = SortExec::new(sort_exprs, input)
+ .with_preserve_partitioning(preserve_partitioning);
+ Arc::new(new_sort)
}
fn projection_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 26299e4683..b1a4da65e0 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -1210,12 +1210,10 @@ mod tests {
sort_expr("non_nullable_col", &schema),
];
let repartition_exec = repartition_exec(spm);
- let sort2 = Arc::new(SortExec::new_with_partitioning(
- sort_exprs.clone(),
- repartition_exec,
- true,
- None,
- )) as _;
+ let sort2 = Arc::new(
+ SortExec::new(sort_exprs.clone(), repartition_exec)
+ .with_preserve_partitioning(true),
+ ) as _;
let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
let physical_plan = aggregate_exec(spm2);
@@ -1253,12 +1251,9 @@ mod tests {
let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
// let sort = sort_exec(sort_exprs.clone(), union);
- let sort = Arc::new(SortExec::new_with_partitioning(
- sort_exprs.clone(),
- union,
- true,
- None,
- )) as _;
+ let sort = Arc::new(
+ SortExec::new(sort_exprs.clone(), union).with_preserve_partitioning(true),
+ ) as _;
let spm = sort_preserving_merge_exec(sort_exprs, sort);
let filter = filter_exec(
@@ -2264,12 +2259,8 @@ mod tests {
let window = bounded_window_exec("nullable_col", sort_exprs.clone(), memory_exec);
let repartition = repartition_exec(window);
- let orig_plan = Arc::new(SortExec::new_with_partitioning(
- sort_exprs,
- repartition,
- false,
- None,
- )) as Arc<dyn ExecutionPlan>;
+ let orig_plan =
+ Arc::new(SortExec::new(sort_exprs, repartition)) as Arc<dyn ExecutionPlan>;
let mut plan = orig_plan.clone();
let rules = vec![
@@ -2305,12 +2296,10 @@ mod tests {
let repartition = repartition_exec(coalesce_partitions);
let sort_exprs = vec![sort_expr("nullable_col", &schema)];
// Add local sort
- let sort = Arc::new(SortExec::new_with_partitioning(
- sort_exprs.clone(),
- repartition,
- true,
- None,
- )) as _;
+ let sort = Arc::new(
+ SortExec::new(sort_exprs.clone(), repartition)
+ .with_preserve_partitioning(true),
+ ) as _;
let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort);
let sort = sort_exec(sort_exprs, spm);
@@ -2362,7 +2351,7 @@ mod tests {
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
- Arc::new(SortExec::try_new(sort_exprs, input, None).unwrap())
+ Arc::new(SortExec::new(sort_exprs, input))
}
fn sort_preserving_merge_exec(
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index 2fa833bb7e..06bef0fbda 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -66,10 +66,12 @@ pub fn add_sort_above(
if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
node.equivalence_properties()
}) {
+ let new_sort = SortExec::new(sort_expr, node.clone());
+
*node = Arc::new(if node.output_partitioning().partition_count() > 1 {
- SortExec::new_with_partitioning(sort_expr, node.clone(), true, None)
+ new_sort.with_preserve_partitioning(true)
} else {
- SortExec::try_new(sort_expr, node.clone(), None)?
+ new_sort
}) as _
}
Ok(())
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 7fb67d758f..42cd8fada9 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -485,7 +485,7 @@ mod tests {
options: SortOptions::default(),
}];
let memory_exec = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as _;
- let sort_exec = Arc::new(SortExec::try_new(sort_expr.clone(), memory_exec, None)?)
+ let sort_exec = Arc::new(SortExec::new(sort_expr.clone(), memory_exec))
as Arc<dyn ExecutionPlan>;
let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _;
// memory_exec2 doesn't have output ordering
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 8c3f61b01e..01efa2f796 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -821,7 +821,9 @@ impl DefaultPhysicalPlanner {
)),
})
.collect::<Result<Vec<_>>>()?;
- Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
+ let new_sort = SortExec::new(sort_expr, physical_input)
+ .with_fetch(*fetch);
+ Ok(Arc::new(new_sort))
}
LogicalPlan::Join(Join {
left,
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 1eceb02c2d..6037bc5c8b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -621,7 +621,10 @@ fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
Ok(())
}
-/// External Sort execution plan
+/// Sort execution plan.
+///
+/// This operator supports sorting datasets that are larger than the
+/// memory allotted by the memory manager, by spilling to disk.
#[derive(Debug)]
pub struct SortExec {
/// Input schema
@@ -630,7 +633,8 @@ pub struct SortExec {
expr: Vec<PhysicalSortExpr>,
/// Containing all metrics set created during sort
metrics_set: CompositeMetricsSet,
- /// Preserve partitions of input plan
+ /// Preserve partitions of input plan. If false, the input partitions
+ /// will be sorted and merged into a single output partition.
preserve_partitioning: bool,
/// Fetch highest/lowest n results
fetch: Option<usize>,
@@ -638,34 +642,65 @@ pub struct SortExec {
impl SortExec {
/// Create a new sort execution plan
+ #[deprecated(since = "22.0.0", note = "use `new` and `with_fetch`")]
pub fn try_new(
expr: Vec<PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
fetch: Option<usize>,
) -> Result<Self> {
- Ok(Self::new_with_partitioning(expr, input, false, fetch))
+ Ok(Self::new(expr, input).with_fetch(fetch))
}
- /// Whether this `SortExec` preserves partitioning of the children
- pub fn preserve_partitioning(&self) -> bool {
- self.preserve_partitioning
+ /// Create a new sort execution plan that produces a single,
+ /// sorted output partition.
+ pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
+ Self {
+ expr,
+ input,
+ metrics_set: CompositeMetricsSet::new(),
+ preserve_partitioning: false,
+ fetch: None,
+ }
}
/// Create a new sort execution plan with the option to preserve
/// the partitioning of the input plan
+ #[deprecated(
+ since = "22.0.0",
+ note = "use `new`, `with_fetch` and `with_preserve_partioning` instead"
+ )]
pub fn new_with_partitioning(
expr: Vec<PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
preserve_partitioning: bool,
fetch: Option<usize>,
) -> Self {
- Self {
- expr,
- input,
- metrics_set: CompositeMetricsSet::new(),
- preserve_partitioning,
- fetch,
- }
+ Self::new(expr, input)
+ .with_fetch(fetch)
+ .with_preserve_partitioning(preserve_partitioning)
+ }
+
+ /// Whether this `SortExec` preserves partitioning of the children
+ pub fn preserve_partitioning(&self) -> bool {
+ self.preserve_partitioning
+ }
+
+ /// Specify the partitioning behavior of this sort exec
+ ///
+ /// If `preserve_partitioning` is true, sorts each partition
+ /// individually, producing one sorted stream for each input partition.
+ ///
+ /// If `preserve_partitioning` is false, sorts and merges all
+ /// input partitions, producing a single sorted partition.
+ pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
+ self.preserve_partitioning = preserve_partitioning;
+ self
+ }
+
+ /// Specify the fetch limit: if `Some(n)`, only the first `n` sorted rows (the highest/lowest n results) are produced; `None` means no limit
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
}
/// Input schema
@@ -704,7 +739,7 @@ impl ExecutionPlan for SortExec {
/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but it its input(s) are
- /// infinite, returns an error to indicate this.
+ /// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] {
Err(DataFusionError::Plan(
@@ -745,12 +780,11 @@ impl ExecutionPlan for SortExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(SortExec::new_with_partitioning(
- self.expr.clone(),
- children[0].clone(),
- self.preserve_partitioning,
- self.fetch,
- )))
+ let new_sort = SortExec::new(self.expr.clone(), children[0].clone())
+ .with_fetch(self.fetch)
+ .with_preserve_partitioning(self.preserve_partitioning);
+
+ Ok(Arc::new(new_sort))
}
fn execute(
@@ -935,7 +969,7 @@ mod tests {
let csv = test::scan_partitioned_csv(partitions)?;
let schema = csv.schema();
- let sort_exec = Arc::new(SortExec::try_new(
+ let sort_exec = Arc::new(SortExec::new(
vec![
// c1 string column
PhysicalSortExpr {
@@ -954,8 +988,7 @@ mod tests {
},
],
Arc::new(CoalescePartitionsExec::new(csv)),
- None,
- )?);
+ ));
let result = collect(sort_exec, task_ctx).await?;
@@ -995,7 +1028,7 @@ mod tests {
let csv = test::scan_partitioned_csv(partitions)?;
let schema = csv.schema();
- let sort_exec = Arc::new(SortExec::try_new(
+ let sort_exec = Arc::new(SortExec::new(
vec![
// c1 string column
PhysicalSortExpr {
@@ -1014,8 +1047,7 @@ mod tests {
},
],
Arc::new(CoalescePartitionsExec::new(csv)),
- None,
- )?);
+ ));
let task_ctx = session_ctx.task_ctx();
let result = collect(sort_exec.clone(), task_ctx).await?;
@@ -1079,27 +1111,29 @@ mod tests {
let csv = test::scan_partitioned_csv(partitions)?;
let schema = csv.schema();
- let sort_exec = Arc::new(SortExec::try_new(
- vec![
- // c1 string column
- PhysicalSortExpr {
- expr: col("c1", &schema)?,
- options: SortOptions::default(),
- },
- // c2 uin32 column
- PhysicalSortExpr {
- expr: col("c2", &schema)?,
- options: SortOptions::default(),
- },
- // c7 uin8 column
- PhysicalSortExpr {
- expr: col("c7", &schema)?,
- options: SortOptions::default(),
- },
- ],
- Arc::new(CoalescePartitionsExec::new(csv)),
- fetch,
- )?);
+ let sort_exec = Arc::new(
+ SortExec::new(
+ vec![
+ // c1 string column
+ PhysicalSortExpr {
+ expr: col("c1", &schema)?,
+ options: SortOptions::default(),
+ },
+ // c2 uin32 column
+ PhysicalSortExpr {
+ expr: col("c2", &schema)?,
+ options: SortOptions::default(),
+ },
+ // c7 uin8 column
+ PhysicalSortExpr {
+ expr: col("c7", &schema)?,
+ options: SortOptions::default(),
+ },
+ ],
+ Arc::new(CoalescePartitionsExec::new(csv)),
+ )
+ .with_fetch(fetch),
+ );
let task_ctx = session_ctx.task_ctx();
let result = collect(sort_exec.clone(), task_ctx).await?;
@@ -1137,14 +1171,13 @@ mod tests {
let input =
Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap());
- let sort_exec = Arc::new(SortExec::try_new(
+ let sort_exec = Arc::new(SortExec::new(
vec![PhysicalSortExpr {
expr: col("field_name", &schema)?,
options: SortOptions::default(),
}],
input,
- None,
- )?);
+ ));
let result: Vec<RecordBatch> = collect(sort_exec, task_ctx).await?;
@@ -1199,7 +1232,7 @@ mod tests {
],
)?;
- let sort_exec = Arc::new(SortExec::try_new(
+ let sort_exec = Arc::new(SortExec::new(
vec![
PhysicalSortExpr {
expr: col("a", &schema)?,
@@ -1217,8 +1250,7 @@ mod tests {
},
],
Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?),
- None,
- )?);
+ ));
assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
@@ -1279,14 +1311,13 @@ mod tests {
let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
let refs = blocking_exec.refs();
- let sort_exec = Arc::new(SortExec::try_new(
+ let sort_exec = Arc::new(SortExec::new(
vec![PhysicalSortExpr {
expr: col("a", &schema)?,
options: SortOptions::default(),
}],
blocking_exec,
- None,
- )?);
+ ));
let fut = collect(sort_exec, task_ctx);
let mut fut = fut.boxed();
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 2c1c3bbfcd..e96be05f4a 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -522,12 +522,8 @@ mod tests {
sort: Vec<PhysicalSortExpr>,
context: Arc<TaskContext>,
) -> RecordBatch {
- let sort_exec = Arc::new(SortExec::new_with_partitioning(
- sort.clone(),
- input,
- true,
- None,
- ));
+ let sort_exec =
+ Arc::new(SortExec::new(sort.clone(), input).with_preserve_partitioning(true));
sorted_merge(sort_exec, sort, context).await
}
@@ -537,7 +533,7 @@ mod tests {
context: Arc<TaskContext>,
) -> RecordBatch {
let merge = Arc::new(CoalescePartitionsExec::new(src));
- let sort_exec = Arc::new(SortExec::try_new(sort, merge, None).unwrap());
+ let sort_exec = Arc::new(SortExec::new(sort, merge));
let mut result = collect(sort_exec, context).await.unwrap();
assert_eq!(result.len(), 1);
result.remove(0)
diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs
index 42a7d58d27..fdf69ec0d1 100644
--- a/datafusion/core/tests/order_spill_fuzz.rs
+++ b/datafusion/core/tests/order_spill_fuzz.rs
@@ -74,7 +74,7 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
}];
let exec = MemoryExec::try_new(&input, schema, None).unwrap();
- let sort = Arc::new(SortExec::try_new(sort, Arc::new(exec), None).unwrap());
+ let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));
let runtime_config = RuntimeConfig::new()
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 7711b3c961..5bf82e423c 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -613,12 +613,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
} else {
Some(sort.fetch as usize)
};
- Ok(Arc::new(SortExec::new_with_partitioning(
- exprs,
- input,
- sort.preserve_partitioning,
- fetch,
- )))
+ let new_sort = SortExec::new(exprs, input)
+ .with_fetch(fetch)
+ .with_preserve_partitioning(sort.preserve_partitioning);
+
+ Ok(Arc::new(new_sort))
}
PhysicalPlanType::SortPreservingMerge(sort) => {
let input: Arc<dyn ExecutionPlan> =
@@ -1438,11 +1437,10 @@ mod roundtrip_tests {
},
},
];
- roundtrip_test(Arc::new(SortExec::try_new(
+ roundtrip_test(Arc::new(SortExec::new(
sort_exprs,
Arc::new(EmptyExec::new(false, schema)),
- None,
- )?))
+ )))
}
#[test]
@@ -1467,19 +1465,15 @@ mod roundtrip_tests {
},
];
- roundtrip_test(Arc::new(SortExec::new_with_partitioning(
+ roundtrip_test(Arc::new(SortExec::new(
sort_exprs.clone(),
Arc::new(EmptyExec::new(false, schema.clone())),
- false,
- None,
)))?;
- roundtrip_test(Arc::new(SortExec::new_with_partitioning(
- sort_exprs,
- Arc::new(EmptyExec::new(false, schema)),
- true,
- None,
- )))
+ roundtrip_test(Arc::new(
+ SortExec::new(sort_exprs, Arc::new(EmptyExec::new(false, schema)))
+ .with_preserve_partitioning(true),
+ ))
}
#[test]