You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/14 12:05:37 UTC

[arrow-datafusion] branch master updated: [FOLLOWUP] Enforcement Rule: resolve review comments, refactor adjust_input_keys_ordering() (#4184)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d72b4f04c [FOLLOWUP] Enforcement Rule: resolve review comments, refactor adjust_input_keys_ordering() (#4184)
d72b4f04c is described below

commit d72b4f04cf12424a54b22bf376b87065d5fcb6aa
Author: mingmwang <mi...@ebay.com>
AuthorDate: Mon Nov 14 20:05:30 2022 +0800

    [FOLLOWUP] Enforcement Rule: resolve review comments, refactor adjust_input_keys_ordering() (#4184)
    
    * [FOLLOWUP] Enforcement Rule: resolve review comments, refactor adjust_input_keys_ordering()
    
    * fix typo
    
    * remove allow(clippy::vtable_address_comparisons)
---
 datafusion/core/src/execution/context.rs           |   3 +-
 .../core/src/physical_optimizer/enforcement.rs     | 737 ++++++++++++---------
 .../core/src/physical_optimizer/merge_exec.rs      |  75 ---
 datafusion/core/src/physical_optimizer/mod.rs      |   1 -
 datafusion/core/src/physical_plan/mod.rs           |  83 ++-
 datafusion/core/src/physical_plan/rewrite.rs       |   5 +-
 datafusion/physical-expr/src/equivalence.rs        |  14 +
 datafusion/physical-expr/src/physical_expr.rs      |   5 +-
 datafusion/physical-expr/src/rewrite.rs            |   5 +-
 9 files changed, 542 insertions(+), 386 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index e0d2e0611..ce88b9f20 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1589,8 +1589,9 @@ impl SessionState {
             )));
         }
         physical_optimizers.push(Arc::new(Repartition::new()));
+        // Repartition rule could introduce additional RepartitionExec with RoundRobin partitioning.
+        // To make sure the SinglePartition requirement is still satisfied, run BasicEnforcement again (originally AddCoalescePartitionsExec was run here).
         physical_optimizers.push(Arc::new(BasicEnforcement::new()));
