Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/11/28 10:25:05 UTC

[PR] Optimize Projections during Logical Plan [arrow-datafusion]

mustafasrepo opened a new pull request, #8340:
URL: https://github.com/apache/arrow-datafusion/pull/8340

   ## Which issue does this PR close?
   
   
   Closes [8296](https://github.com/apache/arrow-datafusion/issues/8296).
   
   ## Rationale for this change
   
   
   ## What changes are included in this PR?
   
   This PR removes unnecessary columns (fields) from the `LogicalPlan` when doing so is beneficial. It does this by adding projections that drop columns not needed by subsequent operators.
   
   This is implemented by the new `OptimizeProjections` logical optimizer rule (`optimize_projections`).
   `OptimizeProjections` covers the functionality of the `MergeProjection`, `PushDownProjection` and `EliminateProjection` rules, so this PR also removes those rules.
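   The following is a rough illustration of the column-pruning idea. It is a minimal sketch, not the code in this PR: the `Plan` enum and the `prune` function are hypothetical, and a real rule must also handle computed expressions, joins, aggregates and schemas.
   
   Each node receives the set of columns its parent needs, adds the columns its own expressions read, and passes the union down. The scan keeps only those columns. Above a filter whose predicate-only columns are not needed upstream, a projection is added to drop them.
   
   ```rust
   use std::collections::BTreeSet;
   
   /// Hypothetical, heavily simplified plan tree used only for illustration;
   /// it is not DataFusion's `LogicalPlan`.
   #[derive(Debug, Clone, PartialEq)]
   enum Plan {
       /// Outputs the listed input columns (computed expressions are omitted here).
       Projection { columns: Vec<String>, input: Box<Plan> },
       /// Keeps rows whose predicate (reading `predicate_columns`) is true.
       Filter { predicate_columns: Vec<String>, input: Box<Plan> },
       /// Reads the `projection` columns of `table`.
       TableScan { table: String, projection: Vec<String> },
   }
   
   /// Rewrites `plan` so that it outputs only the columns in `required`,
   /// pushing that requirement down to the scan.
   fn prune(plan: &Plan, required: &BTreeSet<String>) -> Plan {
       match plan {
           Plan::Projection { columns, input } => {
               // Drop projected columns that no parent needs; the child then only
               // has to produce the columns that are still projected.
               let kept: Vec<String> = columns
                   .iter()
                   .filter(|c| required.contains(*c))
                   .cloned()
                   .collect();
               let child_required: BTreeSet<String> = kept.iter().cloned().collect();
               Plan::Projection { columns: kept, input: Box::new(prune(input, &child_required)) }
           }
           Plan::Filter { predicate_columns, input } => {
               // The filter needs its predicate columns on top of what the parent needs.
               let mut child_required = required.clone();
               child_required.extend(predicate_columns.iter().cloned());
               let filter = Plan::Filter {
                   predicate_columns: predicate_columns.clone(),
                   input: Box::new(prune(input, &child_required)),
               };
               if child_required.len() > required.len() {
                   // Predicate-only columns would flow upwards: add a projection that removes them.
                   Plan::Projection { columns: required.iter().cloned().collect(), input: Box::new(filter) }
               } else {
                   filter
               }
           }
           Plan::TableScan { table, projection } => Plan::TableScan {
               table: table.clone(),
               // Narrow the scan to the required columns, keeping the table's column order.
               projection: projection.iter().filter(|c| required.contains(*c)).cloned().collect(),
           },
       }
   }
   ```
   
   As an example, take `Filter(c13) -> TableScan t [c1, c2, c13]` where only `c1` is needed above. The sketch produces `Projection [c1] -> Filter(c13) -> TableScan t [c1, c13]`.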
   
   With this PR:
   - We produce `LogicalPlan`s that contain only the fields strictly required by subsequent operators (when doing so is beneficial, which is most of the time).
   - Consecutive projections are no longer always merged. Keeping both projections is beneficial when the child projection computes a complex intermediate result that the parent projection uses more than once, so in that case they are not merged.
   
   For example, the plan
   
   ```text
   Projection: intermediate, intermediate+1
     Projection: t.a/2 as intermediate
       TableScan: t projection=[a]
   ```
   
   will not be merged into
   
   ```text
   Projection: t.a/2, t.a/2+1
     TableScan: t projection=[a]
   ```
   
   The reason is that in the first plan, `t.a/2` is computed once and then used twice by the parent projection. In the second plan, `t.a/2` is computed twice. (This behavior solves the problem described in the issue.)
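   The merge condition can be sketched as follows. This is a hedged reading of the rule described above, not DataFusion's implementation. `Expr`, `is_trivial`, `count_column_refs` and `should_merge` are hypothetical names, and treating only columns and literals as cheap to duplicate is an assumption.
   
   Merging inlines each child expression at every place the parent references it. The merge is therefore skipped when a non-trivial child expression is referenced more than once.
   
   ```rust
   use std::collections::HashMap;
   
   /// Hypothetical toy expression type (not DataFusion's `Expr`).
   #[derive(Debug, Clone, PartialEq)]
   enum Expr {
       Column(String),
       Literal(i64),
       Add(Box<Expr>, Box<Expr>),
       Div(Box<Expr>, Box<Expr>),
   }
   
   /// Assumption: columns and literals are cheap to duplicate; anything else is computation.
   fn is_trivial(expr: &Expr) -> bool {
       matches!(expr, Expr::Column(_) | Expr::Literal(_))
   }
   
   /// Counts how many times each column name is referenced in `expr`.
   fn count_column_refs(expr: &Expr, counts: &mut HashMap<String, usize>) {
       match expr {
           Expr::Column(name) => *counts.entry(name.clone()).or_insert(0) += 1,
           Expr::Literal(_) => {}
           Expr::Add(l, r) | Expr::Div(l, r) => {
               count_column_refs(l, counts);
               count_column_refs(r, counts);
           }
       }
   }
   
   /// `parent` refers to the child's outputs by name; `child` lists (output name, expression).
   /// Returns false when merging would evaluate a non-trivial child expression more than once.
   fn should_merge(parent: &[Expr], child: &[(String, Expr)]) -> bool {
       let mut counts: HashMap<String, usize> = HashMap::new();
       for expr in parent {
           count_column_refs(expr, &mut counts);
       }
       child
           .iter()
           .all(|(name, expr)| is_trivial(expr) || counts.get(name).copied().unwrap_or(0) <= 1)
   }
   ```
   
   With the child `t.a/2 AS intermediate`, the parent `intermediate, intermediate+1` is not merged. A parent that only computes `intermediate+1` is merged.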
   
   ## Are these changes tested?
   Yes
   
   ## Are there any user-facing changes?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Optimize Projections during Logical Plan [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #8340:
URL: https://github.com/apache/arrow-datafusion/pull/8340#discussion_r1408281202


##########
datafusion/optimizer/src/merge_projection.rs:
##########
@@ -15,105 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
-
-use crate::optimizer::ApplyOrder;
-use crate::push_down_filter::replace_cols_by_name;
-use crate::{OptimizerConfig, OptimizerRule};
-
-use datafusion_common::Result;
-use datafusion_expr::{Expr, LogicalPlan, Projection};
-
-/// Optimization rule that merge [LogicalPlan::Projection].
-#[derive(Default)]
-pub struct MergeProjection;
-
-impl MergeProjection {
-    #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-impl OptimizerRule for MergeProjection {
-    fn try_optimize(
-        &self,
-        plan: &LogicalPlan,
-        _config: &dyn OptimizerConfig,
-    ) -> Result<Option<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Projection(parent_projection) => {
-                match parent_projection.input.as_ref() {
-                    LogicalPlan::Projection(child_projection) => {
-                        let new_plan =
-                            merge_projection(parent_projection, child_projection)?;
-                        Ok(Some(
-                            self.try_optimize(&new_plan, _config)?.unwrap_or(new_plan),
-                        ))
-                    }
-                    _ => Ok(None),
-                }
-            }
-            _ => Ok(None),
-        }
-    }
-
-    fn name(&self) -> &str {
-        "merge_projection"
-    }
-
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-}
-
-pub(super) fn merge_projection(
-    parent_projection: &Projection,
-    child_projection: &Projection,
-) -> Result<LogicalPlan> {
-    let replace_map = collect_projection_expr(child_projection);
-    let new_exprs = parent_projection
-        .expr
-        .iter()
-        .map(|expr| replace_cols_by_name(expr.clone(), &replace_map))
-        .enumerate()
-        .map(|(i, e)| match e {
-            Ok(e) => {
-                let parent_expr = parent_projection.schema.fields()[i].qualified_name();
-                e.alias_if_changed(parent_expr)
-            }
-            Err(e) => Err(e),
-        })
-        .collect::<Result<Vec<_>>>()?;
-    // Use try_new, since schema changes with changing expressions.
-    let new_plan = LogicalPlan::Projection(Projection::try_new(
-        new_exprs,
-        child_projection.input.clone(),
-    )?);
-    Ok(new_plan)
-}
-
-pub fn collect_projection_expr(projection: &Projection) -> HashMap<String, Expr> {
-    projection
-        .schema
-        .fields()
-        .iter()
-        .enumerate()
-        .flat_map(|(i, field)| {
-            // strip alias
-            let expr = projection.expr[i].clone().unalias();
-            // Convert both qualified and unqualified fields
-            [
-                (field.name().clone(), expr.clone()),
-                (field.qualified_name(), expr),
-            ]
-        })
-        .collect::<HashMap<_, _>>()
-}
-
 #[cfg(test)]
 mod tests {

Review Comment:
   I plan to do so in the next PR. This PR is already big, and I didn't want to increase the diff further by moving tests.





Re: [PR] Optimize Projections during Logical Plan [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #8340:
URL: https://github.com/apache/arrow-datafusion/pull/8340#discussion_r1407626300


##########
datafusion/sqllogictest/test_files/window.slt:
##########
@@ -1731,26 +1731,28 @@ logical_plan
 Projection: COUNT(*) AS global_count
 --Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
 ----SubqueryAlias: a
-------Sort: aggregate_test_100.c1 ASC NULLS LAST
---------Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[]]
-----------Projection: aggregate_test_100.c1
-------------Filter: aggregate_test_100.c13 != Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")
---------------TableScan: aggregate_test_100 projection=[c1, c13], partial_filters=[aggregate_test_100.c13 != Utf8("C2GT5KVyOPZpgKVl110TyZO0NcJ434")]
+------Projection:

Review Comment:
   All of the plans seem slightly better :)





Re: [PR] Optimize Projections during Logical Plan [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #8340:
URL: https://github.com/apache/arrow-datafusion/pull/8340#discussion_r1407629172


##########
datafusion/sqllogictest/test_files/subquery.slt:
##########
@@ -437,7 +437,7 @@ Projection: t1.t1_id, (<subquery>) AS t2_int
 ------Projection: t2.t2_int
 --------Filter: t2.t2_int = outer_ref(t1.t1_int)
 ----------TableScan: t2
---TableScan: t1 projection=[t1_id]
+--TableScan: t1 projection=[t1_id, t1_int]

Review Comment:
   Ok, thanks for clarifying :)





Re: [PR] Optimize Projections during Logical Plan [arrow-datafusion]

Posted by "jonahgao (via GitHub)" <gi...@apache.org>.
jonahgao commented on code in PR #8340:
URL: https://github.com/apache/arrow-datafusion/pull/8340#discussion_r1408007822


##########
datafusion/optimizer/tests/optimizer_integration.rs:
##########
@@ -198,8 +199,9 @@ fn between_date64_plus_interval() -> Result<()> {
     let plan = test_sql(sql)?;
     let expected =
         "Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]\
-        \n  Filter: test.col_date64 >= Date64(\"890179200000\") AND test.col_date64 <= Date64(\"897955200000\")\
-        \n    TableScan: test projection=[col_date64]";
+        \n  Projection: \

Review Comment:
   It seems there is an additional empty projection here compared to before.





Re: [PR] Optimize Projections during Logical Plan [arrow-datafusion]

Posted by "jonahgao (via GitHub)" <gi...@apache.org>.
jonahgao commented on code in PR #8340:
URL: https://github.com/apache/arrow-datafusion/pull/8340#discussion_r1407992416


##########
datafusion/optimizer/src/merge_projection.rs:
##########
@@ -15,105 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
-
-use crate::optimizer::ApplyOrder;
-use crate::push_down_filter::replace_cols_by_name;
-use crate::{OptimizerConfig, OptimizerRule};
-
-use datafusion_common::Result;
-use datafusion_expr::{Expr, LogicalPlan, Projection};
-
-/// Optimization rule that merge [LogicalPlan::Projection].
-#[derive(Default)]
-pub struct MergeProjection;
-
-impl MergeProjection {
-    #[allow(missing_docs)]
-    pub fn new() -> Self {
-        Self {}
-    }
-}
-
-impl OptimizerRule for MergeProjection {
-    fn try_optimize(
-        &self,
-        plan: &LogicalPlan,
-        _config: &dyn OptimizerConfig,
-    ) -> Result<Option<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Projection(parent_projection) => {
-                match parent_projection.input.as_ref() {
-                    LogicalPlan::Projection(child_projection) => {
-                        let new_plan =
-                            merge_projection(parent_projection, child_projection)?;
-                        Ok(Some(
-                            self.try_optimize(&new_plan, _config)?.unwrap_or(new_plan),
-                        ))
-                    }
-                    _ => Ok(None),
-                }
-            }
-            _ => Ok(None),
-        }
-    }
-
-    fn name(&self) -> &str {
-        "merge_projection"
-    }
-
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-}
-
-pub(super) fn merge_projection(
-    parent_projection: &Projection,
-    child_projection: &Projection,
-) -> Result<LogicalPlan> {
-    let replace_map = collect_projection_expr(child_projection);
-    let new_exprs = parent_projection
-        .expr
-        .iter()
-        .map(|expr| replace_cols_by_name(expr.clone(), &replace_map))
-        .enumerate()
-        .map(|(i, e)| match e {
-            Ok(e) => {
-                let parent_expr = parent_projection.schema.fields()[i].qualified_name();
-                e.alias_if_changed(parent_expr)
-            }
-            Err(e) => Err(e),
-        })
-        .collect::<Result<Vec<_>>>()?;
-    // Use try_new, since schema changes with changing expressions.
-    let new_plan = LogicalPlan::Projection(Projection::try_new(
-        new_exprs,
-        child_projection.input.clone(),
-    )?);
-    Ok(new_plan)
-}
-
-pub fn collect_projection_expr(projection: &Projection) -> HashMap<String, Expr> {
-    projection
-        .schema
-        .fields()
-        .iter()
-        .enumerate()
-        .flat_map(|(i, field)| {
-            // strip alias
-            let expr = projection.expr[i].clone().unalias();
-            // Convert both qualified and unqualified fields
-            [
-                (field.name().clone(), expr.clone()),
-                (field.qualified_name(), expr),
-            ]
-        })
-        .collect::<HashMap<_, _>>()
-}
-
 #[cfg(test)]
 mod tests {

Review Comment:
   Could we move these tests into `optimize_projection.rs` 🤔, so that we can delete the `merge_projection.rs` file?
   








Re: [PR] Optimize Projections during Logical Plan [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #8340:
URL: https://github.com/apache/arrow-datafusion/pull/8340#discussion_r1407628350


##########
datafusion/sqllogictest/test_files/subquery.slt:
##########
@@ -437,7 +437,7 @@ Projection: t1.t1_id, (<subquery>) AS t2_int
 ------Projection: t2.t2_int
 --------Filter: t2.t2_int = outer_ref(t1.t1_int)
 ----------TableScan: t2
---TableScan: t1 projection=[t1_id]
+--TableScan: t1 projection=[t1_id, t1_int]

Review Comment:
   As far as I know, we cannot generate physical plans for these queries. I think it was a bug, but because it couldn't be tested with data (no physical plan is generated), it went unnoticed.
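   The following toy sketch illustrates why `t1_int` must now be kept. It uses hypothetical types and is not how DataFusion models subqueries.
   
   The correlated predicate `t2.t2_int = outer_ref(t1.t1_int)` is evaluated against rows of `t1`. So when computing the columns a projection needs from its input, outer references inside a subquery expression must count as required.
   
   ```rust
   use std::collections::BTreeSet;
   
   /// Hypothetical toy expression type; DataFusion's real `Expr` and subquery
   /// representation are richer than this.
   #[derive(Debug, Clone)]
   enum Expr {
       /// A column of the input this expression is evaluated against.
       Column(String),
       /// `outer_ref(...)`: a column of the enclosing (outer) query.
       OuterRef(String),
       Eq(Box<Expr>, Box<Expr>),
       /// A correlated scalar subquery, reduced here to the predicates it evaluates.
       ScalarSubquery { predicates: Vec<Expr> },
   }
   
   /// Collects the outer-query columns referenced inside a subquery.
   fn collect_outer_refs(expr: &Expr, out: &mut BTreeSet<String>) {
       match expr {
           Expr::OuterRef(name) => {
               out.insert(name.clone());
           }
           // Plain columns here belong to the subquery's own input.
           Expr::Column(_) => {}
           Expr::Eq(l, r) => {
               collect_outer_refs(l, out);
               collect_outer_refs(r, out);
           }
           // Simplification: nested subqueries are treated as correlated with this level.
           Expr::ScalarSubquery { predicates } => {
               for p in predicates {
                   collect_outer_refs(p, out);
               }
           }
       }
   }
   
   /// Columns that a projection's input must keep to evaluate `expr`.
   fn required_input_columns(expr: &Expr, out: &mut BTreeSet<String>) {
       match expr {
           Expr::Column(name) => {
               out.insert(name.clone());
           }
           // An outer reference points further up, not at this input.
           Expr::OuterRef(_) => {}
           Expr::Eq(l, r) => {
               required_input_columns(l, out);
               required_input_columns(r, out);
           }
           // Correlated columns are read from this input, so pruning must keep them.
           Expr::ScalarSubquery { predicates } => {
               for p in predicates {
                   collect_outer_refs(p, out);
               }
           }
       }
   }
   ```
   
   For `t1.t1_id, (<subquery>)`, this yields `{t1.t1_id, t1.t1_int}`, which matches the new `TableScan: t1 projection=[t1_id, t1_int]`.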





Re: [PR] Optimize Projections during Logical Plan [arrow-datafusion]

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo merged PR #8340:
URL: https://github.com/apache/arrow-datafusion/pull/8340




Re: [PR] Optimize Projections during Logical Plan [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #8340:
URL: https://github.com/apache/arrow-datafusion/pull/8340#discussion_r1408080879


##########
datafusion/optimizer/tests/optimizer_integration.rs:
##########
@@ -198,8 +199,9 @@ fn between_date64_plus_interval() -> Result<()> {
     let plan = test_sql(sql)?;
     let expected =
         "Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]\
-        \n  Filter: test.col_date64 >= Date64(\"890179200000\") AND test.col_date64 <= Date64(\"897955200000\")\
-        \n    TableScan: test projection=[col_date64]";
+        \n  Projection: \

Review Comment:
   Yes, but this (empty) projection is fine, as the aggregate doesn't need any columns.
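   The following minimal sketch, using hypothetical toy types, shows why the empty projection is valid. An aggregate only reads the columns referenced by its grouping expressions and aggregate arguments. For `COUNT(Int64(1))` with no `GROUP BY`, that set is empty. So once the filter has used `col_date64`, nothing else needs to be carried upward.
   
   ```rust
   use std::collections::BTreeSet;
   
   /// Hypothetical toy expression type (not DataFusion's `Expr`).
   #[derive(Debug, Clone)]
   enum Expr {
       Column(String),
       Literal(i64),
       /// COUNT(arg)
       Count(Box<Expr>),
   }
   
   /// Adds every column referenced by `expr` to `out`.
   fn collect_columns(expr: &Expr, out: &mut BTreeSet<String>) {
       match expr {
           Expr::Column(name) => {
               out.insert(name.clone());
           }
           Expr::Literal(_) => {}
           Expr::Count(arg) => collect_columns(arg, out),
       }
   }
   
   /// Input columns an aggregate reads: those referenced by its grouping
   /// expressions and by its aggregate arguments, nothing else.
   fn aggregate_required_columns(group_by: &[Expr], aggr: &[Expr]) -> BTreeSet<String> {
       let mut out = BTreeSet::new();
       for expr in group_by.iter().chain(aggr.iter()) {
           collect_columns(expr, &mut out);
       }
       out
   }
   ```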





Re: [PR] Optimize Projections during Logical Plan [arrow-datafusion]

Posted by "Dandandan (via GitHub)" <gi...@apache.org>.
Dandandan commented on code in PR #8340:
URL: https://github.com/apache/arrow-datafusion/pull/8340#discussion_r1407623548


##########
datafusion/sqllogictest/test_files/subquery.slt:
##########
@@ -437,7 +437,7 @@ Projection: t1.t1_id, (<subquery>) AS t2_int
 ------Projection: t2.t2_int
 --------Filter: t2.t2_int = outer_ref(t1.t1_int)
 ----------TableScan: t2
---TableScan: t1 projection=[t1_id]
+--TableScan: t1 projection=[t1_id, t1_int]

Review Comment:
   How did this work before (was this a bug)?


