You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/12/16 19:55:05 UTC

[arrow-datafusion] branch master updated: Add need_data_exchange in the ExecutionPlan to indicate whether a physical operator needs data exchange (#4586)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 920f11a64 Add need_data_exchange in the ExecutionPlan to indicate whether a physical operator needs data exchange (#4586)
920f11a64 is described below

commit 920f11a64aa4d7a4571944c96d2d49f21aae2c7e
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Sat Dec 17 03:54:59 2022 +0800

    Add need_data_exchange in the ExecutionPlan to indicate whether a physical operator needs data exchange (#4586)
    
    * Add need_data_exchange in the ExecutionPlan to indicate whether a physical operator needs data exchange
    
    * Always Prefer SortPreservingMergeExec to the global SortExec
    
    * Temporary remove unsupported ut caused by arrow-rs
    
    * Move out the method need_data_exchange from ExecutionPlan
    
    * Revert "Temporary remove unsupported ut caused by arrow-rs"
    
    This reverts commit fc136f618c625d0762c21a58b13840d295a991ba.
    
    * Fix for comments
    
    * Deal with controversial part
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 .../core/src/physical_optimizer/enforcement.rs     | 48 ++++++++++++++++++----
 datafusion/core/src/physical_plan/mod.rs           | 38 +++++++++++++++--
 datafusion/core/src/physical_plan/planner.rs       | 18 +-------
 datafusion/core/tests/sql/explain_analyze.rs       |  3 +-
 4 files changed, 76 insertions(+), 31 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 3110061c4..3da9d2477 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -31,6 +31,7 @@ use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::sorts::sort::SortOptions;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
@@ -835,6 +836,9 @@ fn new_join_conditions(
     new_join_on
 }
 
+/// Checks whether additional data-exchange and data-ordering plan operators must be added
+/// so that the children of `plan` satisfy its required input distribution and ordering.
+/// Such operators should be added only here, not manually in other places.
 fn ensure_distribution_and_ordering(
     plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
     target_partitions: usize,
@@ -842,6 +846,37 @@ fn ensure_distribution_and_ordering(
     if plan.children().is_empty() {
         return Ok(plan);
     }
+    // Rewrite a single-node global SortExec into a SortPreservingMergeExec over
+    // multiple per-partition (local) SortExecs so the sorting can run in parallel.
+    // If a limit (fetch) exists, it is also pushed down to the local sorts.
+    let plan = plan
+        .as_any()
+        .downcast_ref::<SortExec>()
+        .and_then(|sort_exec| {
+            // This optimization is skipped in three situations:
+            // - there is only one input partition;
+            // - the sort already preserves partitioning, so it is already a local sort;
+            // - there is no limit to push down to the local sorts (this case is still controversial).
+            if sort_exec.input().output_partitioning().partition_count() > 1
+                && !sort_exec.preserve_partitioning()
+                && sort_exec.fetch().is_some()
+            {
+                let sort = SortExec::new_with_partitioning(
+                    sort_exec.expr().to_vec(),
+                    sort_exec.input().clone(),
+                    true,
+                    sort_exec.fetch(),
+                );
+                Some(Arc::new(SortPreservingMergeExec::new(
+                    sort_exec.expr().to_vec(),
+                    Arc::new(sort),
+                )))
+            } else {
+                None
+            }
+        })
+        .map_or(plan, |new_plan| new_plan);
+
     let required_input_distributions = plan.required_input_distribution();
     let required_input_orderings = plan.required_input_ordering();
     let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
@@ -874,7 +909,7 @@ fn ensure_distribution_and_ordering(
             }
         });
 
-    // Add SortExec to guarantee output ordering
+    // Add local SortExec to guarantee output ordering within each partition
     let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
         .zip(required_input_orderings.into_iter())
         .map(|(child_result, required)| {
@@ -885,14 +920,9 @@ fn ensure_distribution_and_ordering(
                 Ok(child)
             } else {
                 let sort_expr = required.unwrap().to_vec();
-                if child.output_partitioning().partition_count() > 1 {
-                    Ok(Arc::new(SortExec::new_with_partitioning(
-                        sort_expr, child, true, None,
-                    )) as Arc<dyn ExecutionPlan>)
-                } else {
-                    Ok(Arc::new(SortExec::try_new(sort_expr, child, None)?)
-                        as Arc<dyn ExecutionPlan>)
-                }
+                Ok(Arc::new(SortExec::new_with_partitioning(
+                    sort_expr, child, true, None,
+                )) as Arc<dyn ExecutionPlan>)
             }
         })
         .collect();
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 24f5b2daa..5a247e167 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -110,8 +110,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// Specifies the output partitioning scheme of this plan
     fn output_partitioning(&self) -> Partitioning;
 
-    /// If the output of this operator is sorted, returns `Some(keys)`
-    /// with the description of how it was sorted.
+    /// If the output of this operator within each partition is sorted,
+    /// returns `Some(keys)` with the description of how it was sorted.
     ///
     /// For example, Sort, (obviously) produces sorted output as does
     /// SortPreservingMergeStream. Less obviously `Projection`
@@ -128,8 +128,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
         vec![Distribution::UnspecifiedDistribution; self.children().len()]
     }
 
-    /// Specifies the ordering requirements for all the
-    /// children for this operator.
+    /// Specifies the ordering requirements for all of the children.
+    /// For each child, this is the local ordering required within each partition, not a global ordering.
     fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
         vec![None; self.children().len()]
     }
