You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/12/16 19:55:05 UTC
[arrow-datafusion] branch master updated: Add need_data_exchange in the ExecutionPlan to indicate whether a physical operator needs data exchange (#4586)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 920f11a64 Add need_data_exchange in the ExecutionPlan to indicate whether a physical operator needs data exchange (#4586)
920f11a64 is described below
commit 920f11a64aa4d7a4571944c96d2d49f21aae2c7e
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Sat Dec 17 03:54:59 2022 +0800
Add need_data_exchange in the ExecutionPlan to indicate whether a physical operator needs data exchange (#4586)
* Add need_data_exchange in the ExecutionPlan to indicate whether a physical operator needs data exchange
* Always Prefer SortPreservingMergeExec to the global SortExec
* Temporary remove unsupported ut caused by arrow-rs
* Move out the method need_data_exchange from ExecutionPlan
* Revert "Temporary remove unsupported ut caused by arrow-rs"
This reverts commit fc136f618c625d0762c21a58b13840d295a991ba.
* Fix for comments
* Deal with controversial part
Co-authored-by: yangzhong <ya...@ebay.com>
---
.../core/src/physical_optimizer/enforcement.rs | 48 ++++++++++++++++++----
datafusion/core/src/physical_plan/mod.rs | 38 +++++++++++++++--
datafusion/core/src/physical_plan/planner.rs | 18 +-------
datafusion/core/tests/sql/explain_analyze.rs | 3 +-
4 files changed, 76 insertions(+), 31 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 3110061c4..3da9d2477 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -31,6 +31,7 @@ use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort::SortOptions;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
@@ -835,6 +836,9 @@ fn new_join_conditions(
new_join_on
}
+/// Checks whether additional data-exchange and data-ordering operators must be added
+/// to satisfy the required input distribution and ordering of `plan`.
+/// Such operators should be added only here, not manually in other places.
fn ensure_distribution_and_ordering(
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
target_partitions: usize,
@@ -842,6 +846,37 @@ fn ensure_distribution_and_ordering(
if plan.children().is_empty() {
return Ok(plan);
}
+ // Rewrite a global SortExec into a SortPreservingMergeExec over local
+ // (per-partition) SortExecs, so that the limit (fetch) can also be
+ // pushed down into each local sort.
+ let plan = plan
+ .as_any()
+ .downcast_ref::<SortExec>()
+ .and_then(|sort_exec| {
+ // This optimization is skipped in three situations:
+ // - there is only one input partition;
+ // - the sort already preserves partitioning, so it is effectively a local sort;
+ // - there is no limit to push down to the local sorts (this case is still debated).
+ if sort_exec.input().output_partitioning().partition_count() > 1
+ && !sort_exec.preserve_partitioning()
+ && sort_exec.fetch().is_some()
+ {
+ let sort = SortExec::new_with_partitioning(
+ sort_exec.expr().to_vec(),
+ sort_exec.input().clone(),
+ true,
+ sort_exec.fetch(),
+ );
+ Some(Arc::new(SortPreservingMergeExec::new(
+ sort_exec.expr().to_vec(),
+ Arc::new(sort),
+ )))
+ } else {
+ None
+ }
+ })
+ .map_or(plan, |new_plan| new_plan);
+
let required_input_distributions = plan.required_input_distribution();
let required_input_orderings = plan.required_input_ordering();
let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
@@ -874,7 +909,7 @@ fn ensure_distribution_and_ordering(
}
});
- // Add SortExec to guarantee output ordering
+ // Add local SortExec to guarantee output ordering within each partition
let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
.zip(required_input_orderings.into_iter())
.map(|(child_result, required)| {
@@ -885,14 +920,9 @@ fn ensure_distribution_and_ordering(
Ok(child)
} else {
let sort_expr = required.unwrap().to_vec();
- if child.output_partitioning().partition_count() > 1 {
- Ok(Arc::new(SortExec::new_with_partitioning(
- sort_expr, child, true, None,
- )) as Arc<dyn ExecutionPlan>)
- } else {
- Ok(Arc::new(SortExec::try_new(sort_expr, child, None)?)
- as Arc<dyn ExecutionPlan>)
- }
+ Ok(Arc::new(SortExec::new_with_partitioning(
+ sort_expr, child, true, None,
+ )) as Arc<dyn ExecutionPlan>)
}
})
.collect();
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 24f5b2daa..5a247e167 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -110,8 +110,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;
- /// If the output of this operator is sorted, returns `Some(keys)`
- /// with the description of how it was sorted.
+ /// If the output of this operator within each partition is sorted,
+ /// returns `Some(keys)` with the description of how it was sorted.
///
/// For example, Sort, (obviously) produces sorted output as does
/// SortPreservingMergeStream. Less obviously `Projection`
@@ -128,8 +128,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
vec![Distribution::UnspecifiedDistribution; self.children().len()]
}
- /// Specifies the ordering requirements for all the
- /// children for this operator.
+ /// Specifies the ordering requirements for all of the children.
+ /// For each child, this is the local ordering required within each partition, not a global ordering.
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![None; self.children().len()]
}
@@ -243,6 +243,34 @@ pub trait ExecutionPlan: Debug + Send + Sync {
fn statistics(&self) -> Statistics;
}
+/// Indicates whether a data exchange is needed for the input of `plan`. This is especially
+/// useful for distributed engines that must decide whether a shuffle is required.
+/// Currently three kinds of execution plan may need a data exchange:
+/// 1. RepartitionExec whose output partitioning is not round-robin (e.g. hash partitioning)
+/// 2. CoalescePartitionsExec with more than one input partition, collapsing them into one without an ordering guarantee
+/// 3. SortPreservingMergeExec with more than one input partition, merging the sorted partitions into one with an ordering guarantee
+pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
+ if let Some(repart) = plan.as_any().downcast_ref::<RepartitionExec>() {
+ !matches!(
+ repart.output_partitioning(),
+ Partitioning::RoundRobinBatch(_)
+ )
+ } else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
+ {
+ coalesce.input().output_partitioning().partition_count() > 1
+ } else if let Some(sort_preserving_merge) =
+ plan.as_any().downcast_ref::<SortPreservingMergeExec>()
+ {
+ sort_preserving_merge
+ .input()
+ .output_partitioning()
+ .partition_count()
+ > 1
+ } else {
+ false
+ }
+}
+
/// Returns a copy of this plan if we change any child according to the pointer comparison.
/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
/// Allow the vtable address comparisons for ExecutionPlan Trait Objects,it is harmless even
@@ -655,6 +683,8 @@ pub mod values;
pub mod windows;
use crate::execution::context::TaskContext;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
pub use datafusion_physical_expr::{
expressions, functions, hash_utils, type_coercion, udf,
};
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index d0d74c54f..5ba8cabe9 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -18,7 +18,6 @@
//! Physical query planner
use super::analyze::AnalyzeExec;
-use super::sorts::sort_preserving_merge::SortPreservingMergeExec;
use super::{
aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
values::ValuesExec, windows,
@@ -838,22 +837,7 @@ impl DefaultPhysicalPlanner {
)),
})
.collect::<Result<Vec<_>>>()?;
- // If we have a `LIMIT` can run sort/limts in parallel (similar to TopK)
- Ok(if fetch.is_some() && session_state.config.target_partitions() > 1 {
- let sort = SortExec::new_with_partitioning(
- sort_expr,
- physical_input,
- true,
- *fetch,
- );
- let merge = SortPreservingMergeExec::new(
- sort.expr().to_vec(),
- Arc::new(sort),
- );
- Arc::new(merge)
- } else {
- Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?)
- })
+ Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
}
LogicalPlan::Join(Join {
left,
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 89112adae..b6bbb3cff 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -92,10 +92,11 @@ async fn explain_analyze_baseline_metrics() {
"CoalesceBatchesExec: target_batch_size=4096",
"metrics=[output_rows=5, elapsed_compute"
);
+ // The number of output rows is smaller now that the global sort is replaced by local sorts with the limit pushed down
assert_metrics!(
&formatted,
"CoalescePartitionsExec",
- "metrics=[output_rows=5, elapsed_compute="
+ "metrics=[output_rows=3, elapsed_compute="
);
assert_metrics!(
&formatted,