-        // physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
 
         SessionState {
             session_id,
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 1eaf153a9..21931397f 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -29,24 +29,27 @@ use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::rewrite::TreeNodeRewritable;
 use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort::SortOptions;
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
 use crate::prelude::SessionConfig;
+use arrow::datatypes::SchemaRef;
 use datafusion_expr::logical_plan::JoinType;
 use datafusion_physical_expr::equivalence::EquivalenceProperties;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::expressions::NoOp;
 use datafusion_physical_expr::{
     expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
-    normalize_sort_expr_with_equivalence_properties, PhysicalExpr, PhysicalSortExpr,
+    normalize_sort_expr_with_equivalence_properties, AggregateExpr, PhysicalExpr,
+    PhysicalSortExpr,
 };
 use std::collections::HashMap;
 use std::sync::Arc;
 
 /// BasicEnforcement rule, it ensures the Distribution and Ordering requirements are met
 /// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree
-/// and give a non-optimal plan, but it can avoid the possible data skew in joins
+/// and give a non-optimal plan, but it can avoid the possible data skew in joins.
 ///
 /// For example for a HashJoin with keys(a, b, c), the required Distribution(a, b, c) can be satisfied by
 /// several alternative partitioning ways: [(a, b, c), (a, b), (a, c), (b, c), (a), (b), (c), ( )].
@@ -72,7 +75,10 @@ impl PhysicalOptimizerRule for BasicEnforcement {
         let top_down_join_key_reordering = config.top_down_join_key_reordering;
         let new_plan = if top_down_join_key_reordering {
             // Run a top-down process to adjust input key ordering recursively
-            adjust_input_keys_down_recursively(plan, vec![])?
+            let plan_requirements = PlanWithKeyRequirements::new(plan);
+            let adjusted =
+                plan_requirements.transform_down(&adjust_input_keys_ordering)?;
+            adjusted.plan
         } else {
             plan
         };
@@ -99,13 +105,51 @@ impl PhysicalOptimizerRule for BasicEnforcement {
 
 /// When the physical planner creates the Joins, the ordering of join keys is from the original query.
 /// That might not match with the output partitioning of the join node's children
-/// This method runs a top-down process and try to adjust the output partitioning of the children
-/// if the children themselves are Joins or Aggregations.
-fn adjust_input_keys_down_recursively(
-    plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
-    parent_required: Vec<Arc<dyn PhysicalExpr>>,
-) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
-    let plan_any = plan.as_any();
+/// A Top-Down process will use this method to adjust children's output partitioning based on the parent key reordering requirements:
+///
+/// Example:
+///     TopJoin on (a, b, c)
+///         bottom left join on(b, a, c)
+///         bottom right join on(c, b, a)
+///
+///  Will be adjusted to:
+///     TopJoin on (a, b, c)
+///         bottom left join on(a, b, c)
+///         bottom right join on(a, b, c)
+///
+/// Example:
+///     TopJoin on (a, b, c)
+///         Agg1 group by (b, a, c)
+///         Agg2 group by (c, b, a)
+///
+/// Will be adjusted to:
+///     TopJoin on (a, b, c)
+///          Projection(b, a, c)
+///             Agg1 group by (a, b, c)
+///          Projection(c, b, a)
+///             Agg2 group by (a, b, c)
+///
+/// Following is the explanation of the reordering process:
+///
+/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering:
+///    If the requirements cannot be satisfied: clear the current requirements, generate new requirements (to push down) based on the current join keys, and return the unchanged plan.
+///    If the requirements are already satisfied: clear the current requirements, generate new requirements (to push down) based on the current join keys, and return the unchanged plan.
+///    If the requirements can be satisfied by adjusting the key ordering: clear the current requirements, generate new requirements (to push down) based on the adjusted join keys, and return the changed plan.
+///
+/// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering:
+///    If the requirements cannot be satisfied: clear all the requirements and return the unchanged plan.
+///    If the requirements are already satisfied: clear all the requirements and return the unchanged plan.
+///    If the requirements can be satisfied by adjusting the key ordering: clear all the requirements and return the changed plan.
+///
+/// 3) If the current plan is RepartitionExec, CoalescePartitionsExec or WindowAggExec, clear all the requirements, return the unchanged plan
+/// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
+/// 5) For other types of operators, by default, pushdown the parent requirements to children.
+///
+fn adjust_input_keys_ordering(
+    requirements: PlanWithKeyRequirements,
+) -> Option<PlanWithKeyRequirements> {
+    let parent_required = requirements.required_key_ordering.clone();
+    let plan_any = requirements.plan.as_any();
     if let Some(HashJoinExec {
         left,
         right,
@@ -119,99 +163,65 @@ fn adjust_input_keys_down_recursively(
     {
         match mode {
             PartitionMode::Partitioned => {
-                let join_key_pairs = extract_join_keys(on);
-                if let Some((
-                    JoinKeyPairs {
-                        left_keys,
-                        right_keys,
-                    },
-                    new_positions,
-                )) = try_reorder(
-                    join_key_pairs.clone(),
-                    parent_required,
-                    &plan.equivalence_properties(),
-                ) {
-                    let new_join_on = if !new_positions.is_empty() {
-                        new_join_conditions(&left_keys, &right_keys)
-                    } else {
-                        on.clone()
+                let join_constructor =
+                    |new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
+                        Arc::new(
+                            HashJoinExec::try_new(
+                                left.clone(),
+                                right.clone(),
+                                new_conditions.0,
+                                filter.clone(),
+                                join_type,
+                                PartitionMode::Partitioned,
+                                null_equals_null,
+                            )
+                            .unwrap(),
+                        ) as Arc<dyn ExecutionPlan>
                     };
-                    let new_left =
-                        adjust_input_keys_down_recursively(left.clone(), left_keys)?;
-                    let new_right =
-                        adjust_input_keys_down_recursively(right.clone(), right_keys)?;
-                    Ok(Arc::new(HashJoinExec::try_new(
-                        new_left,
-                        new_right,
-                        new_join_on,
-                        filter.clone(),
-                        join_type,
-                        PartitionMode::Partitioned,
-                        null_equals_null,
-                    )?))
-                } else {
-                    let new_left = adjust_input_keys_down_recursively(
-                        left.clone(),
-                        join_key_pairs.left_keys,
-                    )?;
-                    let new_right = adjust_input_keys_down_recursively(
-                        right.clone(),
-                        join_key_pairs.right_keys,
-                    )?;
-                    Ok(Arc::new(HashJoinExec::try_new(
-                        new_left,
-                        new_right,
-                        on.clone(),
-                        filter.clone(),
-                        join_type,
-                        PartitionMode::Partitioned,
-                        null_equals_null,
-                    )?))
-                }
+                Some(reorder_partitioned_join_keys(
+                    requirements.plan.clone(),
+                    &parent_required,
+                    on,
+                    vec![],
+                    &join_constructor,
+                ))
             }
             PartitionMode::CollectLeft => {
-                let new_left = adjust_input_keys_down_recursively(left.clone(), vec![])?;
-                let new_right = match join_type {
-                    JoinType::Inner | JoinType::Right => try_push_required_to_right(
-                        parent_required,
-                        right.clone(),
+                let new_right_request = match join_type {
+                    JoinType::Inner | JoinType::Right => shift_right_required(
+                        &parent_required,
                         left.schema().fields().len(),
-                    )?,
+                    ),
                     JoinType::RightSemi | JoinType::RightAnti => {
-                        adjust_input_keys_down_recursively(
-                            right.clone(),
-                            parent_required.clone(),
-                        )?
+                        Some(parent_required.clone())
                     }
                     JoinType::Left
                     | JoinType::LeftSemi
                     | JoinType::LeftAnti
-                    | JoinType::Full => {
-                        adjust_input_keys_down_recursively(right.clone(), vec![])?
-                    }
+                    | JoinType::Full => None,
                 };
 
-                Ok(Arc::new(HashJoinExec::try_new(
-                    new_left,
-                    new_right,
-                    on.clone(),
-                    filter.clone(),
-                    join_type,
-                    PartitionMode::CollectLeft,
-                    null_equals_null,
-                )?))
+                // Push down requirements to the right side
+                Some(PlanWithKeyRequirements {
+                    plan: requirements.plan.clone(),
+                    required_key_ordering: vec![],
+                    request_key_ordering: vec![None, new_right_request],
+                })
             }
         }
-    } else if let Some(CrossJoinExec { left, right, .. }) =
+    } else if let Some(CrossJoinExec { left, .. }) =
         plan_any.downcast_ref::<CrossJoinExec>()
     {
-        let new_left = adjust_input_keys_down_recursively(left.clone(), vec![])?;
-        let new_right = try_push_required_to_right(
-            parent_required,
-            right.clone(),
-            left.schema().fields().len(),
-        )?;
-        Ok(Arc::new(CrossJoinExec::try_new(new_left, new_right)?))
+        let left_columns_len = left.schema().fields().len();
+        // Push down requirements to the right side
+        Some(PlanWithKeyRequirements {
+            plan: requirements.plan.clone(),
+            required_key_ordering: vec![],
+            request_key_ordering: vec![
+                None,
+                shift_right_required(&parent_required, left_columns_len),
+            ],
+        })
     } else if let Some(SortMergeJoinExec {
         left,
         right,
@@ -222,63 +232,27 @@ fn adjust_input_keys_down_recursively(
         ..
     }) = plan_any.downcast_ref::<SortMergeJoinExec>()
     {
-        let join_key_pairs = extract_join_keys(on);
-        if let Some((
-            JoinKeyPairs {
-                left_keys,
-                right_keys,
-            },
-            new_positions,
-        )) = try_reorder(
-            join_key_pairs.clone(),
-            parent_required,
-            &plan.equivalence_properties(),
-        ) {
-            let new_join_on = if !new_positions.is_empty() {
-                new_join_conditions(&left_keys, &right_keys)
-            } else {
-                on.clone()
-            };
-            let new_options = if !new_positions.is_empty() {
-                let mut new_sort_options = vec![];
-                for idx in 0..sort_options.len() {
-                    new_sort_options.push(sort_options[new_positions[idx]])
-                }
-                new_sort_options
-            } else {
-                sort_options.clone()
+        let join_constructor =
+            |new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
+                Arc::new(
+                    SortMergeJoinExec::try_new(
+                        left.clone(),
+                        right.clone(),
+                        new_conditions.0,
+                        *join_type,
+                        new_conditions.1,
+                        *null_equals_null,
+                    )
+                    .unwrap(),
+                ) as Arc<dyn ExecutionPlan>
             };
-
-            let new_left = adjust_input_keys_down_recursively(left.clone(), left_keys)?;
-            let new_right =
-                adjust_input_keys_down_recursively(right.clone(), right_keys)?;
-
-            Ok(Arc::new(SortMergeJoinExec::try_new(
-                new_left,
-                new_right,
-                new_join_on,
-                *join_type,
-                new_options,
-                *null_equals_null,
-            )?))
-        } else {
-            let new_left = adjust_input_keys_down_recursively(
-                left.clone(),
-                join_key_pairs.left_keys,
-            )?;
-            let new_right = adjust_input_keys_down_recursively(
-                right.clone(),
-                join_key_pairs.right_keys,
-            )?;
-            Ok(Arc::new(SortMergeJoinExec::try_new(
-                new_left,
-                new_right,
-                on.clone(),
-                *join_type,
-                sort_options.clone(),
-                *null_equals_null,
-            )?))
-        }
+        Some(reorder_partitioned_join_keys(
+            requirements.plan.clone(),
+            &parent_required,
+            on,
+            sort_options.clone(),
+            &join_constructor,
+        ))
     } else if let Some(AggregateExec {
         mode,
         group_by,
@@ -288,173 +262,255 @@ fn adjust_input_keys_down_recursively(
         ..
     }) = plan_any.downcast_ref::<AggregateExec>()
     {
-        if parent_required.is_empty() {
-            plan.map_children(|plan| adjust_input_keys_down_recursively(plan, vec![]))
-        } else {
+        if !parent_required.is_empty() {
             match mode {
-                AggregateMode::Final => plan.map_children(|plan| {
-                    adjust_input_keys_down_recursively(plan, vec![])
-                }),
-                AggregateMode::FinalPartitioned | AggregateMode::Partial => {
-                    let out_put_columns = group_by
-                        .expr()
-                        .iter()
-                        .enumerate()
-                        .map(|(index, (_col, name))| Column::new(name, index))
-                        .collect::<Vec<_>>();
-
-                    let out_put_exprs = out_put_columns
-                        .iter()
-                        .map(|c| Arc::new(c.clone()) as Arc<dyn PhysicalExpr>)
-                        .collect::<Vec<_>>();
-
-                    // Check whether the requirements can be satisfied by the Aggregation
-                    if parent_required.len() != out_put_exprs.len()
-                        || expr_list_eq_strict_order(&out_put_exprs, &parent_required)
-                        || !group_by.null_expr().is_empty()
-                    {
-                        plan.map_children(|plan| {
-                            adjust_input_keys_down_recursively(plan, vec![])
-                        })
-                    } else {
-                        let new_positions =
-                            expected_expr_positions(&out_put_exprs, &parent_required);
-                        match new_positions {
-                            Some(positions) => {
-                                let mut new_group_exprs = vec![];
-                                for idx in positions.into_iter() {
-                                    new_group_exprs.push(group_by.expr()[idx].clone());
-                                }
-                                let new_group_by =
-                                    PhysicalGroupBy::new_single(new_group_exprs);
-                                match mode {
-                                    AggregateMode::FinalPartitioned => {
-                                        // Since the input of FinalPartitioned should be the Partial AggregateExec and they should
-                                        // share the same column order, it's safe to call adjust_input_keys_down_recursively here
-                                        let new_input =
-                                            adjust_input_keys_down_recursively(
-                                                input.clone(),
-                                                parent_required,
-                                            )?;
-                                        let new_agg = Arc::new(AggregateExec::try_new(
-                                            AggregateMode::FinalPartitioned,
-                                            new_group_by,
-                                            aggr_expr.clone(),
-                                            new_input,
-                                            input_schema.clone(),
-                                        )?);
-
-                                        // Need to create a new projection to change the expr ordering back
-                                        let mut proj_exprs = out_put_columns
-                                            .iter()
-                                            .map(|col| {
-                                                (
-                                                    Arc::new(Column::new(
-                                                        col.name(),
-                                                        new_agg
-                                                            .schema()
-                                                            .index_of(col.name())
-                                                            .unwrap(),
-                                                    ))
-                                                        as Arc<dyn PhysicalExpr>,
-                                                    col.name().to_owned(),
-                                                )
-                                            })
-                                            .collect::<Vec<_>>();
-                                        let agg_schema = new_agg.schema();
-                                        let agg_fields = agg_schema.fields();
-                                        for (idx, field) in agg_fields
-                                            .iter()
-                                            .enumerate()
-                                            .skip(out_put_columns.len())
-                                        {
-                                            proj_exprs.push((
-                                                Arc::new(Column::new(
-                                                    field.name().as_str(),
-                                                    idx,
-                                                ))
-                                                    as Arc<dyn PhysicalExpr>,
-                                                field.name().clone(),
-                                            ))
-                                        }
-                                        // TODO merge adjacent Projections if there are
-                                        Ok(Arc::new(ProjectionExec::try_new(
-                                            proj_exprs, new_agg,
-                                        )?))
-                                    }
-                                    AggregateMode::Partial => {
-                                        let new_input =
-                                            adjust_input_keys_down_recursively(
-                                                input.clone(),
-                                                vec![],
-                                            )?;
-                                        Ok(Arc::new(AggregateExec::try_new(
-                                            AggregateMode::Partial,
-                                            new_group_by,
-                                            aggr_expr.clone(),
-                                            new_input,
-                                            input_schema.clone(),
-                                        )?))
-                                    }
-                                    _ => Ok(plan),
-                                }
-                            }
-                            None => plan.map_children(|plan| {
-                                adjust_input_keys_down_recursively(plan, vec![])
-                            }),
-                        }
-                    }
-                }
+                AggregateMode::FinalPartitioned => Some(reorder_aggregate_keys(
+                    requirements.plan.clone(),
+                    &parent_required,
+                    group_by,
+                    aggr_expr,
+                    input.clone(),
+                    input_schema,
+                )),
+                _ => Some(PlanWithKeyRequirements::new(requirements.plan.clone())),
             }
+        } else {
+            // Keep everything unchanged
+            None
         }
     } else if let Some(ProjectionExec { expr, .. }) =
         plan_any.downcast_ref::<ProjectionExec>()
     {
-        // For Projection, we need to transform the columns to the columns before the Projection
+        // For Projection, we need to transform the requirements to the columns before the Projection
         // And then to push down the requirements
         // Construct a mapping from new name to the the orginal Column
-        let mut column_mapping = HashMap::new();
-        for (expression, name) in expr.iter() {
-            if let Some(column) = expression.as_any().downcast_ref::<Column>() {
-                column_mapping.insert(name.clone(), column.clone());
-            };
-        }
-        let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
-            .iter()
-            .filter_map(|r| {
-                if let Some(column) = r.as_any().downcast_ref::<Column>() {
-                    column_mapping.get(column.name())
-                } else {
-                    None
-                }
-            })
-            .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>)
-            .collect::<Vec<_>>();
+        let new_required = map_columns_before_projection(&parent_required, expr);
         if new_required.len() == parent_required.len() {
-            plan.map_children(|plan| {
-                adjust_input_keys_down_recursively(plan, new_required.clone())
+            Some(PlanWithKeyRequirements {
+                plan: requirements.plan.clone(),
+                required_key_ordering: vec![],
+                request_key_ordering: vec![Some(new_required.clone())],
             })
         } else {
-            plan.map_children(|plan| adjust_input_keys_down_recursively(plan, vec![]))
+            // Can not satisfy, clear the current requirements and generate new empty requirements
+            Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
         }
     } else if plan_any.downcast_ref::<RepartitionExec>().is_some()
         || plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
         || plan_any.downcast_ref::<WindowAggExec>().is_some()
     {
-        plan.map_children(|plan| adjust_input_keys_down_recursively(plan, vec![]))
+        Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
     } else {
-        plan.map_children(|plan| {
-            adjust_input_keys_down_recursively(plan, parent_required.clone())
+        // By default, push down the parent requirements to children
+        let children_len = requirements.plan.children().len();
+        Some(PlanWithKeyRequirements {
+            plan: requirements.plan.clone(),
+            required_key_ordering: vec![],
+            request_key_ordering: vec![Some(parent_required.clone()); children_len],
         })
     }
 }
 
-fn try_push_required_to_right(
-    parent_required: Vec<Arc<dyn PhysicalExpr>>,
-    right: Arc<dyn ExecutionPlan>,
-    left_columns_len: usize,
-) -> Result<Arc<dyn ExecutionPlan>> {
+fn reorder_partitioned_join_keys<F>(
+    join_plan: Arc<dyn ExecutionPlan>,
+    parent_required: &[Arc<dyn PhysicalExpr>],
+    on: &[(Column, Column)],
+    sort_options: Vec<SortOptions>,
+    join_constructor: &F,
+) -> PlanWithKeyRequirements
+where
+    F: Fn((Vec<(Column, Column)>, Vec<SortOptions>)) -> Arc<dyn ExecutionPlan>,
+{
+    let join_key_pairs = extract_join_keys(on);
+    if let Some((
+        JoinKeyPairs {
+            left_keys,
+            right_keys,
+        },
+        new_positions,
+    )) = try_reorder(
+        join_key_pairs.clone(),
+        parent_required,
+        &join_plan.equivalence_properties(),
+    ) {
+        if !new_positions.is_empty() {
+            let new_join_on = new_join_conditions(&left_keys, &right_keys);
+            let mut new_sort_options: Vec<SortOptions> = vec![];
+            for idx in 0..sort_options.len() {
+                new_sort_options.push(sort_options[new_positions[idx]])
+            }
+
+            PlanWithKeyRequirements {
+                plan: join_constructor((new_join_on, new_sort_options)),
+                required_key_ordering: vec![],
+                request_key_ordering: vec![Some(left_keys), Some(right_keys)],
+            }
+        } else {
+            PlanWithKeyRequirements {
+                plan: join_plan,
+                required_key_ordering: vec![],
+                request_key_ordering: vec![Some(left_keys), Some(right_keys)],
+            }
+        }
+    } else {
+        PlanWithKeyRequirements {
+            plan: join_plan,
+            required_key_ordering: vec![],
+            request_key_ordering: vec![
+                Some(join_key_pairs.left_keys),
+                Some(join_key_pairs.right_keys),
+            ],
+        }
+    }
+}
+
+fn reorder_aggregate_keys(
+    agg_plan: Arc<dyn ExecutionPlan>,
+    parent_required: &[Arc<dyn PhysicalExpr>],
+    group_by: &PhysicalGroupBy,
+    aggr_expr: &[Arc<dyn AggregateExpr>],
+    agg_input: Arc<dyn ExecutionPlan>,
+    input_schema: &SchemaRef,
+) -> PlanWithKeyRequirements {
+    let out_put_columns = group_by
+        .expr()
+        .iter()
+        .enumerate()
+        .map(|(index, (_col, name))| Column::new(name, index))
+        .collect::<Vec<_>>();
+
+    let out_put_exprs = out_put_columns
+        .iter()
+        .map(|c| Arc::new(c.clone()) as Arc<dyn PhysicalExpr>)
+        .collect::<Vec<_>>();
+
+    if parent_required.len() != out_put_exprs.len()
+        || !group_by.null_expr().is_empty()
+        || expr_list_eq_strict_order(&out_put_exprs, parent_required)
+    {
+        PlanWithKeyRequirements::new(agg_plan)
+    } else {
+        let new_positions = expected_expr_positions(&out_put_exprs, parent_required);
+        match new_positions {
+            None => PlanWithKeyRequirements::new(agg_plan),
+            Some(positions) => {
+                let new_partial_agg = if let Some(AggregateExec {
+                    mode,
+                    group_by,
+                    aggr_expr,
+                    input,
+                    input_schema,
+                    ..
+                }) =
+                    agg_input.as_any().downcast_ref::<AggregateExec>()
+                {
+                    if matches!(mode, AggregateMode::Partial) {
+                        let mut new_group_exprs = vec![];
+                        for idx in positions.iter() {
+                            new_group_exprs.push(group_by.expr()[*idx].clone());
+                        }
+                        let new_partial_group_by =
+                            PhysicalGroupBy::new_single(new_group_exprs);
+                        // new Partial AggregateExec
+                        Some(Arc::new(
+                            AggregateExec::try_new(
+                                AggregateMode::Partial,
+                                new_partial_group_by,
+                                aggr_expr.clone(),
+                                input.clone(),
+                                input_schema.clone(),
+                            )
+                            .unwrap(),
+                        ))
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                };
+                if let Some(partial_agg) = new_partial_agg {
+                    let mut new_group_exprs = vec![];
+                    for idx in positions.into_iter() {
+                        new_group_exprs.push(group_by.expr()[idx].clone());
+                    }
+                    let new_group_by = PhysicalGroupBy::new_single(new_group_exprs);
+                    let new_final_agg = Arc::new(
+                        AggregateExec::try_new(
+                            AggregateMode::FinalPartitioned,
+                            new_group_by,
+                            aggr_expr.to_vec(),
+                            partial_agg,
+                            input_schema.clone(),
+                        )
+                        .unwrap(),
+                    );
+
+                    // Need to create a new projection to change the expr ordering back
+                    let mut proj_exprs = out_put_columns
+                        .iter()
+                        .map(|col| {
+                            (
+                                Arc::new(Column::new(
+                                    col.name(),
+                                    new_final_agg.schema().index_of(col.name()).unwrap(),
+                                ))
+                                    as Arc<dyn PhysicalExpr>,
+                                col.name().to_owned(),
+                            )
+                        })
+                        .collect::<Vec<_>>();
+                    let agg_schema = new_final_agg.schema();
+                    let agg_fields = agg_schema.fields();
+                    for (idx, field) in
+                        agg_fields.iter().enumerate().skip(out_put_columns.len())
+                    {
+                        proj_exprs.push((
+                            Arc::new(Column::new(field.name().as_str(), idx))
+                                as Arc<dyn PhysicalExpr>,
+                            field.name().clone(),
+                        ))
+                    }
+                    // TODO: merge adjacent Projections if there are any
+                    PlanWithKeyRequirements::new(Arc::new(
+                        ProjectionExec::try_new(proj_exprs, new_final_agg).unwrap(),
+                    ))
+                } else {
+                    PlanWithKeyRequirements::new(agg_plan)
+                }
+            }
+        }
+    }
+}
+
+fn map_columns_before_projection( // Translates the parent's required columns into the columns the projection reads them from.
+    parent_required: &[Arc<dyn PhysicalExpr>], // expressions required by the parent, named by projection output
+    proj_exprs: &[(Arc<dyn PhysicalExpr>, String)], // (expression, output name) pairs of the projection
+) -> Vec<Arc<dyn PhysicalExpr>> {
+    let mut column_mapping = HashMap::new(); // projection output name -> source `Column`
+    for (expression, name) in proj_exprs.iter() {
+        if let Some(column) = expression.as_any().downcast_ref::<Column>() { // only plain column expressions are recorded
+            column_mapping.insert(name.clone(), column.clone()); // a repeated output name overwrites the earlier entry
+        };
+    }
     let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
+        .iter()
+        .filter_map(|r| { // drops entries that cannot be mapped, so the result may be shorter than `parent_required`
+            if let Some(column) = r.as_any().downcast_ref::<Column>() {
+                column_mapping.get(column.name()) // lookup is by column name only; `None` when the name is not mapped
+            } else {
+                None // the required expression is not a plain column
+            }
+        })
+        .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>)
+        .collect::<Vec<_>>();
+    new_required
+}
+
+fn shift_right_required(
+    parent_required: &[Arc<dyn PhysicalExpr>],
+    left_columns_len: usize,
+) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
+    let new_right_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
         .iter()
         .filter_map(|r| {
             if let Some(col) = r.as_any().downcast_ref::<Column>() {
@@ -473,18 +529,33 @@ fn try_push_required_to_right(
         .collect::<Vec<_>>();
 
     // if the parent required are all comming from the right side, the requirements can be pushdown
-    if new_required.len() == parent_required.len() {
-        adjust_input_keys_down_recursively(right.clone(), new_required)
+    if new_right_required.len() != parent_required.len() {
+        None
     } else {
-        adjust_input_keys_down_recursively(right.clone(), vec![])
+        Some(new_right_required)
     }
 }
 
 /// When the physical planner creates the Joins, the ordering of join keys is from the original query.
 /// That might not match with the output partitioning of the join node's children
 /// This method will try to change the ordering of the join keys to match with the
-/// partitioning of the join nodes' children.
-/// If it can not match with both sides, it will try to match with one, either left side or right side.
+/// partitioning of the join nodes' children. If it can not match with both sides, it will try to
+/// match with one, either the left side or the right side.
+///
+/// Example:
+///     TopJoin on (a, b, c)
+///         bottom left join on(b, a, c)
+///         bottom right join on(c, b, a)
+///
+///  Will be adjusted to:
+///     TopJoin on (b, a, c)
+///         bottom left join on(b, a, c)
+///         bottom right join on(c, b, a)
+///
+/// Compared to the Top-Down reordering process, this Bottom-Up approach is much simpler, but might not reach the best result.
+/// The Bottom-Up approach will be useful in the future if we plan to support storage-partition-wise joins.
+/// In that case, the datasources/tables might be pre-partitioned, so we can't adjust the key ordering of the datasources
+/// and therefore can't apply the Top-Down reordering process.
 fn reorder_join_keys_to_inputs(
     plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
 ) -> Arc<dyn crate::physical_plan::ExecutionPlan> {
@@ -601,7 +672,7 @@ fn reorder_current_join_keys(
 ) -> Option<(JoinKeyPairs, Vec<usize>)> {
     match (left_partition, right_partition.clone()) {
         (Some(Partitioning::Hash(left_exprs, _)), _) => {
-            try_reorder(join_keys.clone(), left_exprs, left_equivalence_properties)
+            try_reorder(join_keys.clone(), &left_exprs, left_equivalence_properties)
                 .or_else(|| {
                     reorder_current_join_keys(
                         join_keys,
@@ -613,7 +684,7 @@ fn reorder_current_join_keys(
                 })
         }
         (_, Some(Partitioning::Hash(right_exprs, _))) => {
-            try_reorder(join_keys, right_exprs, right_equivalence_properties)
+            try_reorder(join_keys, &right_exprs, right_equivalence_properties)
         }
         _ => None,
     }
@@ -621,7 +692,7 @@ fn reorder_current_join_keys(
 
 fn try_reorder(
     join_keys: JoinKeyPairs,
-    expected: Vec<Arc<dyn PhysicalExpr>>,
+    expected: &[Arc<dyn PhysicalExpr>],
     equivalence_properties: &EquivalenceProperties,
 ) -> Option<(JoinKeyPairs, Vec<usize>)> {
     let mut normalized_expected = vec![];
@@ -630,8 +701,8 @@ fn try_reorder(
     if join_keys.left_keys.len() != expected.len() {
         return None;
     }
-    if expr_list_eq_strict_order(&expected, &join_keys.left_keys)
-        || expr_list_eq_strict_order(&expected, &join_keys.right_keys)
+    if expr_list_eq_strict_order(expected, &join_keys.left_keys)
+        || expr_list_eq_strict_order(expected, &join_keys.right_keys)
     {
         return Some((join_keys, vec![]));
     } else if !equivalence_properties.classes().is_empty() {
@@ -677,8 +748,8 @@ fn try_reorder(
         }
     }
 
-    let new_positions = expected_expr_positions(&join_keys.left_keys, &expected)
-        .or_else(|| expected_expr_positions(&join_keys.right_keys, &expected))
+    let new_positions = expected_expr_positions(&join_keys.left_keys, expected)
+        .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
         .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
         .or_else(|| {
             expected_expr_positions(&normalized_right_keys, &normalized_expected)
@@ -885,6 +956,70 @@ struct JoinKeyPairs {
     right_keys: Vec<Arc<dyn PhysicalExpr>>,
 }
 
+#[derive(Debug, Clone)]
+struct PlanWithKeyRequirements { // An execution plan node paired with key-ordering requirements.
+    plan: Arc<dyn ExecutionPlan>, // the plan node these requirements are attached to
+    /// Key ordering required of this node by its parent; empty when the parent requires none
+    required_key_ordering: Vec<Arc<dyn PhysicalExpr>>,
+    /// Key ordering requested from each child of `plan`, one entry per child; `None` means no request
+    request_key_ordering: Vec<Option<Vec<Arc<dyn PhysicalExpr>>>>,
+}
+
+impl PlanWithKeyRequirements {
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self { // Wraps `plan` with no requirement from a parent and no requests to its children.
+        let children_len = plan.children().len();
+        PlanWithKeyRequirements {
+            plan,
+            required_key_ordering: vec![],
+            request_key_ordering: vec![None; children_len], // one `None` slot per child
+        }
+    }
+
+    pub fn children(&self) -> Vec<PlanWithKeyRequirements> { // Wraps each child plan, handing it the ordering this node requests from it.
+        let plan_children = self.plan.children();
+        assert_eq!(plan_children.len(), self.request_key_ordering.len()); // panics if the request list is out of step with the plan's children
+        plan_children
+            .into_iter()
+            .zip(self.request_key_ordering.clone().into_iter())
+            .map(|(child, required)| {
+                let from_parent = required.unwrap_or_default(); // a `None` request becomes an empty requirement
+                let length = child.children().len();
+                PlanWithKeyRequirements {
+                    plan: child,
+                    required_key_ordering: from_parent.clone(),
+                    request_key_ordering: vec![None; length], // the child starts with no requests to its own children
+                }
+            })
+            .collect()
+    }
+}
+
+impl TreeNodeRewritable for PlanWithKeyRequirements {
+    fn map_children<F>(self, transform: F) -> Result<Self> // Applies `transform` to every child wrapper and rebuilds this node from the resulting plans.
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children(); // child wrappers, each carrying the ordering requested for it
+        if !children.is_empty() {
+            let new_children: Result<Vec<_>> =
+                children.into_iter().map(transform).collect(); // collecting into `Result` stops at the first `Err`
+
+            let children_plans = new_children?
+                .into_iter()
+                .map(|child| child.plan) // only the transformed plans are kept; the children's key orderings are discarded
+                .collect::<Vec<_>>();
+            let new_plan = with_new_children_if_necessary(self.plan, children_plans)?; // any error is propagated to the caller
+            Ok(PlanWithKeyRequirements {
+                plan: new_plan,
+                required_key_ordering: self.required_key_ordering, // this node's own requirement is carried over unchanged
+                request_key_ordering: self.request_key_ordering, // as are its requests to its children
+            })
+        } else {
+            Ok(self) // no children: `transform` is never called and the node is returned as is
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::physical_plan::filter::FilterExec;
diff --git a/datafusion/core/src/physical_optimizer/merge_exec.rs b/datafusion/core/src/physical_optimizer/merge_exec.rs
deleted file mode 100644
index 1ed0f6196..000000000
--- a/datafusion/core/src/physical_optimizer/merge_exec.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! AddCoalescePartitionsExec adds CoalescePartitionsExec to plans
-//! with more than one partition, to coalesce them into one partition
-//! when the node needs a single partition
-use super::optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::with_new_children_if_necessary;
-use crate::{
-    error::Result,
-    physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution},
-};
-use std::sync::Arc;
-
-/// Introduces CoalescePartitionsExec
-#[derive(Default)]
-pub struct AddCoalescePartitionsExec {}
-
-impl AddCoalescePartitionsExec {
-    #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
-    fn optimize(
-        &self,
-        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
-        _config: &crate::execution::context::SessionConfig,
-    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
-        if plan.children().is_empty() {
-            // leaf node, children cannot be replaced
-            Ok(plan.clone())
-        } else {
-            let children = plan
-                .children()
-                .iter()
-                .map(|child| self.optimize(child.clone(), _config))
-                .collect::<Result<Vec<_>>>()?;
-            assert_eq!(children.len(), plan.required_input_distribution().len());
-            let new_children = children
-                .into_iter()
-                .zip(plan.required_input_distribution())
-                .map(|(child, dist)| match dist {
-                    Distribution::SinglePartition
-                        if child.output_partitioning().partition_count() > 1 =>
-                    {
-                        Arc::new(CoalescePartitionsExec::new(child.clone()))
-                    }
-                    _ => child,
-                })
-                .collect::<Vec<_>>();
-            with_new_children_if_necessary(plan, new_children)
-        }
-    }
-
-    fn name(&self) -> &str {
-        "add_merge_exec"
-    }
-}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index 5ecb9cd37..5050dae35 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -22,7 +22,6 @@ pub mod aggregate_statistics;
 pub mod coalesce_batches;
 pub mod enforcement;
 pub mod hash_build_probe_order;
-pub mod merge_exec;
 pub mod optimizer;
 pub mod pruning;
 pub mod repartition;
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 0bbb8bcab..7c3ef4c7e 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -252,14 +252,15 @@ pub fn with_new_children_if_necessary(
     plan: Arc<dyn ExecutionPlan>,
     children: Vec<Arc<dyn ExecutionPlan>>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-    if children.len() != plan.children().len() {
+    let old_children = plan.children();
+    if children.len() != old_children.len() {
         Err(DataFusionError::Internal(
             "Wrong number of children".to_string(),
         ))
     } else if children.is_empty()
         || children
             .iter()
-            .zip(plan.children().iter())
+            .zip(old_children.iter())
             .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
     {
         plan.with_new_children(children)
@@ -656,3 +657,81 @@ use crate::execution::context::TaskContext;
 pub use datafusion_physical_expr::{
     expressions, functions, hash_utils, type_coercion, udf,
 };
+
+#[cfg(test)]
+mod tests { // Checks which `Partitioning` values are expected to satisfy each `Distribution`.
+    use super::*;
+    use arrow::datatypes::DataType;
+    use arrow::datatypes::Schema;
+
+    use crate::physical_plan::Distribution;
+    use crate::physical_plan::Partitioning;
+    use crate::physical_plan::PhysicalExpr;
+    use datafusion_physical_expr::expressions::Column;
+
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn partitioning_satisfy_distribution() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            arrow::datatypes::Field::new("column_1", DataType::Int64, false),
+            arrow::datatypes::Field::new("column_2", DataType::Utf8, false),
+        ]));
+
+        let partition_exprs1: Vec<Arc<dyn PhysicalExpr>> = vec![ // (column_1, column_2)
+            Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
+            Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
+        ];
+
+        let partition_exprs2: Vec<Arc<dyn PhysicalExpr>> = vec![ // same columns in the opposite order
+            Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
+            Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
+        ];
+
+        let distribution_types = vec![ // every distribution is checked against all five partitionings below
+            Distribution::UnspecifiedDistribution,
+            Distribution::SinglePartition,
+            Distribution::HashPartitioned(partition_exprs1.clone()),
+        ];
+
+        let single_partition = Partitioning::UnknownPartitioning(1);
+        let unspecified_partition = Partitioning::UnknownPartitioning(10);
+        let round_robin_partition = Partitioning::RoundRobinBatch(10);
+        let hash_partition1 = Partitioning::Hash(partition_exprs1, 10);
+        let hash_partition2 = Partitioning::Hash(partition_exprs2, 10);
+
+        for distribution in distribution_types {
+            let result = ( // tuple order: single, unspecified, round-robin, hash1, hash2
+                single_partition.satisfy(distribution.clone(), || {
+                    EquivalenceProperties::new(schema.clone()) // a fresh `EquivalenceProperties` built from the schema alone
+                }),
+                unspecified_partition.satisfy(distribution.clone(), || {
+                    EquivalenceProperties::new(schema.clone())
+                }),
+                round_robin_partition.satisfy(distribution.clone(), || {
+                    EquivalenceProperties::new(schema.clone())
+                }),
+                hash_partition1.satisfy(distribution.clone(), || {
+                    EquivalenceProperties::new(schema.clone())
+                }),
+                hash_partition2.satisfy(distribution.clone(), || {
+                    EquivalenceProperties::new(schema.clone())
+                }),
+            );
+
+            match distribution {
+                Distribution::UnspecifiedDistribution => {
+                    assert_eq!(result, (true, true, true, true, true)) // expected: every partitioning satisfies it
+                }
+                Distribution::SinglePartition => {
+                    assert_eq!(result, (true, false, false, false, false)) // expected: only the one-partition case satisfies it
+                }
+                Distribution::HashPartitioned(_) => {
+                    assert_eq!(result, (false, false, false, true, false)) // expected: only the hash on the same keys in the same order
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/datafusion/core/src/physical_plan/rewrite.rs b/datafusion/core/src/physical_plan/rewrite.rs
index 1dfc36eb1..a431dd31a 100644
--- a/datafusion/core/src/physical_plan/rewrite.rs
+++ b/datafusion/core/src/physical_plan/rewrite.rs
@@ -154,9 +154,10 @@ impl TreeNodeRewritable for Arc<dyn ExecutionPlan> {
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        if !self.children().is_empty() {
+        let children = self.children();
+        if !children.is_empty() {
             let new_children: Result<Vec<_>> =
-                self.children().into_iter().map(transform).collect();
+                children.into_iter().map(transform).collect();
             with_new_children_if_necessary(self, new_children?)
         } else {
             Ok(self)
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index 13cafaba4..4651c4054 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -247,11 +247,16 @@ mod tests {
         eq_properties.add_equal_conditions(new_condition);
         assert_eq!(eq_properties.classes().len(), 1);
         assert_eq!(eq_properties.classes()[0].len(), 2);
+        assert!(eq_properties.classes()[0].contains(&Column::new("a", 0)));
+        assert!(eq_properties.classes()[0].contains(&Column::new("b", 1)));
 
         let new_condition = (&Column::new("b", 1), &Column::new("c", 2));
         eq_properties.add_equal_conditions(new_condition);
         assert_eq!(eq_properties.classes().len(), 1);
         assert_eq!(eq_properties.classes()[0].len(), 3);
+        assert!(eq_properties.classes()[0].contains(&Column::new("a", 0)));
+        assert!(eq_properties.classes()[0].contains(&Column::new("b", 1)));
+        assert!(eq_properties.classes()[0].contains(&Column::new("c", 2)));
 
         let new_condition = (&Column::new("x", 3), &Column::new("y", 4));
         eq_properties.add_equal_conditions(new_condition);
@@ -261,6 +266,11 @@ mod tests {
         eq_properties.add_equal_conditions(new_condition);
         assert_eq!(eq_properties.classes().len(), 1);
         assert_eq!(eq_properties.classes()[0].len(), 5);
+        assert!(eq_properties.classes()[0].contains(&Column::new("a", 0)));
+        assert!(eq_properties.classes()[0].contains(&Column::new("b", 1)));
+        assert!(eq_properties.classes()[0].contains(&Column::new("c", 2)));
+        assert!(eq_properties.classes()[0].contains(&Column::new("x", 3)));
+        assert!(eq_properties.classes()[0].contains(&Column::new("y", 4)));
 
         Ok(())
     }
@@ -301,6 +311,10 @@ mod tests {
         project_equivalence_properties(input_properties, &alias_map, &mut out_properties);
         assert_eq!(out_properties.classes().len(), 1);
         assert_eq!(out_properties.classes()[0].len(), 4);
+        assert!(out_properties.classes()[0].contains(&Column::new("a1", 0)));
+        assert!(out_properties.classes()[0].contains(&Column::new("a2", 1)));
+        assert!(out_properties.classes()[0].contains(&Column::new("a3", 2)));
+        assert!(out_properties.classes()[0].contains(&Column::new("a4", 3)));
 
         Ok(())
     }
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index 0d238582d..64482ac03 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -193,14 +193,15 @@ pub fn with_new_children_if_necessary(
     expr: Arc<dyn PhysicalExpr>,
     children: Vec<Arc<dyn PhysicalExpr>>,
 ) -> Result<Arc<dyn PhysicalExpr>> {
-    if children.len() != expr.children().len() {
+    let old_children = expr.children();
+    if children.len() != old_children.len() {
         Err(DataFusionError::Internal(
             "PhysicalExpr: Wrong number of children".to_string(),
         ))
     } else if children.is_empty()
         || children
             .iter()
-            .zip(expr.children().iter())
+            .zip(old_children.iter())
             .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
     {
         expr.with_new_children(children)
diff --git a/datafusion/physical-expr/src/rewrite.rs b/datafusion/physical-expr/src/rewrite.rs
index ccd9210df..487d08455 100644
--- a/datafusion/physical-expr/src/rewrite.rs
+++ b/datafusion/physical-expr/src/rewrite.rs
@@ -152,9 +152,10 @@ impl TreeNodeRewritable for Arc<dyn PhysicalExpr> {
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        if !self.children().is_empty() {
+        let children = self.children();
+        if !children.is_empty() {
             let new_children: Result<Vec<_>> =
-                self.children().into_iter().map(transform).collect();
+                children.into_iter().map(transform).collect();
             with_new_children_if_necessary(self, new_children?)
         } else {
             Ok(self)