@@ -243,6 +243,34 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn statistics(&self) -> Statistics;
 }
 
+/// Indicates whether a data exchange is needed for the input of `plan`. This is especially
+/// helpful for a distributed engine that must decide whether data has to be shuffled.
+/// Currently there are 3 kinds of execution plans that may need a data exchange:
+/// 1. `RepartitionExec`, for changing the partitioning between two operators (except round-robin batch repartitioning)
+/// 2. `CoalescePartitionsExec`, for collapsing multiple input partitions into one without an ordering guarantee
+/// 3. `SortPreservingMergeExec`, for collapsing multiple sorted input partitions into one with an ordering guarantee
+pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
+    if let Some(repart) = plan.as_any().downcast_ref::<RepartitionExec>() {
+        !matches!(
+            repart.output_partitioning(),
+            Partitioning::RoundRobinBatch(_)
+        )
+    } else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
+    {
+        coalesce.input().output_partitioning().partition_count() > 1
+    } else if let Some(sort_preserving_merge) =
+        plan.as_any().downcast_ref::<SortPreservingMergeExec>()
+    {
+        sort_preserving_merge
+            .input()
+            .output_partitioning()
+            .partition_count()
+            > 1
+    } else {
+        false
+    }
+}
+
 /// Returns a copy of this plan if we change any child according to the pointer comparison.
 /// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
 /// Allow the vtable address comparisons for ExecutionPlan Trait Objects,it is harmless even
@@ -655,6 +683,8 @@ pub mod values;
 pub mod windows;
 
 use crate::execution::context::TaskContext;
+use crate::physical_plan::repartition::RepartitionExec;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 pub use datafusion_physical_expr::{
     expressions, functions, hash_utils, type_coercion, udf,
 };
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index d0d74c54f..5ba8cabe9 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -18,7 +18,6 @@
 //! Physical query planner
 
 use super::analyze::AnalyzeExec;
-use super::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use super::{
     aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
     values::ValuesExec, windows,
@@ -838,22 +837,7 @@ impl DefaultPhysicalPlanner {
                             )),
                         })
                         .collect::<Result<Vec<_>>>()?;
-                    // If we have a `LIMIT` can run sort/limts in parallel (similar to TopK)
-                    Ok(if fetch.is_some() && session_state.config.target_partitions() > 1 {
-                        let sort = SortExec::new_with_partitioning(
-                            sort_expr,
-                            physical_input,
-                            true,
-                            *fetch,
-                        );
-                        let merge = SortPreservingMergeExec::new(
-                            sort.expr().to_vec(),
-                            Arc::new(sort),
-                        );
-                        Arc::new(merge)
-                    } else {
-                        Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?)
-                    })
+                    Ok(Arc::new(SortExec::try_new(sort_expr, physical_input, *fetch)?))
                 }
                 LogicalPlan::Join(Join {
                     left,
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 89112adae..b6bbb3cff 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -92,10 +92,11 @@ async fn explain_analyze_baseline_metrics() {
         "CoalesceBatchesExec: target_batch_size=4096",
         "metrics=[output_rows=5, elapsed_compute"
     );
+    // Fewer output rows reach this operator now that the global sort is rewritten into local sorts with the limit pushed down
     assert_metrics!(
         &formatted,
         "CoalescePartitionsExec",
-        "metrics=[output_rows=5, elapsed_compute="
+        "metrics=[output_rows=3, elapsed_compute="
     );
     assert_metrics!(
         &formatted,