You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/09/13 16:02:49 UTC
[arrow-datafusion] branch main updated: Move common code to utils (#7545)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 11b72d6765 Move common code to utils (#7545)
11b72d6765 is described below
commit 11b72d6765a81fc8ab400f4dcc8d709037b23f6f
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Wed Sep 13 19:02:43 2023 +0300
Move common code to utils (#7545)
---
.../src/physical_optimizer/enforce_distribution.rs | 13 ++--
.../core/src/physical_optimizer/enforce_sorting.rs | 86 +++-------------------
.../replace_with_order_preserving_variants.rs | 7 +-
datafusion/core/src/physical_optimizer/utils.rs | 66 +++++++++++++++++
4 files changed, 88 insertions(+), 84 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index a0a9bc32e9..3bfe9ae3df 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -28,8 +28,9 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{CsvExec, ParquetExec};
use crate::error::{DataFusionError, Result};
-use crate::physical_optimizer::enforce_sorting::{unbounded_output, ExecTree};
-use crate::physical_optimizer::utils::{add_sort_above, get_plan_string};
+use crate::physical_optimizer::utils::{
+ add_sort_above, get_plan_string, unbounded_output, ExecTree,
+};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -1446,7 +1447,7 @@ fn ensure_distribution(
/// we can optimize distribution of the plan if/when necessary.
#[derive(Debug, Clone)]
struct DistributionContext {
- pub(crate) plan: Arc<dyn ExecutionPlan>,
+ plan: Arc<dyn ExecutionPlan>,
/// Keep track of associations for each child of the plan. If `None`,
/// there is no distribution changing operator in its descendants.
distribution_onwards: Vec<Option<ExecTree>>,
@@ -1602,7 +1603,7 @@ struct JoinKeyPairs {
#[derive(Debug, Clone)]
struct PlanWithKeyRequirements {
- pub plan: Arc<dyn ExecutionPlan>,
+ plan: Arc<dyn ExecutionPlan>,
/// Parent required key ordering
required_key_ordering: Vec<Arc<dyn PhysicalExpr>>,
/// The request key ordering to children
@@ -1610,7 +1611,7 @@ struct PlanWithKeyRequirements {
}
impl PlanWithKeyRequirements {
- pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let children_len = plan.children().len();
PlanWithKeyRequirements {
plan,
@@ -1619,7 +1620,7 @@ impl PlanWithKeyRequirements {
}
}
- pub fn children(&self) -> Vec<PlanWithKeyRequirements> {
+ fn children(&self) -> Vec<PlanWithKeyRequirements> {
let plan_children = self.plan.children();
assert_eq!(plan_children.len(), self.request_key_ordering.len());
plan_children
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index a976105d36..b6f2adac1b 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -34,8 +34,6 @@
//! in the physical plan. The first sort is unnecessary since its result is overwritten
//! by another [`SortExec`]. Therefore, this rule removes it from the physical plan.
-use std::fmt;
-use std::fmt::Formatter;
use std::sync::Arc;
use crate::config::ConfigOptions;
@@ -45,9 +43,9 @@ use crate::physical_optimizer::replace_with_order_preserving_variants::{
};
use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown};
use crate::physical_optimizer::utils::{
- add_sort_above, find_indices, get_plan_string, is_coalesce_partitions, is_limit,
- is_repartition, is_sort, is_sort_preserving_merge, is_sorted, is_union, is_window,
- merge_and_order_indices, set_difference,
+ add_sort_above, find_indices, is_coalesce_partitions, is_limit, is_repartition,
+ is_sort, is_sort_preserving_merge, is_sorted, is_union, is_window,
+ merge_and_order_indices, set_difference, unbounded_output, ExecTree,
};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -68,7 +66,7 @@ use datafusion_physical_expr::utils::{
};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
-use itertools::{concat, izip, Itertools};
+use itertools::{izip, Itertools};
/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
/// ones it can prove unnecessary.
@@ -82,54 +80,6 @@ impl EnforceSorting {
}
}
-/// This object implements a tree that we use while keeping track of paths
-/// leading to [`SortExec`]s.
-#[derive(Debug, Clone)]
-pub(crate) struct ExecTree {
- /// The `ExecutionPlan` associated with this node
- pub plan: Arc<dyn ExecutionPlan>,
- /// Child index of the plan in its parent
- pub idx: usize,
- /// Children of the plan that would need updating if we remove leaf executors
- pub children: Vec<ExecTree>,
-}
-
-impl fmt::Display for ExecTree {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- let plan_string = get_plan_string(&self.plan);
- write!(f, "\nidx: {:?}", self.idx)?;
- write!(f, "\nplan: {:?}", plan_string)?;
- for child in self.children.iter() {
- write!(f, "\nexec_tree:{}", child)?;
- }
- writeln!(f)
- }
-}
-
-impl ExecTree {
- /// Create new Exec tree
- pub fn new(
- plan: Arc<dyn ExecutionPlan>,
- idx: usize,
- children: Vec<ExecTree>,
- ) -> Self {
- ExecTree {
- plan,
- idx,
- children,
- }
- }
-
- /// This function returns the executors at the leaves of the tree.
- fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- if self.children.is_empty() {
- vec![self.plan.clone()]
- } else {
- concat(self.children.iter().map(|e| e.get_leaves()))
- }
- }
-}
-
/// This object is used within the [`EnforceSorting`] rule to track the closest
/// [`SortExec`] descendant(s) for every child of a plan.
#[derive(Debug, Clone)]
@@ -143,7 +93,7 @@ struct PlanWithCorrespondingSort {
}
impl PlanWithCorrespondingSort {
- pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let length = plan.children().len();
PlanWithCorrespondingSort {
plan,
@@ -151,7 +101,7 @@ impl PlanWithCorrespondingSort {
}
}
- pub fn new_from_children_nodes(
+ fn new_from_children_nodes(
children_nodes: Vec<PlanWithCorrespondingSort>,
parent_plan: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
@@ -201,7 +151,7 @@ impl PlanWithCorrespondingSort {
Ok(PlanWithCorrespondingSort { plan, sort_onwards })
}
- pub fn children(&self) -> Vec<PlanWithCorrespondingSort> {
+ fn children(&self) -> Vec<PlanWithCorrespondingSort> {
self.plan
.children()
.into_iter()
@@ -258,7 +208,7 @@ struct PlanWithCorrespondingCoalescePartitions {
}
impl PlanWithCorrespondingCoalescePartitions {
- pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let length = plan.children().len();
PlanWithCorrespondingCoalescePartitions {
plan,
@@ -266,7 +216,7 @@ impl PlanWithCorrespondingCoalescePartitions {
}
}
- pub fn new_from_children_nodes(
+ fn new_from_children_nodes(
children_nodes: Vec<PlanWithCorrespondingCoalescePartitions>,
parent_plan: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
@@ -317,7 +267,7 @@ impl PlanWithCorrespondingCoalescePartitions {
})
}
- pub fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
+ fn children(&self) -> Vec<PlanWithCorrespondingCoalescePartitions> {
self.plan
.children()
.into_iter()
@@ -980,21 +930,6 @@ fn check_alignment(
})
}
-// Get output (un)boundedness information for the given `plan`.
-pub(crate) fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
- let result = if plan.children().is_empty() {
- plan.unbounded_output(&[])
- } else {
- let children_unbounded_output = plan
- .children()
- .iter()
- .map(unbounded_output)
- .collect::<Vec<_>>();
- plan.unbounded_output(&children_unbounded_output)
- };
- result.unwrap_or(true)
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -1022,6 +957,7 @@ mod tests {
use datafusion_physical_expr::PhysicalSortExpr;
use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
+ use crate::physical_optimizer::utils::get_plan_string;
use std::sync::Arc;
fn create_test_schema() -> Result<SchemaRef> {
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 59ded45f7b..b406a54105 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -20,8 +20,9 @@
//! performance or to accommodate unbounded streams by fixing the pipeline.
use crate::error::Result;
-use crate::physical_optimizer::enforce_sorting::{unbounded_output, ExecTree};
-use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort};
+use crate::physical_optimizer::utils::{
+ is_coalesce_partitions, is_sort, unbounded_output, ExecTree,
+};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
@@ -880,7 +881,7 @@ mod tests {
// creates a csv exec source for the test purposes
// projection and has_header parameters are given static due to testing needs
- pub fn csv_exec_sorted(
+ fn csv_exec_sorted(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
infinite_source: bool,
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index 5d150ae60f..b4dd75e586 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -17,8 +17,11 @@
//! Collection of utility functions that are leveraged by the query optimizer rules
+use itertools::concat;
use std::borrow::Borrow;
use std::collections::HashSet;
+use std::fmt;
+use std::fmt::Formatter;
use std::sync::Arc;
use crate::error::Result;
@@ -35,6 +38,69 @@ use datafusion_common::DataFusionError;
use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
+/// This object implements a tree that we use while keeping track of paths
+/// leading to [`SortExec`]s.
+#[derive(Debug, Clone)]
+pub(crate) struct ExecTree {
+ /// The `ExecutionPlan` associated with this node
+ pub plan: Arc<dyn ExecutionPlan>,
+ /// Child index of the plan in its parent
+ pub idx: usize,
+ /// Children of the plan that would need updating if we remove leaf executors
+ pub children: Vec<ExecTree>,
+}
+
+impl fmt::Display for ExecTree {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ let plan_string = get_plan_string(&self.plan);
+ write!(f, "\nidx: {:?}", self.idx)?;
+ write!(f, "\nplan: {:?}", plan_string)?;
+ for child in self.children.iter() {
+ write!(f, "\nexec_tree:{}", child)?;
+ }
+ writeln!(f)
+ }
+}
+
+impl ExecTree {
+ /// Create new Exec tree
+ pub fn new(
+ plan: Arc<dyn ExecutionPlan>,
+ idx: usize,
+ children: Vec<ExecTree>,
+ ) -> Self {
+ ExecTree {
+ plan,
+ idx,
+ children,
+ }
+ }
+
+ /// This function returns the executors at the leaves of the tree.
+ pub fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ if self.children.is_empty() {
+ vec![self.plan.clone()]
+ } else {
+ concat(self.children.iter().map(|e| e.get_leaves()))
+ }
+ }
+}
+
+// Get output (un)boundedness information for the given `plan`.
+pub(crate) fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
+ let result = if plan.children().is_empty() {
+ plan.unbounded_output(&[])
+ } else {
+ let children_unbounded_output = plan
+ .children()
+ .iter()
+ .map(unbounded_output)
+ .collect::<Vec<_>>();
+ plan.unbounded_output(&children_unbounded_output)
+ };
+ result.unwrap_or(true)
+}
+
/// This utility function adds a `SortExec` above an operator according to the
/// given ordering requirements while preserving the original partitioning.
pub fn add_sort_above(