You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/09/13 16:02:49 UTC

[arrow-datafusion] branch main updated: Move common code to utils (#7545)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 11b72d6765 Move common code to utils (#7545)
11b72d6765 is described below

commit 11b72d6765a81fc8ab400f4dcc8d709037b23f6f
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Wed Sep 13 19:02:43 2023 +0300

    Move common code to utils (#7545)
---
 .../src/physical_optimizer/enforce_distribution.rs | 13 ++--
 .../core/src/physical_optimizer/enforce_sorting.rs | 86 +++-------------------
 .../replace_with_order_preserving_variants.rs      |  7 +-
 datafusion/core/src/physical_optimizer/utils.rs    | 66 +++++++++++++++++
 4 files changed, 88 insertions(+), 84 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index a0a9bc32e9..3bfe9ae3df 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -28,8 +28,9 @@ use std::sync::Arc;
 use crate::config::ConfigOptions;
 use crate::datasource::physical_plan::{CsvExec, ParquetExec};
 use crate::error::{DataFusionError, Result};
-use crate::physical_optimizer::enforce_sorting::{unbounded_output, ExecTree};
-use crate::physical_optimizer::utils::{add_sort_above, get_plan_string};
+use crate::physical_optimizer::utils::{
+    add_sort_above, get_plan_string, unbounded_output, ExecTree,
+};
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -1446,7 +1447,7 @@ fn ensure_distribution(
 /// we can optimize distribution of the plan if/when necessary.
 #[derive(Debug, Clone)]
 struct DistributionContext {
-    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    plan: Arc<dyn ExecutionPlan>,
     /// Keep track of associations for each child of the plan. If `None`,
     /// there is no distribution changing operator in its descendants.
     distribution_onwards: Vec<Option<ExecTree>>,
@@ -1602,7 +1603,7 @@ struct JoinKeyPairs {
 
 #[derive(Debug, Clone)]
 struct PlanWithKeyRequirements {
-    pub plan: Arc<dyn ExecutionPlan>,
+    plan: Arc<dyn ExecutionPlan>,
     /// Parent required key ordering
     required_key_ordering: Vec<Arc<dyn PhysicalExpr>>,
     /// The request key ordering to children
@@ -1610,7 +1611,7 @@ struct PlanWithKeyRequirements {
 }
 
 impl PlanWithKeyRequirements {
-    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+    fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
         let children_len = plan.children().len();
         PlanWithKeyRequirements {
             plan,
@@ -1619,7 +1620,7 @@ impl PlanWithKeyRequirements {
         }
     }
 
-    pub fn children(&self) -> Vec<PlanWithKeyRequirements> {
+    fn children(&self) -> Vec<PlanWithKeyRequirements> {
         let plan_children = self.plan.children();
         assert_eq!(plan_children.len(), self.request_key_ordering.len());
         plan_children
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index a976105d36..b6f2adac1b 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -34,8 +34,6 @@
 //! in the physical plan. The first sort is unnecessary since its result is overwritten
 //! by another [`SortExec`]. Therefore, this rule removes it from the physical plan.
 
-use std::fmt;
-use std::fmt::Formatter;
 use std::sync::Arc;
 
 use crate::config::ConfigOptions;
@@ -45,9 +43,9 @@ use crate::physical_optimizer::replace_with_order_preserving_variants::{
 };
 use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown};
 use crate::physical_optimizer::utils::{
-    add_sort_above, find_indices, get_plan_string, is_coalesce_partitions, is_limit,
-    is_repartition, is_sort, is_sort_preserving_merge, is_sorted, is_union, is_window,
-    merge_and_order_indices, set_difference,
+    add_sort_above, find_indices, is_coalesce_partitions, is_limit, is_repartition,
+    is_sort, is_sort_preserving_merge, is_sorted, is_union, is_window,
+    merge_and_order_indices, set_difference, unbounded_output, ExecTree,
 };
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -68,7 +66,7 @@ use datafusion_physical_expr::utils::{
 };
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
 
-use itertools::{concat, izip, Itertools};
+use itertools::{izip, Itertools};
 
 /// This rule inspects [`SortExec`]'s in the given physical plan and removes the
 /// ones it can prove unnecessary.
@@ -82,54 +80,6 @@ impl EnforceSorting {
     }
 }
 
-/// This object implements a tree that we use while keeping track of paths
-/// leading to [`SortExec`]s.
-#[derive(Debug, Clone)]
-pub(crate) struct ExecTree {
-    /// The `ExecutionPlan` associated with this node
-    pub plan: Arc<dyn ExecutionPlan>,
-    /// Child index of the plan in its parent
-    pub idx: usize,
-    /// Children of the plan that would need updating if we remove leaf executors
-    pub children: Vec<ExecTree>,
-}
-
-impl fmt::Display for ExecTree {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        let plan_string = get_plan_string(&self.plan);
-        write!(f, "\nidx: {:?}", self.idx)?;
-        write!(f, "\nplan: {:?}", plan_string)?;
-        for child in self.children.iter() {
-            write!(f, "\nexec_tree:{}", child)?;
-        }
-        writeln!(f)
-    }
-}
-
-impl ExecTree {
-    /// Create new Exec tree
-    pub fn new(
-        plan: Arc<dyn ExecutionPlan>,
-        idx: usize,
-        children: Vec<ExecTree>,
-    ) -> Self {
-        ExecTree {
-            plan,
-            idx,
-            children,
-        }
-    }
-
-    /// This function returns the executors at the leaves of the tree.
-    fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        if self.children.is_empty() {
-            vec![self.plan.clone()]
-        } else {
-            concat(self.children.iter().map(|e| e.get_leaves()))
-        }
-    }
-}
-
 /// This object is used within the [`EnforceSorting`] rule to track the closest
 /// [`SortExec`] descendant(s) for every child of a plan.
 #[derive(Debug, Clone)]
@@ -143,7 +93,7 @@ struct PlanWithCorrespondingSort {
 }
 
 impl PlanWithCorrespondingSort {
-    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+    fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
         let length = plan.children().len();
         PlanWithCorrespondingSort {
             plan,
@@ -151,7 +101,7 @@ impl PlanWithCorrespondingSort {
         }
     }
 
-    pub fn new_from_children_nodes(
+    fn new_from_children_nodes(
         children_nodes: Vec<PlanWithCorrespondingSort>,
         parent_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Self> {
@@ -201,7 +151,7 @@ impl PlanWithCorrespondingSort {
         Ok(PlanWithCorrespondingSort { plan, sort_onwards })
     }
 
-    pub fn children(&self) -> Vec<PlanWithCorrespondingSort> {
+    fn children(&self) -> Vec<PlanWithCorrespondingSort> {
         self.plan
             .children()
             .into_iter()
@@ -258,7 +208,7 @@ struct PlanWithCorrespondingCoalescePartitions {
 }
 
 impl PlanWithCorrespondingCoalescePartitions {
-    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+    fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
         let length = plan.children().len();
         PlanWithCorrespondingCoalescePartitions {
             plan,
@@ -266,7 +216,7 @@ impl PlanWithCorrespondingCoalescePartitions {
         }
     }
 
-    pub fn new_from_children_nodes(
+    fn new_from_children_nodes(
         children_nodes: Vec<PlanWithCorrespondingCoalescePartitions>,
         parent_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Self> {
@@ -317,7 +267,7 @@ impl PlanWithCorrespondingCoalescePartitions {
         })
     }
 
-    pub fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
+    fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
         self.plan
             .children()
             .into_iter()
@@ -980,21 +930,6 @@ fn check_alignment(
     })
 }
 
-// Get output (un)boundedness information for the given `plan`.
-pub(crate) fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
-    let result = if plan.children().is_empty() {
-        plan.unbounded_output(&[])
-    } else {
-        let children_unbounded_output = plan
-            .children()
-            .iter()
-            .map(unbounded_output)
-            .collect::<Vec<_>>();
-        plan.unbounded_output(&children_unbounded_output)
-    };
-    result.unwrap_or(true)
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1022,6 +957,7 @@ mod tests {
     use datafusion_physical_expr::PhysicalSortExpr;
 
     use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
+    use crate::physical_optimizer::utils::get_plan_string;
     use std::sync::Arc;
 
     fn create_test_schema() -> Result<SchemaRef> {
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 59ded45f7b..b406a54105 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -20,8 +20,9 @@
 //! performance or to accommodate unbounded streams by fixing the pipeline.
 
 use crate::error::Result;
-use crate::physical_optimizer::enforce_sorting::{unbounded_output, ExecTree};
-use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort};
+use crate::physical_optimizer::utils::{
+    is_coalesce_partitions, is_sort, unbounded_output, ExecTree,
+};
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
@@ -880,7 +881,7 @@ mod tests {
 
     // creates a csv exec source for the test purposes
     // projection and has_header parameters are given static due to testing needs
-    pub fn csv_exec_sorted(
+    fn csv_exec_sorted(
         schema: &SchemaRef,
         sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
         infinite_source: bool,
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index 5d150ae60f..b4dd75e586 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -17,8 +17,11 @@
 
 //! Collection of utility functions that are leveraged by the query optimizer rules
 
+use itertools::concat;
 use std::borrow::Borrow;
 use std::collections::HashSet;
+use std::fmt;
+use std::fmt::Formatter;
 use std::sync::Arc;
 
 use crate::error::Result;
@@ -35,6 +38,69 @@ use datafusion_common::DataFusionError;
 use datafusion_physical_expr::utils::ordering_satisfy;
 use datafusion_physical_expr::PhysicalSortExpr;
 
+/// A tree that optimizer rules use to track paths from a plan node down to
+/// operators of interest, e.g. [`SortExec`]s or distribution-changing operators.
+#[derive(Debug, Clone)]
+pub(crate) struct ExecTree {
+    /// The [`ExecutionPlan`] at this node of the tree
+    pub plan: Arc<dyn ExecutionPlan>,
+    /// Index of `plan` among the children of its parent plan
+    pub idx: usize,
+    /// Children of the plan that would need updating if we remove leaf executors
+    pub children: Vec<ExecTree>,
+}
+
+impl fmt::Display for ExecTree {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        let plan_string = get_plan_string(&self.plan);
+        write!(f, "\nidx: {:?}", self.idx)?;
+        write!(f, "\nplan: {:?}", plan_string)?;
+        for child in self.children.iter() {
+            write!(f, "\nexec_tree:{}", child)?; // subtrees render via this same impl
+        }
+        writeln!(f)
+    }
+}
+
+impl ExecTree {
+    /// Creates a new tree node from a plan, its child index, and its subtrees.
+    pub fn new(
+        plan: Arc<dyn ExecutionPlan>,
+        idx: usize,
+        children: Vec<ExecTree>,
+    ) -> Self {
+        ExecTree {
+            plan,
+            idx,
+            children,
+        }
+    }
+
+    /// Returns the plans at the leaves of this tree, in left-to-right order.
+    pub fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        if self.children.is_empty() {
+            vec![self.plan.clone()]
+        } else {
+            concat(self.children.iter().map(|e| e.get_leaves()))
+        }
+    }
+}
+
+/// Returns whether `plan` has unbounded output, computed bottom-up from its children.
+pub(crate) fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    let result = if plan.children().is_empty() {
+        plan.unbounded_output(&[])
+    } else {
+        let children_unbounded_output = plan
+            .children()
+            .iter()
+            .map(unbounded_output)
+            .collect::<Vec<_>>();
+        plan.unbounded_output(&children_unbounded_output)
+    };
+    result.unwrap_or(true) // if `plan` reports an error, assume unbounded output
+}
+
 /// This utility function adds a `SortExec` above an operator according to the
 /// given ordering requirements while preserving the original partitioning.
 pub fn add_sort_above(