Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/14 12:05:37 UTC
[arrow-datafusion] branch master updated: [FOLLOWUP] Enforcement Rule: resolve review comments, refactor adjust_input_keys_ordering() (#4184)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d72b4f04c [FOLLOWUP] Enforcement Rule: resolve review comments, refactor adjust_input_keys_ordering() (#4184)
d72b4f04c is described below
commit d72b4f04cf12424a54b22bf376b87065d5fcb6aa
Author: mingmwang <mi...@ebay.com>
AuthorDate: Mon Nov 14 20:05:30 2022 +0800
[FOLLOWUP] Enforcement Rule: resolve review comments, refactor adjust_input_keys_ordering() (#4184)
* [FOLLOWUP] Enforcement Rule: resolve review comments, refactor adjust_input_keys_ordering()
* fix typo
* remove allow(clippy::vtable_address_comparisons)
---
datafusion/core/src/execution/context.rs | 3 +-
.../core/src/physical_optimizer/enforcement.rs | 737 ++++++++++++---------
.../core/src/physical_optimizer/merge_exec.rs | 75 ---
datafusion/core/src/physical_optimizer/mod.rs | 1 -
datafusion/core/src/physical_plan/mod.rs | 83 ++-
datafusion/core/src/physical_plan/rewrite.rs | 5 +-
datafusion/physical-expr/src/equivalence.rs | 14 +
datafusion/physical-expr/src/physical_expr.rs | 5 +-
datafusion/physical-expr/src/rewrite.rs | 5 +-
9 files changed, 542 insertions(+), 386 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index e0d2e0611..ce88b9f20 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1589,8 +1589,9 @@ impl SessionState {
)));
}
physical_optimizers.push(Arc::new(Repartition::new()));
+ // The Repartition rule may introduce additional RepartitionExec nodes with RoundRobin partitioning.
+ // To make sure any required SinglePartition is still satisfied, run BasicEnforcement again; this replaces the AddCoalescePartitionsExec that originally ran here.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
- // physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));
SessionState {
session_id,
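For context, the tail of the rule registration in SessionState now reads roughly as follows (a minimal sketch; only the two rules visible in the hunk above are shown):

    // The Repartition rule may introduce RepartitionExec nodes with
    // RoundRobin partitioning, so BasicEnforcement runs once more
    // afterwards to re-establish any required SinglePartition
    // distribution (previously AddCoalescePartitionsExec ran here).
    physical_optimizers.push(Arc::new(Repartition::new()));
    physical_optimizers.push(Arc::new(BasicEnforcement::new()));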
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 1eaf153a9..21931397f 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -29,24 +29,27 @@ use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
+use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
use crate::prelude::SessionConfig;
+use arrow::datatypes::SchemaRef;
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::expressions::NoOp;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
- normalize_sort_expr_with_equivalence_properties, PhysicalExpr, PhysicalSortExpr,
+ normalize_sort_expr_with_equivalence_properties, AggregateExpr, PhysicalExpr,
+ PhysicalSortExpr,
};
use std::collections::HashMap;
use std::sync::Arc;
/// The BasicEnforcement rule ensures the Distribution and Ordering requirements are met
/// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree
-/// and give a non-optimal plan, but it can avoid the possible data skew in joins
+/// and give a non-optimal plan, but it can avoid the possible data skew in joins.
///
/// For example, for a HashJoin with keys (a, b, c), the required Distribution(a, b, c) can be satisfied by
/// several alternative partitionings: [(a, b, c), (a, b), (a, c), (b, c), (a), (b), (c), ( )].
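A hedged sketch of what the strictest enforcement means in practice, mirroring the Partitioning::satisfy() test added to datafusion/core/src/physical_plan/mod.rs later in this commit (the schema and column names are illustrative):

    // Hash partitioning satisfies a HashPartitioned requirement only when
    // the expressions match in strict order; even a reordering of the
    // same keys fails the check.
    let a: Arc<dyn PhysicalExpr> =
        Arc::new(Column::new_with_schema("a", &schema).unwrap());
    let b: Arc<dyn PhysicalExpr> =
        Arc::new(Column::new_with_schema("b", &schema).unwrap());
    let required = Distribution::HashPartitioned(vec![a.clone(), b.clone()]);
    let exact = Partitioning::Hash(vec![a.clone(), b.clone()], 10);
    let reordered = Partitioning::Hash(vec![b, a], 10);
    assert!(exact.satisfy(required.clone(), || {
        EquivalenceProperties::new(schema.clone())
    }));
    assert!(!reordered.satisfy(required, || {
        EquivalenceProperties::new(schema.clone())
    }));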
@@ -72,7 +75,10 @@ impl PhysicalOptimizerRule for BasicEnforcement {
let top_down_join_key_reordering = config.top_down_join_key_reordering;
let new_plan = if top_down_join_key_reordering {
// Run a top-down process to adjust input key ordering recursively
- adjust_input_keys_down_recursively(plan, vec![])?
+ let plan_requirements = PlanWithKeyRequirements::new(plan);
+ let adjusted =
+ plan_requirements.transform_down(&adjust_input_keys_ordering)?;
+ adjusted.plan
} else {
plan
};
@@ -99,13 +105,51 @@ impl PhysicalOptimizerRule for BasicEnforcement {
/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
/// That might not match the output partitioning of the join node's children.
-/// This method runs a top-down process and try to adjust the output partitioning of the children
-/// if the children themselves are Joins or Aggregations.
-fn adjust_input_keys_down_recursively(
- plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
- parent_required: Vec<Arc<dyn PhysicalExpr>>,
-) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
- let plan_any = plan.as_any();
+/// A Top-Down process will use this method to adjust children's output partitioning based on the parent key reordering requirements:
+///
+/// Example:
+/// TopJoin on (a, b, c)
+/// bottom left join on(b, a, c)
+/// bottom right join on(c, b, a)
+///
+/// Will be adjusted to:
+/// TopJoin on (a, b, c)
+/// bottom left join on(a, b, c)
+/// bottom right join on(a, b, c)
+///
+/// Example:
+/// TopJoin on (a, b, c)
+/// Agg1 group by (b, a, c)
+/// Agg2 group by (c, b, a)
+///
+/// Will be adjusted to:
+/// TopJoin on (a, b, c)
+/// Projection(b, a, c)
+/// Agg1 group by (a, b, c)
+/// Projection(c, b, a)
+/// Agg2 group by (a, b, c)
+///
+/// The reordering process works as follows:
+///
+/// 1) If the current plan is a partitioned HashJoin or SortMergeJoin, check whether the requirements can be satisfied by adjusting the join key ordering:
+///    If the requirements cannot be satisfied, clear the current requirements, generate new requirements (to push down) based on the current join keys, and return the unchanged plan.
+///    If the requirements are already satisfied, clear the current requirements, generate new requirements (to push down) based on the current join keys, and return the unchanged plan.
+///    If the requirements can be satisfied by adjusting the key ordering, clear the current requirements, generate new requirements (to push down) based on the adjusted join keys, and return the changed plan.
+///
+/// 2) If the current plan is an Aggregation, check whether the requirements can be satisfied by adjusting the group by key ordering:
+///    If the requirements cannot be satisfied, clear all the requirements and return the unchanged plan.
+///    If the requirements are already satisfied, clear all the requirements and return the unchanged plan.
+///    If the requirements can be satisfied by adjusting the key ordering, clear all the requirements and return the changed plan.
+///
+/// 3) If the current plan is a RepartitionExec, CoalescePartitionsExec or WindowAggExec, clear all the requirements and return the unchanged plan.
+/// 4) If the current plan is a Projection, transform the requirements to the columns before the Projection and push them down.
+/// 5) For other types of operators, by default, push down the parent requirements to the children.
+///
+fn adjust_input_keys_ordering(
+ requirements: PlanWithKeyRequirements,
+) -> Option<PlanWithKeyRequirements> {
+ let parent_required = requirements.required_key_ordering.clone();
+ let plan_any = requirements.plan.as_any();
if let Some(HashJoinExec {
left,
right,
@@ -119,99 +163,65 @@ fn adjust_input_keys_down_recursively(
{
match mode {
PartitionMode::Partitioned => {
- let join_key_pairs = extract_join_keys(on);
- if let Some((
- JoinKeyPairs {
- left_keys,
- right_keys,
- },
- new_positions,
- )) = try_reorder(
- join_key_pairs.clone(),
- parent_required,
- &plan.equivalence_properties(),
- ) {
- let new_join_on = if !new_positions.is_empty() {
- new_join_conditions(&left_keys, &right_keys)
- } else {
- on.clone()
+ let join_constructor =
+ |new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
+ Arc::new(
+ HashJoinExec::try_new(
+ left.clone(),
+ right.clone(),
+ new_conditions.0,
+ filter.clone(),
+ join_type,
+ PartitionMode::Partitioned,
+ null_equals_null,
+ )
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>
};
- let new_left =
- adjust_input_keys_down_recursively(left.clone(), left_keys)?;
- let new_right =
- adjust_input_keys_down_recursively(right.clone(), right_keys)?;
- Ok(Arc::new(HashJoinExec::try_new(
- new_left,
- new_right,
- new_join_on,
- filter.clone(),
- join_type,
- PartitionMode::Partitioned,
- null_equals_null,
- )?))
- } else {
- let new_left = adjust_input_keys_down_recursively(
- left.clone(),
- join_key_pairs.left_keys,
- )?;
- let new_right = adjust_input_keys_down_recursively(
- right.clone(),
- join_key_pairs.right_keys,
- )?;
- Ok(Arc::new(HashJoinExec::try_new(
- new_left,
- new_right,
- on.clone(),
- filter.clone(),
- join_type,
- PartitionMode::Partitioned,
- null_equals_null,
- )?))
- }
+ Some(reorder_partitioned_join_keys(
+ requirements.plan.clone(),
+ &parent_required,
+ on,
+ vec![],
+ &join_constructor,
+ ))
}
PartitionMode::CollectLeft => {
- let new_left = adjust_input_keys_down_recursively(left.clone(), vec![])?;
- let new_right = match join_type {
- JoinType::Inner | JoinType::Right => try_push_required_to_right(
- parent_required,
- right.clone(),
+ let new_right_request = match join_type {
+ JoinType::Inner | JoinType::Right => shift_right_required(
+ &parent_required,
left.schema().fields().len(),
- )?,
+ ),
JoinType::RightSemi | JoinType::RightAnti => {
- adjust_input_keys_down_recursively(
- right.clone(),
- parent_required.clone(),
- )?
+ Some(parent_required.clone())
}
JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
- | JoinType::Full => {
- adjust_input_keys_down_recursively(right.clone(), vec![])?
- }
+ | JoinType::Full => None,
};
- Ok(Arc::new(HashJoinExec::try_new(
- new_left,
- new_right,
- on.clone(),
- filter.clone(),
- join_type,
- PartitionMode::CollectLeft,
- null_equals_null,
- )?))
+ // Push down requirements to the right side
+ Some(PlanWithKeyRequirements {
+ plan: requirements.plan.clone(),
+ required_key_ordering: vec![],
+ request_key_ordering: vec![None, new_right_request],
+ })
}
}
- } else if let Some(CrossJoinExec { left, right, .. }) =
+ } else if let Some(CrossJoinExec { left, .. }) =
plan_any.downcast_ref::<CrossJoinExec>()
{
- let new_left = adjust_input_keys_down_recursively(left.clone(), vec![])?;
- let new_right = try_push_required_to_right(
- parent_required,
- right.clone(),
- left.schema().fields().len(),
- )?;
- Ok(Arc::new(CrossJoinExec::try_new(new_left, new_right)?))
+ let left_columns_len = left.schema().fields().len();
+ // Push down requirements to the right side
+ Some(PlanWithKeyRequirements {
+ plan: requirements.plan.clone(),
+ required_key_ordering: vec![],
+ request_key_ordering: vec![
+ None,
+ shift_right_required(&parent_required, left_columns_len),
+ ],
+ })
} else if let Some(SortMergeJoinExec {
left,
right,
@@ -222,63 +232,27 @@ fn adjust_input_keys_down_recursively(
..
}) = plan_any.downcast_ref::<SortMergeJoinExec>()
{
- let join_key_pairs = extract_join_keys(on);
- if let Some((
- JoinKeyPairs {
- left_keys,
- right_keys,
- },
- new_positions,
- )) = try_reorder(
- join_key_pairs.clone(),
- parent_required,
- &plan.equivalence_properties(),
- ) {
- let new_join_on = if !new_positions.is_empty() {
- new_join_conditions(&left_keys, &right_keys)
- } else {
- on.clone()
- };
- let new_options = if !new_positions.is_empty() {
- let mut new_sort_options = vec![];
- for idx in 0..sort_options.len() {
- new_sort_options.push(sort_options[new_positions[idx]])
- }
- new_sort_options
- } else {
- sort_options.clone()
+ let join_constructor =
+ |new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
+ Arc::new(
+ SortMergeJoinExec::try_new(
+ left.clone(),
+ right.clone(),
+ new_conditions.0,
+ *join_type,
+ new_conditions.1,
+ *null_equals_null,
+ )
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>
};
-
- let new_left = adjust_input_keys_down_recursively(left.clone(), left_keys)?;
- let new_right =
- adjust_input_keys_down_recursively(right.clone(), right_keys)?;
-
- Ok(Arc::new(SortMergeJoinExec::try_new(
- new_left,
- new_right,
- new_join_on,
- *join_type,
- new_options,
- *null_equals_null,
- )?))
- } else {
- let new_left = adjust_input_keys_down_recursively(
- left.clone(),
- join_key_pairs.left_keys,
- )?;
- let new_right = adjust_input_keys_down_recursively(
- right.clone(),
- join_key_pairs.right_keys,
- )?;
- Ok(Arc::new(SortMergeJoinExec::try_new(
- new_left,
- new_right,
- on.clone(),
- *join_type,
- sort_options.clone(),
- *null_equals_null,
- )?))
- }
+ Some(reorder_partitioned_join_keys(
+ requirements.plan.clone(),
+ &parent_required,
+ on,
+ sort_options.clone(),
+ &join_constructor,
+ ))
} else if let Some(AggregateExec {
mode,
group_by,
@@ -288,173 +262,255 @@ fn adjust_input_keys_down_recursively(
..
}) = plan_any.downcast_ref::<AggregateExec>()
{
- if parent_required.is_empty() {
- plan.map_children(|plan| adjust_input_keys_down_recursively(plan, vec![]))
- } else {
+ if !parent_required.is_empty() {
match mode {
- AggregateMode::Final => plan.map_children(|plan| {
- adjust_input_keys_down_recursively(plan, vec![])
- }),
- AggregateMode::FinalPartitioned | AggregateMode::Partial => {
- let out_put_columns = group_by
- .expr()
- .iter()
- .enumerate()
- .map(|(index, (_col, name))| Column::new(name, index))
- .collect::<Vec<_>>();
-
- let out_put_exprs = out_put_columns
- .iter()
- .map(|c| Arc::new(c.clone()) as Arc<dyn PhysicalExpr>)
- .collect::<Vec<_>>();
-
- // Check whether the requirements can be satisfied by the Aggregation
- if parent_required.len() != out_put_exprs.len()
- || expr_list_eq_strict_order(&out_put_exprs, &parent_required)
- || !group_by.null_expr().is_empty()
- {
- plan.map_children(|plan| {
- adjust_input_keys_down_recursively(plan, vec![])
- })
- } else {
- let new_positions =
- expected_expr_positions(&out_put_exprs, &parent_required);
- match new_positions {
- Some(positions) => {
- let mut new_group_exprs = vec![];
- for idx in positions.into_iter() {
- new_group_exprs.push(group_by.expr()[idx].clone());
- }
- let new_group_by =
- PhysicalGroupBy::new_single(new_group_exprs);
- match mode {
- AggregateMode::FinalPartitioned => {
- // Since the input of FinalPartitioned should be the Partial AggregateExec and they should
- // share the same column order, it's safe to call adjust_input_keys_down_recursively here
- let new_input =
- adjust_input_keys_down_recursively(
- input.clone(),
- parent_required,
- )?;
- let new_agg = Arc::new(AggregateExec::try_new(
- AggregateMode::FinalPartitioned,
- new_group_by,
- aggr_expr.clone(),
- new_input,
- input_schema.clone(),
- )?);
-
- // Need to create a new projection to change the expr ordering back
- let mut proj_exprs = out_put_columns
- .iter()
- .map(|col| {
- (
- Arc::new(Column::new(
- col.name(),
- new_agg
- .schema()
- .index_of(col.name())
- .unwrap(),
- ))
- as Arc<dyn PhysicalExpr>,
- col.name().to_owned(),
- )
- })
- .collect::<Vec<_>>();
- let agg_schema = new_agg.schema();
- let agg_fields = agg_schema.fields();
- for (idx, field) in agg_fields
- .iter()
- .enumerate()
- .skip(out_put_columns.len())
- {
- proj_exprs.push((
- Arc::new(Column::new(
- field.name().as_str(),
- idx,
- ))
- as Arc<dyn PhysicalExpr>,
- field.name().clone(),
- ))
- }
- // TODO merge adjacent Projections if there are
- Ok(Arc::new(ProjectionExec::try_new(
- proj_exprs, new_agg,
- )?))
- }
- AggregateMode::Partial => {
- let new_input =
- adjust_input_keys_down_recursively(
- input.clone(),
- vec![],
- )?;
- Ok(Arc::new(AggregateExec::try_new(
- AggregateMode::Partial,
- new_group_by,
- aggr_expr.clone(),
- new_input,
- input_schema.clone(),
- )?))
- }
- _ => Ok(plan),
- }
- }
- None => plan.map_children(|plan| {
- adjust_input_keys_down_recursively(plan, vec![])
- }),
- }
- }
- }
+ AggregateMode::FinalPartitioned => Some(reorder_aggregate_keys(
+ requirements.plan.clone(),
+ &parent_required,
+ group_by,
+ aggr_expr,
+ input.clone(),
+ input_schema,
+ )),
+ _ => Some(PlanWithKeyRequirements::new(requirements.plan.clone())),
}
+ } else {
+ // Keep everything unchanged
+ None
}
} else if let Some(ProjectionExec { expr, .. }) =
plan_any.downcast_ref::<ProjectionExec>()
{
- // For Projection, we need to transform the columns to the columns before the Projection
+ // For Projection, we need to transform the requirements to the columns before the Projection
// and then push down the requirements
// Construct a mapping from each new name to the original Column
- let mut column_mapping = HashMap::new();
- for (expression, name) in expr.iter() {
- if let Some(column) = expression.as_any().downcast_ref::<Column>() {
- column_mapping.insert(name.clone(), column.clone());
- };
- }
- let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
- .iter()
- .filter_map(|r| {
- if let Some(column) = r.as_any().downcast_ref::<Column>() {
- column_mapping.get(column.name())
- } else {
- None
- }
- })
- .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>)
- .collect::<Vec<_>>();
+ let new_required = map_columns_before_projection(&parent_required, expr);
if new_required.len() == parent_required.len() {
- plan.map_children(|plan| {
- adjust_input_keys_down_recursively(plan, new_required.clone())
+ Some(PlanWithKeyRequirements {
+ plan: requirements.plan.clone(),
+ required_key_ordering: vec![],
+ request_key_ordering: vec![Some(new_required.clone())],
})
} else {
- plan.map_children(|plan| adjust_input_keys_down_recursively(plan, vec![]))
+ // Cannot satisfy; clear the current requirements and generate new empty requirements
+ Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
}
} else if plan_any.downcast_ref::<RepartitionExec>().is_some()
|| plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
|| plan_any.downcast_ref::<WindowAggExec>().is_some()
{
- plan.map_children(|plan| adjust_input_keys_down_recursively(plan, vec![]))
+ Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
} else {
- plan.map_children(|plan| {
- adjust_input_keys_down_recursively(plan, parent_required.clone())
+ // By default, push down the parent requirements to children
+ let children_len = requirements.plan.children().len();
+ Some(PlanWithKeyRequirements {
+ plan: requirements.plan.clone(),
+ required_key_ordering: vec![],
+ request_key_ordering: vec![Some(parent_required.clone()); children_len],
})
}
}
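Taken together, every branch of adjust_input_keys_ordering returns one of three shapes (a condensed sketch of the cases handled above; the variables are the ones bound in the respective branches):

    // (a) Clear all requirements and keep the plan unchanged
    //     (unsatisfiable aggregations, RepartitionExec and friends):
    Some(PlanWithKeyRequirements::new(requirements.plan.clone()))

    // (b) Keep the plan but push (possibly transformed) requirements
    //     down to the children (Projection, the default case):
    Some(PlanWithKeyRequirements {
        plan: requirements.plan.clone(),
        required_key_ordering: vec![],
        request_key_ordering: vec![Some(parent_required.clone()); children_len],
    })

    // (c) Possibly replace the plan with one whose keys are reordered and
    //     push the adjusted keys down (partitioned HashJoin/SortMergeJoin):
    Some(reorder_partitioned_join_keys(
        requirements.plan.clone(),
        &parent_required,
        on,
        sort_options.clone(),
        &join_constructor,
    ))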
-fn try_push_required_to_right(
- parent_required: Vec<Arc<dyn PhysicalExpr>>,
- right: Arc<dyn ExecutionPlan>,
- left_columns_len: usize,
-) -> Result<Arc<dyn ExecutionPlan>> {
+fn reorder_partitioned_join_keys<F>(
+ join_plan: Arc<dyn ExecutionPlan>,
+ parent_required: &[Arc<dyn PhysicalExpr>],
+ on: &[(Column, Column)],
+ sort_options: Vec<SortOptions>,
+ join_constructor: &F,
+) -> PlanWithKeyRequirements
+where
+ F: Fn((Vec<(Column, Column)>, Vec<SortOptions>)) -> Arc<dyn ExecutionPlan>,
+{
+ let join_key_pairs = extract_join_keys(on);
+ if let Some((
+ JoinKeyPairs {
+ left_keys,
+ right_keys,
+ },
+ new_positions,
+ )) = try_reorder(
+ join_key_pairs.clone(),
+ parent_required,
+ &join_plan.equivalence_properties(),
+ ) {
+ if !new_positions.is_empty() {
+ let new_join_on = new_join_conditions(&left_keys, &right_keys);
+ let mut new_sort_options: Vec<SortOptions> = vec![];
+ for idx in 0..sort_options.len() {
+ new_sort_options.push(sort_options[new_positions[idx]])
+ }
+
+ PlanWithKeyRequirements {
+ plan: join_constructor((new_join_on, new_sort_options)),
+ required_key_ordering: vec![],
+ request_key_ordering: vec![Some(left_keys), Some(right_keys)],
+ }
+ } else {
+ PlanWithKeyRequirements {
+ plan: join_plan,
+ required_key_ordering: vec![],
+ request_key_ordering: vec![Some(left_keys), Some(right_keys)],
+ }
+ }
+ } else {
+ PlanWithKeyRequirements {
+ plan: join_plan,
+ required_key_ordering: vec![],
+ request_key_ordering: vec![
+ Some(join_key_pairs.left_keys),
+ Some(join_key_pairs.right_keys),
+ ],
+ }
+ }
+}
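The new_positions vector returned by try_reorder indexes into the current key order: positions[i] is where the i-th expected key currently sits. For instance, if the parent expects (a, b, c) and the join is keyed on (b, a, c), the positions are [1, 0, 2], and the SortOptions are permuted the same way (an illustrative sketch; opt_a/opt_b/opt_c are hypothetical values):

    let sort_options = vec![opt_b, opt_a, opt_c]; // current order (b, a, c)
    let new_positions = vec![1, 0, 2];            // reorder to (a, b, c)
    let new_sort_options: Vec<SortOptions> = new_positions
        .iter()
        .map(|idx| sort_options[*idx])
        .collect();                               // (opt_a, opt_b, opt_c)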
+
+fn reorder_aggregate_keys(
+ agg_plan: Arc<dyn ExecutionPlan>,
+ parent_required: &[Arc<dyn PhysicalExpr>],
+ group_by: &PhysicalGroupBy,
+ aggr_expr: &[Arc<dyn AggregateExpr>],
+ agg_input: Arc<dyn ExecutionPlan>,
+ input_schema: &SchemaRef,
+) -> PlanWithKeyRequirements {
+ let out_put_columns = group_by
+ .expr()
+ .iter()
+ .enumerate()
+ .map(|(index, (_col, name))| Column::new(name, index))
+ .collect::<Vec<_>>();
+
+ let out_put_exprs = out_put_columns
+ .iter()
+ .map(|c| Arc::new(c.clone()) as Arc<dyn PhysicalExpr>)
+ .collect::<Vec<_>>();
+
+ if parent_required.len() != out_put_exprs.len()
+ || !group_by.null_expr().is_empty()
+ || expr_list_eq_strict_order(&out_put_exprs, parent_required)
+ {
+ PlanWithKeyRequirements::new(agg_plan)
+ } else {
+ let new_positions = expected_expr_positions(&out_put_exprs, parent_required);
+ match new_positions {
+ None => PlanWithKeyRequirements::new(agg_plan),
+ Some(positions) => {
+ let new_partial_agg = if let Some(AggregateExec {
+ mode,
+ group_by,
+ aggr_expr,
+ input,
+ input_schema,
+ ..
+ }) =
+ agg_input.as_any().downcast_ref::<AggregateExec>()
+ {
+ if matches!(mode, AggregateMode::Partial) {
+ let mut new_group_exprs = vec![];
+ for idx in positions.iter() {
+ new_group_exprs.push(group_by.expr()[*idx].clone());
+ }
+ let new_partial_group_by =
+ PhysicalGroupBy::new_single(new_group_exprs);
+ // Build a new Partial AggregateExec with the reordered group-by keys
+ Some(Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Partial,
+ new_partial_group_by,
+ aggr_expr.clone(),
+ input.clone(),
+ input_schema.clone(),
+ )
+ .unwrap(),
+ ))
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+ if let Some(partial_agg) = new_partial_agg {
+ let mut new_group_exprs = vec![];
+ for idx in positions.into_iter() {
+ new_group_exprs.push(group_by.expr()[idx].clone());
+ }
+ let new_group_by = PhysicalGroupBy::new_single(new_group_exprs);
+ let new_final_agg = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::FinalPartitioned,
+ new_group_by,
+ aggr_expr.to_vec(),
+ partial_agg,
+ input_schema.clone(),
+ )
+ .unwrap(),
+ );
+
+ // Need to create a new projection to change the expr ordering back
+ let mut proj_exprs = out_put_columns
+ .iter()
+ .map(|col| {
+ (
+ Arc::new(Column::new(
+ col.name(),
+ new_final_agg.schema().index_of(col.name()).unwrap(),
+ ))
+ as Arc<dyn PhysicalExpr>,
+ col.name().to_owned(),
+ )
+ })
+ .collect::<Vec<_>>();
+ let agg_schema = new_final_agg.schema();
+ let agg_fields = agg_schema.fields();
+ for (idx, field) in
+ agg_fields.iter().enumerate().skip(out_put_columns.len())
+ {
+ proj_exprs.push((
+ Arc::new(Column::new(field.name().as_str(), idx))
+ as Arc<dyn PhysicalExpr>,
+ field.name().clone(),
+ ))
+ }
+ // TODO: merge adjacent Projections if there are any
+ PlanWithKeyRequirements::new(Arc::new(
+ ProjectionExec::try_new(proj_exprs, new_final_agg).unwrap(),
+ ))
+ } else {
+ PlanWithKeyRequirements::new(agg_plan)
+ }
+ }
+ }
+ }
+}
+
+fn map_columns_before_projection(
+ parent_required: &[Arc<dyn PhysicalExpr>],
+ proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Vec<Arc<dyn PhysicalExpr>> {
+ let mut column_mapping = HashMap::new();
+ for (expression, name) in proj_exprs.iter() {
+ if let Some(column) = expression.as_any().downcast_ref::<Column>() {
+ column_mapping.insert(name.clone(), column.clone());
+ };
+ }
let new_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
+ .iter()
+ .filter_map(|r| {
+ if let Some(column) = r.as_any().downcast_ref::<Column>() {
+ column_mapping.get(column.name())
+ } else {
+ None
+ }
+ })
+ .map(|e| Arc::new(e.clone()) as Arc<dyn PhysicalExpr>)
+ .collect::<Vec<_>>();
+ new_required
+}
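For example, with a hypothetical projection SELECT a AS a1, b AS b1, a parent requirement on (a1, b1) maps back to (a, b) on the projection's input (a sketch; the column names are illustrative):

    let proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![
        (Arc::new(Column::new("a", 0)), "a1".to_string()),
        (Arc::new(Column::new("b", 1)), "b1".to_string()),
    ];
    let parent_required: Vec<Arc<dyn PhysicalExpr>> = vec![
        Arc::new(Column::new("a1", 0)),
        Arc::new(Column::new("b1", 1)),
    ];
    // Yields [Column("a", 0), Column("b", 1)]. A requirement on anything
    // that is not a simple column is dropped, which the caller detects by
    // comparing lengths.
    let mapped = map_columns_before_projection(&parent_required, &proj_exprs);
    assert_eq!(mapped.len(), parent_required.len());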
+
+fn shift_right_required(
+ parent_required: &[Arc<dyn PhysicalExpr>],
+ left_columns_len: usize,
+) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
+ let new_right_required: Vec<Arc<dyn PhysicalExpr>> = parent_required
.iter()
.filter_map(|r| {
if let Some(col) = r.as_any().downcast_ref::<Column>() {
@@ -473,18 +529,33 @@ fn try_push_required_to_right(
.collect::<Vec<_>>();
// If the parent requirements all come from the right side, they can be pushed down
- if new_required.len() == parent_required.len() {
- adjust_input_keys_down_recursively(right.clone(), new_required)
+ if new_right_required.len() != parent_required.len() {
+ None
} else {
- adjust_input_keys_down_recursively(right.clone(), vec![])
+ Some(new_right_required)
}
}
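A worked example of the shift (hypothetical columns; the left input is assumed to have three fields, so right-side columns start at index 3 in the join output):

    let from_right: Vec<Arc<dyn PhysicalExpr>> = vec![
        Arc::new(Column::new("x", 3)),
        Arc::new(Column::new("y", 4)),
    ];
    // All requirements come from the right side, so the indices shift
    // left by 3: Some([Column("x", 0), Column("y", 1)]).
    assert!(shift_right_required(&from_right, 3).is_some());

    // A requirement that touches a left-side column cannot be pushed to
    // the right child at all:
    let mixed: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("l", 1))];
    assert!(shift_right_required(&mixed, 3).is_none());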
/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
/// That might not match the output partitioning of the join node's children.
/// This method will try to change the ordering of the join keys to match with the
-/// partitioning of the join nodes' children.
-/// If it can not match with both sides, it will try to match with one, either left side or right side.
+/// partitioning of the join nodes' children. If it cannot match both sides, it will try to
+/// match one side, either the left or the right.
+///
+/// Example:
+/// TopJoin on (a, b, c)
+/// bottom left join on(b, a, c)
+/// bottom right join on(c, b, a)
+///
+/// Will be adjusted to:
+/// TopJoin on (b, a, c)
+/// bottom left join on(b, a, c)
+/// bottom right join on(c, b, a)
+///
+/// Compared to the Top-Down reordering process, this Bottom-Up approach is much simpler, but it might not reach the best result.
+/// The Bottom-Up approach would be useful in the future if we plan to support storage partition-wise joins:
+/// in that case, the datasources/tables might be pre-partitioned, so we cannot adjust the key ordering of the datasources
+/// and therefore cannot apply the Top-Down reordering process.
fn reorder_join_keys_to_inputs(
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
) -> Arc<dyn crate::physical_plan::ExecutionPlan> {
@@ -601,7 +672,7 @@ fn reorder_current_join_keys(
) -> Option<(JoinKeyPairs, Vec<usize>)> {
match (left_partition, right_partition.clone()) {
(Some(Partitioning::Hash(left_exprs, _)), _) => {
- try_reorder(join_keys.clone(), left_exprs, left_equivalence_properties)
+ try_reorder(join_keys.clone(), &left_exprs, left_equivalence_properties)
.or_else(|| {
reorder_current_join_keys(
join_keys,
@@ -613,7 +684,7 @@ fn reorder_current_join_keys(
})
}
(_, Some(Partitioning::Hash(right_exprs, _))) => {
- try_reorder(join_keys, right_exprs, right_equivalence_properties)
+ try_reorder(join_keys, &right_exprs, right_equivalence_properties)
}
_ => None,
}
@@ -621,7 +692,7 @@ fn reorder_current_join_keys(
fn try_reorder(
join_keys: JoinKeyPairs,
- expected: Vec<Arc<dyn PhysicalExpr>>,
+ expected: &[Arc<dyn PhysicalExpr>],
equivalence_properties: &EquivalenceProperties,
) -> Option<(JoinKeyPairs, Vec<usize>)> {
let mut normalized_expected = vec![];
@@ -630,8 +701,8 @@ fn try_reorder(
if join_keys.left_keys.len() != expected.len() {
return None;
}
- if expr_list_eq_strict_order(&expected, &join_keys.left_keys)
- || expr_list_eq_strict_order(&expected, &join_keys.right_keys)
+ if expr_list_eq_strict_order(expected, &join_keys.left_keys)
+ || expr_list_eq_strict_order(expected, &join_keys.right_keys)
{
return Some((join_keys, vec![]));
} else if !equivalence_properties.classes().is_empty() {
@@ -677,8 +748,8 @@ fn try_reorder(
}
}
- let new_positions = expected_expr_positions(&join_keys.left_keys, &expected)
- .or_else(|| expected_expr_positions(&join_keys.right_keys, &expected))
+ let new_positions = expected_expr_positions(&join_keys.left_keys, expected)
+ .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
.or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
.or_else(|| {
expected_expr_positions(&normalized_right_keys, &normalized_expected)
@@ -885,6 +956,70 @@ struct JoinKeyPairs {
right_keys: Vec<Arc<dyn PhysicalExpr>>,
}
+#[derive(Debug, Clone)]
+struct PlanWithKeyRequirements {
+ plan: Arc<dyn ExecutionPlan>,
+ /// The key ordering required by the parent
+ required_key_ordering: Vec<Arc<dyn PhysicalExpr>>,
+ /// The key orderings requested from each child (None means no requirement)
+ request_key_ordering: Vec<Option<Vec<Arc<dyn PhysicalExpr>>>>,
+}
+
+impl PlanWithKeyRequirements {
+ pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ let children_len = plan.children().len();
+ PlanWithKeyRequirements {
+ plan,
+ required_key_ordering: vec![],
+ request_key_ordering: vec![None; children_len],
+ }
+ }
+
+ pub fn children(&self) -> Vec<PlanWithKeyRequirements> {
+ let plan_children = self.plan.children();
+ assert_eq!(plan_children.len(), self.request_key_ordering.len());
+ plan_children
+ .into_iter()
+ .zip(self.request_key_ordering.clone().into_iter())
+ .map(|(child, required)| {
+ let from_parent = required.unwrap_or_default();
+ let length = child.children().len();
+ PlanWithKeyRequirements {
+ plan: child,
+ required_key_ordering: from_parent.clone(),
+ request_key_ordering: vec![None; length],
+ }
+ })
+ .collect()
+ }
+}
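children() is where the hand-off happens during the top-down traversal: the per-child requests recorded by a parent become each child's required_key_ordering on the next step (a small sketch; join_plan, left_keys and right_keys are hypothetical):

    let mut node = PlanWithKeyRequirements::new(join_plan);
    node.request_key_ordering = vec![Some(left_keys.clone()), Some(right_keys.clone())];
    let children = node.children();
    assert!(expr_list_eq_strict_order(
        &children[0].required_key_ordering,
        &left_keys
    ));
    assert!(expr_list_eq_strict_order(
        &children[1].required_key_ordering,
        &right_keys
    ));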
+
+impl TreeNodeRewritable for PlanWithKeyRequirements {
+ fn map_children<F>(self, transform: F) -> Result<Self>
+ where
+ F: FnMut(Self) -> Result<Self>,
+ {
+ let children = self.children();
+ if !children.is_empty() {
+ let new_children: Result<Vec<_>> =
+ children.into_iter().map(transform).collect();
+
+ let children_plans = new_children?
+ .into_iter()
+ .map(|child| child.plan)
+ .collect::<Vec<_>>();
+ let new_plan = with_new_children_if_necessary(self.plan, children_plans)?;
+ Ok(PlanWithKeyRequirements {
+ plan: new_plan,
+ required_key_ordering: self.required_key_ordering,
+ request_key_ordering: self.request_key_ordering,
+ })
+ } else {
+ Ok(self)
+ }
+ }
+}
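transform_down itself is not part of this diff; assuming the usual shape of a provided method on TreeNodeRewritable, it is a pre-order traversal along these lines (a sketch, not the actual trait code):

    fn transform_down<F>(self, op: &F) -> Result<Self>
    where
        F: Fn(Self) -> Option<Self>,
        Self: Sized + Clone,
    {
        // Apply the rewrite to this node first; None means "keep as-is".
        let after_op = op(self.clone()).unwrap_or(self);
        // Then recurse into the children via map_children (implemented above).
        after_op.map_children(|child| child.transform_down(op))
    }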
+
#[cfg(test)]
mod tests {
use crate::physical_plan::filter::FilterExec;
diff --git a/datafusion/core/src/physical_optimizer/merge_exec.rs b/datafusion/core/src/physical_optimizer/merge_exec.rs
deleted file mode 100644
index 1ed0f6196..000000000
--- a/datafusion/core/src/physical_optimizer/merge_exec.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! AddCoalescePartitionsExec adds CoalescePartitionsExec to plans
-//! with more than one partition, to coalesce them into one partition
-//! when the node needs a single partition
-use super::optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::with_new_children_if_necessary;
-use crate::{
- error::Result,
- physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution},
-};
-use std::sync::Arc;
-
-/// Introduces CoalescePartitionsExec
-#[derive(Default)]
-pub struct AddCoalescePartitionsExec {}
-
-impl AddCoalescePartitionsExec {
- #[allow(missing_docs)]
- pub fn new() -> Self {
- Self {}
- }
-}
-
-impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
- fn optimize(
- &self,
- plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
- _config: &crate::execution::context::SessionConfig,
- ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
- if plan.children().is_empty() {
- // leaf node, children cannot be replaced
- Ok(plan.clone())
- } else {
- let children = plan
- .children()
- .iter()
- .map(|child| self.optimize(child.clone(), _config))
- .collect::<Result<Vec<_>>>()?;
- assert_eq!(children.len(), plan.required_input_distribution().len());
- let new_children = children
- .into_iter()
- .zip(plan.required_input_distribution())
- .map(|(child, dist)| match dist {
- Distribution::SinglePartition
- if child.output_partitioning().partition_count() > 1 =>
- {
- Arc::new(CoalescePartitionsExec::new(child.clone()))
- }
- _ => child,
- })
- .collect::<Vec<_>>();
- with_new_children_if_necessary(plan, new_children)
- }
- }
-
- fn name(&self) -> &str {
- "add_merge_exec"
- }
-}
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index 5ecb9cd37..5050dae35 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -22,7 +22,6 @@ pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod enforcement;
pub mod hash_build_probe_order;
-pub mod merge_exec;
pub mod optimizer;
pub mod pruning;
pub mod repartition;
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 0bbb8bcab..7c3ef4c7e 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -252,14 +252,15 @@ pub fn with_new_children_if_necessary(
plan: Arc<dyn ExecutionPlan>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.len() != plan.children().len() {
+ let old_children = plan.children();
+ if children.len() != old_children.len() {
Err(DataFusionError::Internal(
"Wrong number of children".to_string(),
))
} else if children.is_empty()
|| children
.iter()
- .zip(plan.children().iter())
+ .zip(old_children.iter())
.any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
{
plan.with_new_children(children)
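The refactor also leans on Arc::ptr_eq: a node is rebuilt only when at least one child is a genuinely new allocation, not merely another handle to the old one. A minimal illustration of that distinction:

    use std::sync::Arc;

    let original = Arc::new(42);
    let cloned = original.clone(); // same allocation, refcount bumped
    let rebuilt = Arc::new(42);    // equal value, different allocation
    assert!(Arc::ptr_eq(&original, &cloned));
    assert!(!Arc::ptr_eq(&original, &rebuilt));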
@@ -656,3 +657,81 @@ use crate::execution::context::TaskContext;
pub use datafusion_physical_expr::{
expressions, functions, hash_utils, type_coercion, udf,
};
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::datatypes::DataType;
+ use arrow::datatypes::Schema;
+
+ use crate::physical_plan::Distribution;
+ use crate::physical_plan::Partitioning;
+ use crate::physical_plan::PhysicalExpr;
+ use datafusion_physical_expr::expressions::Column;
+
+ use std::sync::Arc;
+
+ #[tokio::test]
+ async fn partitioning_satisfy_distribution() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ arrow::datatypes::Field::new("column_1", DataType::Int64, false),
+ arrow::datatypes::Field::new("column_2", DataType::Utf8, false),
+ ]));
+
+ let partition_exprs1: Vec<Arc<dyn PhysicalExpr>> = vec![
+ Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
+ Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
+ ];
+
+ let partition_exprs2: Vec<Arc<dyn PhysicalExpr>> = vec![
+ Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
+ Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
+ ];
+
+ let distribution_types = vec![
+ Distribution::UnspecifiedDistribution,
+ Distribution::SinglePartition,
+ Distribution::HashPartitioned(partition_exprs1.clone()),
+ ];
+
+ let single_partition = Partitioning::UnknownPartitioning(1);
+ let unspecified_partition = Partitioning::UnknownPartitioning(10);
+ let round_robin_partition = Partitioning::RoundRobinBatch(10);
+ let hash_partition1 = Partitioning::Hash(partition_exprs1, 10);
+ let hash_partition2 = Partitioning::Hash(partition_exprs2, 10);
+
+ for distribution in distribution_types {
+ let result = (
+ single_partition.satisfy(distribution.clone(), || {
+ EquivalenceProperties::new(schema.clone())
+ }),
+ unspecified_partition.satisfy(distribution.clone(), || {
+ EquivalenceProperties::new(schema.clone())
+ }),
+ round_robin_partition.satisfy(distribution.clone(), || {
+ EquivalenceProperties::new(schema.clone())
+ }),
+ hash_partition1.satisfy(distribution.clone(), || {
+ EquivalenceProperties::new(schema.clone())
+ }),
+ hash_partition2.satisfy(distribution.clone(), || {
+ EquivalenceProperties::new(schema.clone())
+ }),
+ );
+
+ match distribution {
+ Distribution::UnspecifiedDistribution => {
+ assert_eq!(result, (true, true, true, true, true))
+ }
+ Distribution::SinglePartition => {
+ assert_eq!(result, (true, false, false, false, false))
+ }
+ Distribution::HashPartitioned(_) => {
+ assert_eq!(result, (false, false, false, true, false))
+ }
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/datafusion/core/src/physical_plan/rewrite.rs b/datafusion/core/src/physical_plan/rewrite.rs
index 1dfc36eb1..a431dd31a 100644
--- a/datafusion/core/src/physical_plan/rewrite.rs
+++ b/datafusion/core/src/physical_plan/rewrite.rs
@@ -154,9 +154,10 @@ impl TreeNodeRewritable for Arc<dyn ExecutionPlan> {
where
F: FnMut(Self) -> Result<Self>,
{
- if !self.children().is_empty() {
+ let children = self.children();
+ if !children.is_empty() {
let new_children: Result<Vec<_>> =
- self.children().into_iter().map(transform).collect();
+ children.into_iter().map(transform).collect();
with_new_children_if_necessary(self, new_children?)
} else {
Ok(self)
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index 13cafaba4..4651c4054 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -247,11 +247,16 @@ mod tests {
eq_properties.add_equal_conditions(new_condition);
assert_eq!(eq_properties.classes().len(), 1);
assert_eq!(eq_properties.classes()[0].len(), 2);
+ assert!(eq_properties.classes()[0].contains(&Column::new("a", 0)));
+ assert!(eq_properties.classes()[0].contains(&Column::new("b", 1)));
let new_condition = (&Column::new("b", 1), &Column::new("c", 2));
eq_properties.add_equal_conditions(new_condition);
assert_eq!(eq_properties.classes().len(), 1);
assert_eq!(eq_properties.classes()[0].len(), 3);
+ assert!(eq_properties.classes()[0].contains(&Column::new("a", 0)));
+ assert!(eq_properties.classes()[0].contains(&Column::new("b", 1)));
+ assert!(eq_properties.classes()[0].contains(&Column::new("c", 2)));
let new_condition = (&Column::new("x", 3), &Column::new("y", 4));
eq_properties.add_equal_conditions(new_condition);
@@ -261,6 +266,11 @@ mod tests {
eq_properties.add_equal_conditions(new_condition);
assert_eq!(eq_properties.classes().len(), 1);
assert_eq!(eq_properties.classes()[0].len(), 5);
+ assert!(eq_properties.classes()[0].contains(&Column::new("a", 0)));
+ assert!(eq_properties.classes()[0].contains(&Column::new("b", 1)));
+ assert!(eq_properties.classes()[0].contains(&Column::new("c", 2)));
+ assert!(eq_properties.classes()[0].contains(&Column::new("x", 3)));
+ assert!(eq_properties.classes()[0].contains(&Column::new("y", 4)));
Ok(())
}
@@ -301,6 +311,10 @@ mod tests {
project_equivalence_properties(input_properties, &alias_map, &mut out_properties);
assert_eq!(out_properties.classes().len(), 1);
assert_eq!(out_properties.classes()[0].len(), 4);
+ assert!(out_properties.classes()[0].contains(&Column::new("a1", 0)));
+ assert!(out_properties.classes()[0].contains(&Column::new("a2", 1)));
+ assert!(out_properties.classes()[0].contains(&Column::new("a3", 2)));
+ assert!(out_properties.classes()[0].contains(&Column::new("a4", 3)));
Ok(())
}
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index 0d238582d..64482ac03 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -193,14 +193,15 @@ pub fn with_new_children_if_necessary(
expr: Arc<dyn PhysicalExpr>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
- if children.len() != expr.children().len() {
+ let old_children = expr.children();
+ if children.len() != old_children.len() {
Err(DataFusionError::Internal(
"PhysicalExpr: Wrong number of children".to_string(),
))
} else if children.is_empty()
|| children
.iter()
- .zip(expr.children().iter())
+ .zip(old_children.iter())
.any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
{
expr.with_new_children(children)
diff --git a/datafusion/physical-expr/src/rewrite.rs b/datafusion/physical-expr/src/rewrite.rs
index ccd9210df..487d08455 100644
--- a/datafusion/physical-expr/src/rewrite.rs
+++ b/datafusion/physical-expr/src/rewrite.rs
@@ -152,9 +152,10 @@ impl TreeNodeRewritable for Arc<dyn PhysicalExpr> {
where
F: FnMut(Self) -> Result<Self>,
{
- if !self.children().is_empty() {
+ let children = self.children();
+ if !children.is_empty() {
let new_children: Result<Vec<_>> =
- self.children().into_iter().map(transform).collect();
+ children.into_iter().map(transform).collect();
with_new_children_if_necessary(self, new_children?)
} else {
Ok(self)