You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/21 18:36:01 UTC
[arrow-datafusion] branch main updated: refactor: extract merge_projection common function. (#6735)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 37154286e9 refactor: extract merge_projection common function. (#6735)
37154286e9 is described below
commit 37154286e9941f156642235369d83e3a08304e6c
Author: jakevin <ja...@gmail.com>
AuthorDate: Thu Jun 22 02:35:55 2023 +0800
refactor: extract merge_projection common function. (#6735)
---
datafusion/optimizer/src/merge_projection.rs | 48 +++++++++++++-----------
datafusion/optimizer/src/push_down_projection.rs | 24 +-----------
2 files changed, 29 insertions(+), 43 deletions(-)
diff --git a/datafusion/optimizer/src/merge_projection.rs b/datafusion/optimizer/src/merge_projection.rs
index 1426aed146..408055b8e7 100644
--- a/datafusion/optimizer/src/merge_projection.rs
+++ b/datafusion/optimizer/src/merge_projection.rs
@@ -44,28 +44,8 @@ impl OptimizerRule for MergeProjection {
LogicalPlan::Projection(parent_projection) => {
match parent_projection.input.as_ref() {
LogicalPlan::Projection(child_projection) => {
- let replace_map = collect_projection_expr(child_projection);
- let new_exprs = parent_projection
- .expr
- .iter()
- .map(|expr| replace_cols_by_name(expr.clone(), &replace_map))
- .enumerate()
- .map(|(i, e)| match e {
- Ok(e) => {
- let parent_expr = parent_projection.schema.fields()
- [i]
- .qualified_name();
- e.alias_if_changed(parent_expr)
- }
- Err(e) => Err(e),
- })
- .collect::<Result<Vec<_>>>()?;
let new_plan =
- LogicalPlan::Projection(Projection::try_new_with_schema(
- new_exprs,
- child_projection.input.clone(),
- parent_projection.schema.clone(),
- )?);
+ merge_projection(parent_projection, child_projection)?;
Ok(Some(
self.try_optimize(&new_plan, _config)?.unwrap_or(new_plan),
))
@@ -86,6 +66,32 @@ impl OptimizerRule for MergeProjection {
}
}
+pub(super) fn merge_projection(
+ parent_projection: &Projection,
+ child_projection: &Projection,
+) -> Result<LogicalPlan> {
+ let replace_map = collect_projection_expr(child_projection);
+ let new_exprs = parent_projection
+ .expr
+ .iter()
+ .map(|expr| replace_cols_by_name(expr.clone(), &replace_map))
+ .enumerate()
+ .map(|(i, e)| match e {
+ Ok(e) => {
+ let parent_expr = parent_projection.schema.fields()[i].qualified_name();
+ e.alias_if_changed(parent_expr)
+ }
+ Err(e) => Err(e),
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let new_plan = LogicalPlan::Projection(Projection::try_new_with_schema(
+ new_exprs,
+ child_projection.input.clone(),
+ parent_projection.schema.clone(),
+ )?);
+ Ok(new_plan)
+}
+
pub fn collect_projection_expr(projection: &Projection) -> HashMap<String, Expr> {
projection
.schema
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 16ddb9b41f..4773a944f4 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -19,6 +19,7 @@
//! loaded into memory
use crate::eliminate_project::can_eliminate;
+use crate::merge_projection::merge_projection;
use crate::optimizer::ApplyOrder;
use crate::push_down_filter::replace_cols_by_name;
use crate::{OptimizerConfig, OptimizerRule};
@@ -91,28 +92,7 @@ impl OptimizerRule for PushDownProjection {
let new_plan = match child_plan {
LogicalPlan::Projection(child_projection) => {
- // merge projection
- let replace_map = collect_projection_expr(child_projection);
- let new_exprs = projection
- .expr
- .iter()
- .map(|expr| replace_cols_by_name(expr.clone(), &replace_map))
- .enumerate()
- .map(|(i, e)| match e {
- Ok(e) => {
- let parent_expr =
- projection.schema.fields()[i].qualified_name();
- e.alias_if_changed(parent_expr)
- }
- Err(e) => Err(e),
- })
- .collect::<Result<Vec<_>>>()?;
- let new_plan = LogicalPlan::Projection(Projection::try_new_with_schema(
- new_exprs,
- child_projection.input.clone(),
- projection.schema.clone(),
- )?);
-
+ let new_plan = merge_projection(projection, child_projection)?;
self.try_optimize(&new_plan, _config)?.unwrap_or(new_plan)
}
LogicalPlan::Join(join) => {