Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/02 12:23:17 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

jorgecarleitao opened a new pull request #7880:
URL: https://github.com/apache/arrow/pull/7880


   This PR adds a new optimizer to push filters down. For example, a plan of the form 
   
   ```
   Selection: #SUM(c) Gt Int64(10)
     Selection: #b Gt Int64(10)
       Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]
         Projection: #a AS b, #c
           TableScan: test projection=None
   ```
   
   is converted to 
   
   ```
   Selection: #SUM(c) Gt Int64(10)
     Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]
       Projection: #a AS b, #c
         Selection: #a Gt Int64(10)
           TableScan: test projection=None
   ```
   
   (note how the filter expression changed, and how only the filter on the key of the aggregate was pushed)
   
   This works by performing two passes on the plan. On the first pass (analyze), it identifies:
   
   1. all filters on the plan (selections)
   2. all projections on the plan (projections)
   3. all places where a filter on a column cannot be pushed down (break_points)
   
   After this pass, it computes the maximum depth to which each filter can be pushed down, as well as the new expression that the filter should have, given all the projections it passes through.
   
   On the second pass (optimize), it:
   
   * removes all old filters
   * adds all new filters
   
   See comments on the code for details.
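
The two passes described above can be sketched on a toy plan. This is a minimal illustration, not the code in this PR: `Node`, `selection`, `projection` and `push_down` are hypothetical names, the plan is assumed to be a single-input chain stored as a vector (index 0 is the last operation, the highest index is the scan), and filters are restricted to `column > constant`.

```rust
use std::collections::HashMap;

/// Toy single-input plan node (hypothetical, not DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Node {
    /// Keep rows where `column > min`.
    Selection { column: String, min: i64 },
    /// Maps each output column (alias) to the input column it comes from.
    Projection { aliases: HashMap<String, String> },
    /// Only filters on the group-by keys commute with an aggregate.
    Aggregate { group_by: Vec<String> },
    Scan,
}

fn selection(column: &str, min: i64) -> Node {
    Node::Selection { column: column.to_string(), min }
}

fn projection(pairs: &[(&str, &str)]) -> Node {
    let aliases = pairs
        .iter()
        .map(|(alias, source)| (alias.to_string(), source.to_string()))
        .collect();
    Node::Projection { aliases }
}

/// `plan[0]` is the last operation (depth 0); the scan has the highest depth.
fn push_down(plan: &[Node]) -> Vec<Node> {
    // Pass 1 (analyze): for every filter, find the deepest node it can sit
    // directly above, renaming its column through each projection it crosses.
    let mut moved: Vec<(usize, Node)> = Vec::new();
    for (depth, node) in plan.iter().enumerate() {
        if let Node::Selection { column, min } = node {
            let mut column = column.clone();
            let mut target = depth + 1;
            while target < plan.len() {
                match &plan[target] {
                    // Filters commute with other filters.
                    Node::Selection { .. } => {}
                    Node::Projection { aliases } => {
                        if let Some(source) = aliases.get(&column) {
                            column = source.clone();
                        }
                    }
                    // Break point: the column is not a group-by key.
                    Node::Aggregate { group_by } => {
                        if !group_by.contains(&column) {
                            break;
                        }
                    }
                    Node::Scan => break,
                }
                target += 1;
            }
            moved.push((target, Node::Selection { column, min: *min }));
        }
    }

    // Pass 2 (optimize): drop all old filters and add the new ones.
    let mut out = Vec::new();
    for (depth, node) in plan.iter().enumerate() {
        for (target, filter) in &moved {
            if *target == depth {
                out.push(filter.clone());
            }
        }
        if !matches!(node, Node::Selection { .. }) {
            out.push(node.clone());
        }
    }
    // Filters that fell off the end of a plan without a scan go last.
    for (target, filter) in &moved {
        if *target == plan.len() {
            out.push(filter.clone());
        }
    }
    out
}
```

Applied to the example plan at the top of this description, the sketch leaves the filter on `SUM(c)` directly above the aggregate and moves the filter on `b` to just above the scan, rewritten as a filter on `a`.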
   
   This PR is built on top of #7879 (first two commits).
   
   FYI @andygrove @sunchao 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r471004814



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is a operation whose result of filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (tipically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. re-write the filter expression for every projection that it commutes with from its original depth to its max possible depth
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();

Review comment:
       Good catch. Yes, you are right. Leftovers from a previous iteration :/







[GitHub] [arrow] houqp commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-675259193


   One thing I do like about Spark's optimizer is that all optimization rules share a common plan tree traversal and mutation routine, which makes each individual optimization rule easier to reason about. I can see us adopting the same pattern in the future to simplify the existing code base.
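
A shared traversal routine of the kind described above might look roughly like the sketch below. `Plan`, `transform_up` and `merge_filters` are hypothetical names used only for illustration; they are neither Spark's nor DataFusion's API. The point is that the recursion lives in one place and each rule only describes a local rewrite of a single node.

```rust
/// Toy plan tree (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Filter { predicate: String, input: Box<Plan> },
    Limit { n: usize, input: Box<Plan> },
}

/// Shared traversal: rebuild the tree bottom-up and apply `rule` to every
/// node after its input has already been rewritten.
fn transform_up(plan: Plan, rule: &dyn Fn(Plan) -> Plan) -> Plan {
    let rewritten = match plan {
        Plan::Scan(name) => Plan::Scan(name),
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate,
            input: Box::new(transform_up(*input, rule)),
        },
        Plan::Limit { n, input } => Plan::Limit {
            n,
            input: Box::new(transform_up(*input, rule)),
        },
    };
    rule(rewritten)
}

/// Example rule: merge a filter with a filter directly below it.
/// It only looks at one node and its immediate input.
fn merge_filters(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate: outer, input } => match *input {
            Plan::Filter { predicate: inner, input } => Plan::Filter {
                predicate: format!("({}) AND ({})", inner, outer),
                input,
            },
            other => Plan::Filter { predicate: outer, input: Box::new(other) },
        },
        other => other,
    }
}
```

With this split, a rule such as `merge_filters` is applied to a whole plan with `transform_up(plan, &merge_filters)`, and adding a new plan node only requires touching `transform_up`.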





[GitHub] [arrow] alamb commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r470753855



##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -43,138 +45,77 @@ impl<'a> TypeCoercionRule<'a> {
         Self { scalar_functions }
     }
 
-    /// Rewrite an expression list to include explicit CAST operations when required
-    fn rewrite_expr_list(&self, expr: &[Expr], schema: &Schema) -> Result<Vec<Expr>> {
-        Ok(expr
+    /// Rewrite an expression to include explicit CAST operations when required
+    fn rewrite_expr(&self, expr: &Expr, schema: &Schema) -> Result<Expr> {
+        let expressions = utils::expr_expressions(expr)?;
+
+        // recurse of the re-write
+        let mut expressions = expressions
             .iter()
             .map(|e| self.rewrite_expr(e, schema))
-            .collect::<Result<Vec<_>>>()?)
-    }
+            .collect::<Result<Vec<_>>>()?;
 
-    /// Rewrite an expression to include explicit CAST operations when required
-    fn rewrite_expr(&self, expr: &Expr, schema: &Schema) -> Result<Expr> {
+        // modify `expressions` by introducing casts when necessary
         match expr {
-            Expr::BinaryExpr { left, op, right } => {
-                let left = self.rewrite_expr(left, schema)?;
-                let right = self.rewrite_expr(right, schema)?;
-                let left_type = left.get_type(schema)?;
-                let right_type = right.get_type(schema)?;
-                if left_type == right_type {
-                    Ok(Expr::BinaryExpr {
-                        left: Box::new(left),
-                        op: op.clone(),
-                        right: Box::new(right),
-                    })
-                } else {
+            Expr::BinaryExpr { .. } => {
+                let left_type = expressions[0].get_type(schema)?;
+                let right_type = expressions[1].get_type(schema)?;
+                if left_type != right_type {
                     let super_type = utils::get_supertype(&left_type, &right_type)?;
-                    Ok(Expr::BinaryExpr {
-                        left: Box::new(left.cast_to(&super_type, schema)?),
-                        op: op.clone(),
-                        right: Box::new(right.cast_to(&super_type, schema)?),
-                    })
+
+                    expressions[0] = expressions[0].cast_to(&super_type, schema)?;
+                    expressions[1] = expressions[1].cast_to(&super_type, schema)?;
                 }
             }
-            Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(self.rewrite_expr(e, schema)?))),
-            Expr::IsNotNull(e) => {
-                Ok(Expr::IsNotNull(Box::new(self.rewrite_expr(e, schema)?)))
-            }
-            Expr::ScalarFunction {
-                name,
-                args,
-                return_type,
-            } => {
+            Expr::ScalarFunction { name, .. } => {
                 // cast the inputs of scalar functions to the appropriate type where possible
                 match self.scalar_functions.get(name) {
                     Some(func_meta) => {
-                        let mut func_args = Vec::with_capacity(args.len());
-                        for i in 0..args.len() {
+                        for i in 0..expressions.len() {
                             let field = &func_meta.args[i];
-                            let expr = self.rewrite_expr(&args[i], schema)?;
-                            let actual_type = expr.get_type(schema)?;
+                            let actual_type = expressions[i].get_type(schema)?;
                             let required_type = field.data_type();
-                            if &actual_type == required_type {
-                                func_args.push(expr)
-                            } else {
+                            if &actual_type != required_type {
                                 let super_type =
                                     utils::get_supertype(&actual_type, required_type)?;
-                                func_args.push(expr.cast_to(&super_type, schema)?);
-                            }
+                                expressions[i] =
+                                    expressions[i].cast_to(&super_type, schema)?
+                            };
                         }
-
-                        Ok(Expr::ScalarFunction {
-                            name: name.clone(),
-                            args: func_args,
-                            return_type: return_type.clone(),
-                        })
                     }
-                    _ => Err(ExecutionError::General(format!(
-                        "Invalid scalar function {}",
-                        name
-                    ))),
+                    _ => {
+                        return Err(ExecutionError::General(format!(
+                            "Invalid scalar function {}",
+                            name
+                        )))
+                    }
                 }
             }
-            Expr::AggregateFunction {
-                name,
-                args,
-                return_type,
-            } => Ok(Expr::AggregateFunction {
-                name: name.clone(),
-                args: args
-                    .iter()
-                    .map(|a| self.rewrite_expr(a, schema))
-                    .collect::<Result<Vec<_>>>()?,
-                return_type: return_type.clone(),
-            }),
-            Expr::Cast { .. } => Ok(expr.clone()),
-            Expr::Column(_) => Ok(expr.clone()),
-            Expr::Alias(expr, alias) => Ok(Expr::Alias(
-                Box::new(self.rewrite_expr(expr, schema)?),
-                alias.to_owned(),
-            )),
-            Expr::Literal(_) => Ok(expr.clone()),
-            Expr::Not(_) => Ok(expr.clone()),
-            Expr::Sort { .. } => Ok(expr.clone()),
-            Expr::Wildcard { .. } => Err(ExecutionError::General(
-                "Wildcard expressions are not valid in a logical query plan".to_owned(),
-            )),
-            Expr::Nested(e) => self.rewrite_expr(e, schema),
-        }
+            _ => {}
+        };
+        utils::from_expression(expr, &expressions)
     }
 }
 
 impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        match plan {
-            LogicalPlan::Projection { expr, input, .. } => {
-                LogicalPlanBuilder::from(&self.optimize(input)?)
-                    .project(self.rewrite_expr_list(expr, input.schema())?)?
-                    .build()
-            }
-            LogicalPlan::Selection { expr, input, .. } => {
-                LogicalPlanBuilder::from(&self.optimize(input)?)
-                    .filter(self.rewrite_expr(expr, input.schema())?)?
-                    .build()
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                ..
-            } => LogicalPlanBuilder::from(&self.optimize(input)?)
-                .aggregate(
-                    self.rewrite_expr_list(group_expr, input.schema())?,
-                    self.rewrite_expr_list(aggr_expr, input.schema())?,
-                )?
-                .build(),
-            LogicalPlan::TableScan { .. } => Ok(plan.clone()),
-            LogicalPlan::InMemoryScan { .. } => Ok(plan.clone()),
-            LogicalPlan::ParquetScan { .. } => Ok(plan.clone()),
-            LogicalPlan::CsvScan { .. } => Ok(plan.clone()),
-            LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
-            LogicalPlan::Limit { .. } => Ok(plan.clone()),
-            LogicalPlan::Sort { .. } => Ok(plan.clone()),
-            LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
-        }
+        let inputs = utils::inputs(plan);
+        let expressions = utils::expressions(plan);
+
+        // apply the optimization to all inputs of the plan
+        let new_inputs = inputs
+            .iter()
+            .map(|plan| self.optimize(*plan))
+            .collect::<Result<Vec<_>>>()?;
+        // re-write all expressions on this plan.
+        // This assumes a single input, [0]. It wont work for join, subqueries and union operations with more than one input.
+        // It is currently not an issue as we do not have any plan with more than one input.
+        let new_expressions = expressions

Review comment:
       ```suggestion
           assert!(expressions.len() == 0 || inputs.len() > 0, "Assume that all plan nodes with expressions had inputs");
           let new_expressions = expressions
   ```
   
   I think `EmptyRelation` (https://github.com/apache/arrow/blob/master/rust/datafusion/src/logicalplan.rs#L761-L764), for example, has no input LogicalPlan. But perhaps you are saying: "even though `EmptyRelation` has no inputs (and thus could cause `inputs[0].schema()` to panic), it also has no expressions, so the potentially panicking code won't run."
   
   I guess I was thinking of a future where we add expressions to root nodes (e.g. perhaps filtering *during* a table scan or something), which would then have expressions but no input.
   
   I think this code is fine as is. Perhaps we could make the code slightly easier to work with in the future by adding something like the suggested assert, which states the assumption that any node with expressions also has inputs, and so fails with a clear message rather than an out-of-bounds panic.
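
As an alternative to the assert, the same invariant can be checked without panicking at all. The sketch below is an assumption-laden illustration: `Schema`, `Input` and `rewrite_schema` are hypothetical stand-ins, not types or helpers from DataFusion. It uses `first()` instead of indexing with `[0]` and turns the "expressions but no input" case into an error.

```rust
/// Stand-in for a schema (hypothetical; just a list of column names).
#[derive(Debug, PartialEq)]
struct Schema(Vec<String>);

/// Stand-in for an input plan that exposes a schema.
struct Input {
    schema: Schema,
}

/// Returns the schema against which a node's expressions should be rewritten.
/// - a node with at least one input uses the first input's schema;
/// - a node with no inputs and no expressions has nothing to rewrite;
/// - a node with expressions but no inputs is reported as an error.
fn rewrite_schema<'a>(
    inputs: &'a [Input],
    n_expressions: usize,
) -> Result<Option<&'a Schema>, String> {
    match (inputs.first(), n_expressions) {
        (Some(input), _) => Ok(Some(&input.schema)),
        (None, 0) => Ok(None),
        (None, n) => Err(format!(
            "plan node has {} expression(s) but no input",
            n
        )),
    }
}
```

A caller would skip the rewrite on `Ok(None)` and propagate the error with `?` otherwise, so a future root node that carries expressions produces an error message instead of an index panic.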







[GitHub] [arrow] alamb commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r470850721



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is a operation whose result of filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (tipically a scan).

Review comment:
       ```suggestion
   and highest for the first operation (typically a scan).
   ```

##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is a operation whose result of filter(op(data)) = op(filter(data)).

Review comment:
       ```suggestion
   A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)).
   ```

##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is a operation whose result of filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (tipically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:

Review comment:
       This comment / description helps a lot. Thank you @jorgecarleitao 

##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is a operation whose result of filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (tipically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. re-write the filter expression for every projection that it commutes with from its original depth to its max possible depth
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();

Review comment:
       I don't understand why this is needed -- `break_points` is already keyed on a column name (and thus the `breaks.get(key)` should never match). Indeed, when I add an `assert!(false)` to this clause locally, all the tests still pass just fine. I may be missing something obvious.
   
   ```
   diff --git a/rust/datafusion/src/optimizer/filter_push_down.rs b/rust/datafusion/src/optimizer/filter_push_down.rs
   index f6f36e4df..f2e306e0b 100644
   --- a/rust/datafusion/src/optimizer/filter_push_down.rs
   +++ b/rust/datafusion/src/optimizer/filter_push_down.rs
   @@ -74,6 +74,7 @@ impl OptimizerRule for FilterPushDown {
            for (key, depth) in break_points {
                match breaks.get(&key) {
                    Some(current_depth) => {
   +                    assert!(false);
                        if depth > *current_depth {
                            breaks.insert(key, depth);
                        }
   ```
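
The observation can be reproduced in isolation. The sketch below assumes that `break_points` is a `HashMap<String, usize>` and that the `None` arm simply inserts the entry; neither is shown in the hunk above, so both are assumptions, and `merge_break_points` is a hypothetical name. Because iterating a `HashMap` yields each key exactly once, the `Some` arm is never taken and the loop reduces to a copy.

```rust
use std::collections::HashMap;

/// Mirrors the loop under review and counts how often the `Some` arm runs.
/// Returns the merged map together with that count.
fn merge_break_points(
    break_points: HashMap<String, usize>,
) -> (HashMap<String, usize>, usize) {
    let mut breaks = HashMap::new();
    let mut some_hits = 0;
    for (key, depth) in break_points {
        match breaks.get(&key) {
            // Reached only if `key` was already inserted, which cannot
            // happen when the keys come from a single HashMap.
            Some(current_depth) => {
                some_hits += 1;
                if depth > *current_depth {
                    breaks.insert(key, depth);
                }
            }
            // Assumed behaviour of the arm not shown in the diff.
            None => {
                breaks.insert(key, depth);
            }
        }
    }
    (breaks, some_hits)
}
```

The `Some` arm would only matter if the break points arrived as a list that may repeat a column, for example a `Vec<(String, usize)>`.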







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r470414491



##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -43,138 +45,77 @@ impl<'a> TypeCoercionRule<'a> {
         Self { scalar_functions }
     }
 
-    /// Rewrite an expression list to include explicit CAST operations when required
-    fn rewrite_expr_list(&self, expr: &[Expr], schema: &Schema) -> Result<Vec<Expr>> {
-        Ok(expr
+    /// Rewrite an expression to include explicit CAST operations when required
+    fn rewrite_expr(&self, expr: &Expr, schema: &Schema) -> Result<Expr> {
+        let expressions = utils::expr_expressions(expr)?;
+
+        // recurse of the re-write
+        let mut expressions = expressions
             .iter()
             .map(|e| self.rewrite_expr(e, schema))
-            .collect::<Result<Vec<_>>>()?)
-    }
+            .collect::<Result<Vec<_>>>()?;
 
-    /// Rewrite an expression to include explicit CAST operations when required
-    fn rewrite_expr(&self, expr: &Expr, schema: &Schema) -> Result<Expr> {
+        // modify `expressions` by introducing casts when necessary
         match expr {
-            Expr::BinaryExpr { left, op, right } => {
-                let left = self.rewrite_expr(left, schema)?;
-                let right = self.rewrite_expr(right, schema)?;
-                let left_type = left.get_type(schema)?;
-                let right_type = right.get_type(schema)?;
-                if left_type == right_type {
-                    Ok(Expr::BinaryExpr {
-                        left: Box::new(left),
-                        op: op.clone(),
-                        right: Box::new(right),
-                    })
-                } else {
+            Expr::BinaryExpr { .. } => {
+                let left_type = expressions[0].get_type(schema)?;
+                let right_type = expressions[1].get_type(schema)?;
+                if left_type != right_type {
                     let super_type = utils::get_supertype(&left_type, &right_type)?;
-                    Ok(Expr::BinaryExpr {
-                        left: Box::new(left.cast_to(&super_type, schema)?),
-                        op: op.clone(),
-                        right: Box::new(right.cast_to(&super_type, schema)?),
-                    })
+
+                    expressions[0] = expressions[0].cast_to(&super_type, schema)?;
+                    expressions[1] = expressions[1].cast_to(&super_type, schema)?;
                 }
             }
-            Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(self.rewrite_expr(e, schema)?))),
-            Expr::IsNotNull(e) => {
-                Ok(Expr::IsNotNull(Box::new(self.rewrite_expr(e, schema)?)))
-            }
-            Expr::ScalarFunction {
-                name,
-                args,
-                return_type,
-            } => {
+            Expr::ScalarFunction { name, .. } => {
                 // cast the inputs of scalar functions to the appropriate type where possible
                 match self.scalar_functions.get(name) {
                     Some(func_meta) => {
-                        let mut func_args = Vec::with_capacity(args.len());
-                        for i in 0..args.len() {
+                        for i in 0..expressions.len() {
                             let field = &func_meta.args[i];
-                            let expr = self.rewrite_expr(&args[i], schema)?;
-                            let actual_type = expr.get_type(schema)?;
+                            let actual_type = expressions[i].get_type(schema)?;
                             let required_type = field.data_type();
-                            if &actual_type == required_type {
-                                func_args.push(expr)
-                            } else {
+                            if &actual_type != required_type {
                                 let super_type =
                                     utils::get_supertype(&actual_type, required_type)?;
-                                func_args.push(expr.cast_to(&super_type, schema)?);
-                            }
+                                expressions[i] =
+                                    expressions[i].cast_to(&super_type, schema)?
+                            };
                         }
-
-                        Ok(Expr::ScalarFunction {
-                            name: name.clone(),
-                            args: func_args,
-                            return_type: return_type.clone(),
-                        })
                     }
-                    _ => Err(ExecutionError::General(format!(
-                        "Invalid scalar function {}",
-                        name
-                    ))),
+                    _ => {
+                        return Err(ExecutionError::General(format!(
+                            "Invalid scalar function {}",
+                            name
+                        )))
+                    }
                 }
             }
-            Expr::AggregateFunction {
-                name,
-                args,
-                return_type,
-            } => Ok(Expr::AggregateFunction {
-                name: name.clone(),
-                args: args
-                    .iter()
-                    .map(|a| self.rewrite_expr(a, schema))
-                    .collect::<Result<Vec<_>>>()?,
-                return_type: return_type.clone(),
-            }),
-            Expr::Cast { .. } => Ok(expr.clone()),
-            Expr::Column(_) => Ok(expr.clone()),
-            Expr::Alias(expr, alias) => Ok(Expr::Alias(
-                Box::new(self.rewrite_expr(expr, schema)?),
-                alias.to_owned(),
-            )),
-            Expr::Literal(_) => Ok(expr.clone()),
-            Expr::Not(_) => Ok(expr.clone()),
-            Expr::Sort { .. } => Ok(expr.clone()),
-            Expr::Wildcard { .. } => Err(ExecutionError::General(
-                "Wildcard expressions are not valid in a logical query plan".to_owned(),
-            )),
-            Expr::Nested(e) => self.rewrite_expr(e, schema),
-        }
+            _ => {}
+        };
+        utils::from_expression(expr, &expressions)
     }
 }
 
 impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        match plan {
-            LogicalPlan::Projection { expr, input, .. } => {
-                LogicalPlanBuilder::from(&self.optimize(input)?)
-                    .project(self.rewrite_expr_list(expr, input.schema())?)?
-                    .build()
-            }
-            LogicalPlan::Selection { expr, input, .. } => {
-                LogicalPlanBuilder::from(&self.optimize(input)?)
-                    .filter(self.rewrite_expr(expr, input.schema())?)?
-                    .build()
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                ..
-            } => LogicalPlanBuilder::from(&self.optimize(input)?)
-                .aggregate(
-                    self.rewrite_expr_list(group_expr, input.schema())?,
-                    self.rewrite_expr_list(aggr_expr, input.schema())?,
-                )?
-                .build(),
-            LogicalPlan::TableScan { .. } => Ok(plan.clone()),
-            LogicalPlan::InMemoryScan { .. } => Ok(plan.clone()),
-            LogicalPlan::ParquetScan { .. } => Ok(plan.clone()),
-            LogicalPlan::CsvScan { .. } => Ok(plan.clone()),
-            LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
-            LogicalPlan::Limit { .. } => Ok(plan.clone()),
-            LogicalPlan::Sort { .. } => Ok(plan.clone()),
-            LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
-        }
+        let inputs = utils::inputs(plan);
+        let expressions = utils::expressions(plan);
+
+        // apply the optimization to all inputs of the plan
+        let new_inputs = inputs
+            .iter()
+            .map(|plan| self.optimize(*plan))
+            .collect::<Result<Vec<_>>>()?;
+        // re-write all expressions on this plan.
+        // This assumes a single input, [0]. It won't work for joins, subqueries and union operations with more than one input.
+        // It is currently not an issue as we do not have any plan with more than one input.
+        let new_expressions = expressions

Review comment:
       Good catch. No, because I am not sure how that could happen: if a plan has expressions, it needs an `input` against which to convert them to physical expressions and evaluate them. AFAIK an expression always requires an input to be evaluated against.
   
   Do you have an example in mind?
   
   AFAIK even a literal expression requires a schema to pass to `Expr::get_type` and `Expr::name`.
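
To make the point about `Expr::get_type` concrete, here is a minimal, self-contained sketch. The `Expr`, `DataType` and `Schema` types below are toy stand-ins invented for illustration, not DataFusion's actual definitions. They are only meant to show that the signature takes a schema even in the literal case, where the schema is never consulted.

```rust
use std::collections::HashMap;

// Toy stand-ins (assumptions for illustration, not DataFusion's real types).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Utf8,
}

/// A schema here is just a map from column name to type.
type Schema = HashMap<String, DataType>;

#[derive(Debug, Clone)]
enum Expr {
    Column(String),
    Literal(i64),
}

impl Expr {
    /// Resolves the type of the expression against `schema`.
    fn get_type(&self, schema: &Schema) -> Result<DataType, String> {
        match self {
            // a column can only be typed by looking it up in the input's schema
            Expr::Column(name) => schema
                .get(name)
                .cloned()
                .ok_or_else(|| format!("unknown column {}", name)),
            // a literal carries its own type, yet the signature still takes a schema
            Expr::Literal(_) => Ok(DataType::Int64),
        }
    }
}
```

In this toy version a plan node without an input could still type a literal by passing an empty schema, but it could not type any column.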
   
   







[GitHub] [arrow] alamb commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r471103102



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,631 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::{and, LogicalPlan};
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column between breakpoints
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. re-write the filter expression for every projection that it commutes with from its original depth to its max possible depth
+4. recursively re-write the plan by deleting old filter expressions and adding new filter expressions on their max possible depth.
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn name(&self) -> &str {
+        return "filter_push_down";
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let result = analyze_plan(plan, 0)?;
+        let break_points = result.break_points.clone();
+
+        // get max depth over all breakpoints
+        let max_depth = break_points.keys().max();
+        if max_depth.is_none() {
+            // it is unlikely that the plan is correct without break points as all scans
+            // adds breakpoints. We just return the plan and let others handle the error
+            return Ok(plan.clone());
+        }
+        let max_depth = *max_depth.unwrap(); // unwrap is safe by previous if
+
+        // construct optimized position of each of the new selections
+        // E.g. when we have a filter (c1 + c2 > 2), c1's max depth is 10 and c2 is 11, we
+        // can push the filter to depth 10
+        let mut new_selections: BTreeMap<usize, Expr> = BTreeMap::new();
+        for (selection_depth, expr) in result.selections {
+            // get all columns on the filter expression
+            let mut selection_columns: HashSet<String> = HashSet::new();
+            utils::expr_to_column_names(&expr, &mut selection_columns)?;
+
+            // identify the depths that are filter-commutable with this selection
+            let mut new_depth = selection_depth;
+            for depth in selection_depth..max_depth {
+                if let Some(break_columns) = break_points.get(&depth) {
+                    if selection_columns
+                        .intersection(break_columns)
+                        .peekable()
+                        .peek()
+                        .is_none()
+                    {
+                        new_depth += 1
+                    } else {
+                        // non-commutable: can't advance any further
+                        break;
+                    }
+                } else {
+                    new_depth += 1
+                }
+            }
+
+            // re-write the new selections based on all projections that it crossed.
+            // E.g. in `Selection: #b\n  Projection: #a > 1 as b`, we can swap them, but the selection must be "#a > 1"
+            let mut new_expression = expr.clone();
+            for depth_i in selection_depth..new_depth {
+                if let Some(projection) = result.projections.get(&depth_i) {
+                    new_expression = rewrite(&new_expression, projection)?;
+                }
+            }
+
+            // AND filter expressions that would be placed on the same depth
+            if let Some(existing_expression) = new_selections.get(&new_depth) {
+                new_expression = and(existing_expression, &new_expression)
+            }
+            new_selections.insert(new_depth, new_expression);
+        }
+
+        optimize_plan(plan, &new_selections, 0)
+    }
+}
+
+/// The result of a plan analysis suitable to perform a filter push down optimization
+// BTreeMap are ordered, which ensures stability in ordered operations.
+// Also, most inserts here are at the end
+struct AnalysisResult {
+    /// maps the depths of non filter-commutative nodes to their columns
+    /// depths not in here indicate that the node is commutative
+    pub break_points: BTreeMap<usize, HashSet<String>>,
+    /// maps the depths of filter nodes to expressions
+    pub selections: BTreeMap<usize, Expr>,
+    /// maps the depths of projection nodes to their expressions
+    pub projections: BTreeMap<usize, HashMap<String, Expr>>,
+}
+
+/// Recursively traverses the logical plan looking for depths that break filter pushdown
+fn analyze_plan(plan: &LogicalPlan, depth: usize) -> Result<AnalysisResult> {
+    match plan {
+        LogicalPlan::Selection { input, expr } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+            result.selections.insert(depth, expr.clone());
+            Ok(result)
+        }
+        LogicalPlan::Projection {
+            input,
+            expr,
+            schema,
+        } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // collect projection.
+            let mut projection = HashMap::new();
+            schema.fields().iter().enumerate().for_each(|(i, field)| {
+                // strip alias, as they should not be part of selections
+                let expr = match &expr[i] {
+                    Expr::Alias(expr, _) => expr.as_ref().clone(),
+                    expr => expr.clone(),
+                };
+
+                projection.insert(field.name().clone(), expr);
+            });
+            result.projections.insert(depth, projection);
+            Ok(result)
+        }
+        LogicalPlan::Aggregate {
+            input, aggr_expr, ..
+        } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // construct set of columns that `aggr_expr` depends on
+            let mut agg_columns = HashSet::new();
+            utils::exprlist_to_column_names(aggr_expr, &mut agg_columns)?;
+
+            // collect all columns that break at this depth:
+            // * columns whose aggregation expression depends on
+            // * the aggregation columns themselves
+            let mut columns = agg_columns.iter().cloned().collect::<HashSet<_>>();
+            let agg_columns = aggr_expr
+                .iter()
+                .map(|x| x.name(input.schema()))
+                .collect::<Result<HashSet<_>>>()?;
+            columns.extend(agg_columns);
+            result.break_points.insert(depth, columns);
+
+            Ok(result)
+        }
+        LogicalPlan::Sort { input, .. } => analyze_plan(&input, depth + 1),
+        LogicalPlan::Limit { input, .. } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // collect all columns that break at this depth
+            let columns = input
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.name().clone())
+                .collect::<HashSet<_>>();
+            result.break_points.insert(depth, columns);
+            Ok(result)
+        }
+        // all other plans add breaks to all their columns to indicate that filters can't proceed further.
+        _ => {
+            let columns = plan
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.name().clone())
+                .collect::<HashSet<_>>();
+            let mut break_points = BTreeMap::new();
+
+            break_points.insert(depth, columns);
+            Ok(AnalysisResult {
+                break_points,
+                selections: BTreeMap::new(),
+                projections: BTreeMap::new(),
+            })
+        }
+    }
+}
+
+impl FilterPushDown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Returns a re-written logical plan where all old filters are removed and the new ones are added.
+fn optimize_plan(
+    plan: &LogicalPlan,
+    new_selections: &BTreeMap<usize, Expr>,
+    depth: usize,
+) -> Result<LogicalPlan> {
+    // optimize the plan recursively:
+    let new_plan = match plan {
+        LogicalPlan::Selection { input, .. } => {
+            // ignore old selections
+            Ok(optimize_plan(&input, new_selections, depth + 1)?)
+        }
+        _ => {
+            // all other nodes are copied, optimizing recursively.
+            let expr = utils::expressions(plan);
+
+            let inputs = utils::inputs(plan);
+            let new_inputs = inputs
+                .iter()
+                .map(|plan| optimize_plan(plan, new_selections, depth + 1))
+                .collect::<Result<Vec<_>>>()?;
+
+            utils::from_plan(plan, &expr, &new_inputs)
+        }
+    }?;
+
+    // if a new selection is to be applied, apply it
+    if let Some(expr) = new_selections.get(&depth) {
+        return Ok(LogicalPlan::Selection {
+            expr: expr.clone(),
+            input: Box::new(new_plan),
+        });
+    } else {
+        Ok(new_plan)
+    }
+}
+
+/// replaces columns by its name on the projection.
+fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
+    let expressions = utils::expr_sub_expressions(&expr)?;
+
+    let expressions = expressions
+        .iter()
+        .map(|e| rewrite(e, &projection))
+        .collect::<Result<Vec<_>>>()?;
+
+    match expr {
+        Expr::Column(name) => {
+            if let Some(expr) = projection.get(name) {
+                return Ok(expr.clone());
+            }
+        }
+        _ => {}
+    }
+
+    utils::rewrite_expression(&expr, &expressions)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::logicalplan::col;
+    use crate::logicalplan::ScalarValue;
+    use crate::logicalplan::{aggregate_expr, lit, Expr, LogicalPlanBuilder, Operator};
+    use crate::test::*;
+    use arrow::datatypes::DataType;
+
+    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
+        let mut rule = FilterPushDown::new();
+        let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
+        let formatted_plan = format!("{:?}", optimized_plan);
+        assert_eq!(formatted_plan, expected);
+    }
+
+    #[test]
+    fn filter_before_projection() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before projection
+        let expected = "\
+            Projection: #a, #b\
+            \n  Selection: #a Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_after_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .limit(10)?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before single projection
+        let expected = "\
+            Selection: #a Eq Int64(1)\
+            \n  Limit: 10\
+            \n    Projection: #a, #b\
+            \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_jump_2_plans() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b"), col("c")])?
+            .project(vec![col("c"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before double projection
+        let expected = "\
+            Projection: #c, #b\
+            \n  Projection: #a, #b, #c\
+            \n    Selection: #a Eq Int64(1)\
+            \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_move_agg() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .aggregate(
+                vec![col("a")],
+                vec![aggregate_expr("SUM", col("b"), DataType::Int32)
+                    .alias("total_salary")],
+            )?
+            .filter(col("a").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+        // selection of key aggregation is commutative
+        let expected = "\
+            Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS total_salary]]\
+            \n  Selection: #a Gt Int64(10)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_keep_agg() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .aggregate(
+                vec![col("a")],
+                vec![aggregate_expr("SUM", col("b"), DataType::Int32).alias("b")],
+            )?
+            .filter(col("b").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+        // selection of aggregate is after aggregation since they are non-commutative
+        let expected = "\
+            Selection: #b Gt Int64(10)\
+            \n  Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS b]]\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written
+    #[test]
+    fn alias() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .filter(col("b").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before projection
+        let expected = "\
+            Projection: #a AS b, #c\
+            \n  Selection: #a Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    fn add(left: Expr, right: Expr) -> Expr {
+        Expr::BinaryExpr {
+            left: Box::new(left),
+            op: Operator::Plus,
+            right: Box::new(right),
+        }
+    }
+
+    fn multiply(left: Expr, right: Expr) -> Expr {
+        Expr::BinaryExpr {
+            left: Box::new(left),
+            op: Operator::Multiply,
+            right: Box::new(right),
+        }
+    }
+
+    /// verifies that a filter is pushed to before a projection with a complex expression, the filter expression is correctly re-written
+    #[test]
+    fn complex_expression() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![
+                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
+                col("c"),
+            ])?
+            .filter(col("b").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Selection: #b Eq Int64(1)\
+            \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n    TableScan: test projection=None"
+        );
+
+        // selection is before projection
+        let expected = "\
+            Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n  Selection: #a Multiply Int32(2) Plus #c Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that when a filter is pushed to after 2 projections, the filter expression is correctly re-written
+    #[test]
+    fn complex_plan() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![
+                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
+                col("c"),
+            ])?
+            // second projection where we rename columns, just to make it difficult
+            .project(vec![multiply(col("b"), lit(3)).alias("a"), col("c")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Selection: #a Eq Int64(1)\
+            \n  Projection: #b Multiply Int32(3) AS a, #c\
+            \n    Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n      TableScan: test projection=None"
+        );
+
+        // selection is before the projections
+        let expected = "\
+        Projection: #b Multiply Int32(3) AS a, #c\
+        \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+        \n    Selection: #a Multiply Int32(2) Plus #c Multiply Int32(3) Eq Int64(1)\
+        \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that when two filters apply after an aggregation that only allows one to be pushed, one is pushed
+    /// and the other not.
+    #[test]
+    fn multi_filter() -> Result<()> {
+        // the aggregation allows one filter to pass (b), and the other one to not pass (SUM(c))
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .aggregate(
+                vec![col("b")],
+                vec![aggregate_expr("SUM", col("c"), DataType::Int32)],
+            )?
+            .filter(col("b").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .filter(col("SUM(c)").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Selection: #SUM(c) Gt Int64(10)\
+            \n  Selection: #b Gt Int64(10)\
+            \n    Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
+            \n      Projection: #a AS b, #c\
+            \n        TableScan: test projection=None"
+        );
+
+        // selection is before the projections
+        let expected = "\
+        Selection: #SUM(c) Gt Int64(10)\
+        \n  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
+        \n    Projection: #a AS b, #c\
+        \n      Selection: #a Gt Int64(10)\
+        \n        TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    /// verifies that when two limits are in place, we jump neither
+    #[test]
+    fn double_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .limit(20)?
+            .limit(10)?
+            .project(vec![col("a"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection does not jump any of the limits
+        let expected = "\
+            Projection: #a, #b\
+            \n  Selection: #a Eq Int64(1)\
+            \n    Limit: 10\
+            \n      Limit: 20\
+            \n        Projection: #a, #b\
+            \n          TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that filters with the same columns are correctly placed
+    #[test]
+    fn filter_2_breaks_limits() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a")])?
+            .filter(col("a").lt_eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .limit(1)?
+            .project(vec![col("a")])?
+            .filter(col("a").gt_eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // Should be able to move both filters below the projections
+
+        // not part of the test
+        assert_eq!(
+            format!("{:?}", plan),
+            "Selection: #a GtEq Int64(1)\
+             \n  Projection: #a\
+             \n    Limit: 1\
+             \n      Selection: #a LtEq Int64(1)\
+             \n        Projection: #a\
+             \n          TableScan: test projection=None"
+        );
+
+        let expected = "\
+        Projection: #a\
+        \n  Selection: #a GtEq Int64(1)\
+        \n    Limit: 1\
+        \n      Projection: #a\

Review comment:
       Something doesn't quite seem right here: I am surprised that one `Projection` is left in the plan while another is not. It is fine, given that this pass is only supposed to push Selections; it just seems odd.
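
For reference when reading the expected plan above, the depth computation from `optimize` can be sketched in isolation. This is a simplified restatement of the loop in the diff, not the PR's code: `max_push_depth` and `cols` are hypothetical names, and the break points in the usage note are written out by hand following the depth numbering that `analyze_plan` uses.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical helper: builds a set of column names.
fn cols(names: &[&str]) -> HashSet<String> {
    names.iter().map(|n| n.to_string()).collect()
}

/// Returns the depth at which a filter that starts at depth `start` ends up.
/// The filter advances one level at a time and stops at the first depth whose
/// break point shares a column with the filter.
fn max_push_depth(
    filter_columns: &HashSet<String>,
    start: usize,
    max_depth: usize,
    break_points: &BTreeMap<usize, HashSet<String>>,
) -> usize {
    let mut depth = start;
    for d in start..max_depth {
        match break_points.get(&d) {
            // non-commutable for these columns: cannot advance any further
            Some(columns) if !columns.is_disjoint(filter_columns) => break,
            // either no break point at this depth or no shared column
            _ => depth += 1,
        }
    }
    depth
}
```

For the plan in `filter_2_breaks_limits`, the depths are 0 (`Selection`), 1 (`Projection`), 2 (`Limit`), 3 (`Selection`), 4 (`Projection`) and 5 (`TableScan`), so the break points sit at 2 and 5. With those inputs the outer filter stops at depth 2, directly above the `Limit`, and the inner filter reaches depth 5, directly above the scan.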







[GitHub] [arrow] jorgecarleitao commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-673890773


   Thank you very much @alamb for reviewing it!
   
   This optimizer is mostly useful in the `table` or `DataFrame` API, in which a view can be declared as a sequence of statements that are organized for logical clarity and code organization rather than optimized for execution.
   
   One example is when we have a dataframe `df` that was constructed optimally, but we would like to look only at rows where `'a' > 2`. Instead of going through the code that built that DataFrame, working out where the filter belongs, and placing it there, we can just write `df.filter(df['a'] > 2).collect()` and let the optimizer figure out where to place it.
   
   I incorporated the comments above into #7879, as IMO they are part of that PR, and rebased the whole thing. I will still address your comment about not fully understanding the algorithm by adding a more extended comment, and maybe try drawing some ASCII to better explain the idea, so that it is not only in my head.
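
As a rough illustration of the re-writing step that lets a filter cross a projection, here is a minimal sketch. It uses a toy `Expr` type and a hypothetical `col` helper rather than DataFusion's actual types, and it only covers the column substitution, not the placement of the filter in the plan.

```rust
use std::collections::HashMap;

// Toy expression type (an assumption for illustration, not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Gt(Box<Expr>, Box<Expr>),
    Multiply(Box<Expr>, Box<Expr>),
}

/// Hypothetical helper: builds a column reference.
fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

/// Replaces every column in `expr` by the expression that the projection
/// assigns to that name; columns the projection does not define are kept.
fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Expr {
    match expr {
        Expr::Column(name) => projection
            .get(name)
            .cloned()
            .unwrap_or_else(|| expr.clone()),
        Expr::Literal(_) => expr.clone(),
        Expr::Gt(left, right) => Expr::Gt(
            Box::new(rewrite(left, projection)),
            Box::new(rewrite(right, projection)),
        ),
        Expr::Multiply(left, right) => Expr::Multiply(
            Box::new(rewrite(left, projection)),
            Box::new(rewrite(right, projection)),
        ),
    }
}
```

With a projection that defines `b` as `a * 2`, re-writing the filter `b > 10` yields `a * 2 > 10`, which is the form the filter needs once it sits below that projection.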





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r471069937



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,631 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::{and, LogicalPlan};
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column between breakpoints
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. re-write the filter expression for every projection that it commutes with from its original depth to its max possible depth
+4. recursively re-write the plan by deleting old filter expressions and adding new filter expressions on their max possible depth.
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn name(&self) -> &str {
+        return "filter_push_down";
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let result = analyze_plan(plan, 0)?;
+        let break_points = result.break_points.clone();
+
+        // get max depth over all breakpoints
+        let max_depth = break_points.keys().max();
+        if max_depth.is_none() {
+            // it is unlikely that the plan is correct without break points as all scans
+            // adds breakpoints. We just return the plan and let others handle the error
+            return Ok(plan.clone());

Review comment:
       The comment is poorly written. What I was trying to say is that we allow this rule to not return an error on malformed plans; in that case it simply performs no optimization. This way, the user is likely to receive a better error message from whatever handles the plan afterwards.
   
   This is a design decision that we need to make with respect to optimizers (error or ignore?). I have no strong opinion about it: we can also return an error.
   
   Let me know which you prefer and I will change it.
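
To make the two options concrete, here is a minimal sketch of the "error or ignore" choice, assuming the analysis pass yields a list of break-point depths. `OnInvalidPlan` and `deepest_break_point` are hypothetical names used only for illustration; nothing like them exists in the PR.

```rust
/// Hypothetical policy switch (an assumption for this sketch, not part of the PR).
#[derive(Clone, Copy)]
enum OnInvalidPlan {
    /// skip the optimization and hand the plan on unchanged
    Ignore,
    /// fail inside the optimizer
    Error,
}

/// Returns the deepest break point, or applies the policy when there is none.
/// `Ok(None)` tells the caller to return the input plan as-is.
fn deepest_break_point(
    break_depths: &[usize],
    policy: OnInvalidPlan,
) -> Result<Option<usize>, String> {
    match (break_depths.iter().max(), policy) {
        (Some(depth), _) => Ok(Some(*depth)),
        (None, OnInvalidPlan::Ignore) => Ok(None),
        (None, OnInvalidPlan::Error) => {
            Err("plan has no break points, so it has no scan".to_string())
        }
    }
}
```

With `Ignore`, a plan without break points passes through untouched and any error surfaces later; with `Error`, the failure is reported by the optimizer rule itself.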







[GitHub] [arrow] jorgecarleitao commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-674483893


   > Something that can be left for future optimization: we can also go the other direction, i.e. break `And` filters into individual boolean expressions so these filters can be partially pushed further down the plan.
   
   Yep, good idea. As far as I have seen, Spark does not do this, at least up to Spark 2.4.5.
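
A minimal sketch of that future optimization, assuming a toy `Expr` enum with an `And` variant (a hypothetical stand-in for DataFusion's `Expr`; `split_conjunction` is likewise an illustrative name). Each resulting predicate could then be pushed down independently, as far as its own columns allow.

```rust
// Toy expression type (hypothetical, illustration only).
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Gt(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Flattens nested `And` nodes into a list of independent predicates,
/// preserving their left-to-right order.
fn split_conjunction(expr: &Expr, out: &mut Vec<Expr>) {
    match expr {
        Expr::And(left, right) => {
            split_conjunction(left, out);
            split_conjunction(right, out);
        }
        // anything that is not an `And` is kept whole
        other => out.push(other.clone()),
    }
}
```

For `a > 1 AND (b > 2 AND c > 3)` this produces three predicates, so the one on `a` can move past an operation that only blocks `b` and `c`.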





[GitHub] [arrow] jorgecarleitao commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-673076257


   Could any of you, @alamb @houqp @nevi-me @paddyhoran, help out here? I think this significantly speeds up any more complex query, as we run aggregations and projections on much less data.





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r471008766



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,467 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();
+        for (key, depth) in break_points {
+            match breaks.get(&key) {
+                Some(current_depth) => {
+                    if depth > *current_depth {
+                        breaks.insert(key, depth);
+                    }
+                }
+                None => {
+                    breaks.insert(key, depth);
+                }
+            }
+        }
+
+        // construct optimized position of each of the new selections
+        let mut new_selections: HashMap<usize, Expr> = HashMap::new();
+        for (selection_depth, expr) in selections {
+            let mut columns: HashSet<String> = HashSet::new();
+            utils::expr_to_column_names(&expr, &mut columns)?;
+
+            // compute the depths of each of the observed columns and the respective maximum
+            let depth = columns
+                .iter()
+                .filter_map(|column| breaks.get(column))
+                .max_by_key(|depth| **depth);
+
+            let new_depth = match depth {
+                None => selection_depth,
+                Some(d) => *d,
+            };
+
+            // re-write the new selections based on all projections that it crossed.
+            // E.g. in `Selection: #b\n  Projection: #a > 1 as b`, we can swap them, but the selection must be "#a > 1"
+            let mut new_expression = expr.clone();
+            for depth_i in selection_depth..new_depth {
+                if let Some(projection) = projections.get(&depth_i) {
+                    new_expression = rewrite(&new_expression, projection)?;
+                }
+            }
+
+            new_selections.insert(new_depth, new_expression);
+        }
+
+        optimize_plan(plan, &new_selections, 0)
+    }
+}
+
+/// Recursively traverses the logical plan looking for depths that break filter pushdown
+/// Returns a tuple:
+/// 0: map "column -> depth" of the depth that each column is found up to.
+/// 1: map "depth -> filter expression"
+/// 2: map "depth -> projection"
+fn analyze_plan(
+    plan: &LogicalPlan,
+    depth: usize,
+) -> Result<(
+    HashMap<String, usize>,
+    HashMap<usize, Expr>,
+    HashMap<usize, HashMap<String, Expr>>,
+)> {
+    match plan {
+        LogicalPlan::Selection { input, expr } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+            result.1.insert(depth, expr.clone());

Review comment:
       Good call. I've made it a struct. I tend to stick to functions to avoid issues with the lifetime of `self` (e.g. two calls of `optimize_plan`).







[GitHub] [arrow] houqp commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r471062961



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,631 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::{and, LogicalPlan};
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column between breakpoints
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. re-write the filter expression for every projection that it commutes with from its original depth to its max possible depth
+4. recursively re-write the plan by deleting old filter expressions and adding new filter expressions on their max possible depth.
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn name(&self) -> &str {
+        return "filter_push_down";
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let result = analyze_plan(plan, 0)?;
+        let break_points = result.break_points.clone();
+
+        // get max depth over all breakpoints
+        let max_depth = break_points.keys().max();
+        if max_depth.is_none() {
+            // it is unlikely that the plan is correct without break points as all scans
+            // adds breakpoints. We just return the plan and let others handle the error
+            return Ok(plan.clone());

Review comment:
       Shouldn't we return an error here instead if the plan is not correct?







[GitHub] [arrow] alamb commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-674429867


   I have had this nagging sensation that the algorithm isn't quite right when the same column is used multiple times. I finally came up with an example that shows part of what I have been worrying about. 
   
   Here is a new test that passes on this branch but that I think is incorrect. Specifically, with two `Selection`s for the same variable separated by a `Limit`, one of the `Selection`s is lost with the algorithm as written. Am I missing something?
   
   ```
   
       #[test]
       fn filter_2_breaks_limits() -> Result<()> {
           let table_scan = test_table_scan()?;
           let plan = LogicalPlanBuilder::from(&table_scan)
               .project(vec![col("a")])?
               .filter(col("a").lt_eq(&Expr::Literal(ScalarValue::Int64(1))))?
               .limit(1)?
               .project(vec![col("a")])?
               .filter(col("a").gt_eq(&Expr::Literal(ScalarValue::Int64(1))))?
               .build()?;
           // Should be able to move both filters below the projections
   
           // not part of the test
           assert_eq!(
               format!("{:?}", plan),
               "Selection: #a GtEq Int64(1)\
                \n  Projection: #a\
                \n    Limit: 1\
                \n      Selection: #a LtEq Int64(1)\
                \n        Projection: #a\
                \n          TableScan: test projection=None"
           );
   
           // This just seems wrong: we lost a selection....
           let expected = "\
           Projection: #a\
           \n  Selection: #a GtEq Int64(1)\
           \n    Limit: 1\
           \n      Projection: #a\
           \n        TableScan: test projection=None";
   
           assert_optimized_plan_eq(&plan, expected);
           Ok(())
       }
   ```
   
   FYI @jorgecarleitao 
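
To show why the lost `Selection` matters, here is a minimal sketch that evaluates both plan shapes from the test on plain vectors of `a` values. `filter`, `limit`, `original_plan` and `plan_without_inner_filter` are hypothetical helpers for illustration; they only mimic the row semantics and say nothing about why the optimizer drops the node.

```rust
/// Keeps the rows that satisfy `predicate`.
fn filter(rows: &[i64], predicate: fn(i64) -> bool) -> Vec<i64> {
    rows.iter().copied().filter(|v| predicate(*v)).collect()
}

/// Keeps the first `n` rows.
fn limit(rows: &[i64], n: usize) -> Vec<i64> {
    rows.iter().copied().take(n).collect()
}

/// The unoptimized plan from the test: `a <= 1`, then `Limit: 1`, then `a >= 1`.
fn original_plan(rows: &[i64]) -> Vec<i64> {
    let below_limit = filter(rows, |a: i64| a <= 1);
    filter(&limit(&below_limit, 1), |a: i64| a >= 1)
}

/// The plan in `expected`, where the `a <= 1` selection is gone.
fn plan_without_inner_filter(rows: &[i64]) -> Vec<i64> {
    filter(&limit(rows, 1), |a: i64| a >= 1)
}
```

On the rows `[5, 1, 0]` the original plan keeps `1`, while the plan without the inner filter keeps `5`, so the two plans are not equivalent. The same input also shows that `limit` is not filter-commutative: filtering `a <= 1` before the limit gives `[1]`, and after the limit it gives an empty result.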
   





[GitHub] [arrow] alamb commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r472133640



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,631 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::{and, LogicalPlan};
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column between breakpoints
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. re-write the filter expression for every projection that it commutes with from its original depth to its max possible depth
+4. recursively re-write the plan by deleting old filter expressions and adding new filter expressions on their max possible depth.
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn name(&self) -> &str {
+        return "filter_push_down";
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let result = analyze_plan(plan, 0)?;
+        let break_points = result.break_points.clone();
+
+        // get max depth over all breakpoints
+        let max_depth = break_points.keys().max();
+        if max_depth.is_none() {
+            // it is unlikely that the plan is correct without break points as all scans
+            // adds breakpoints. We just return the plan and let others handle the error
+            return Ok(plan.clone());
+        }
+        let max_depth = *max_depth.unwrap(); // unwrap is safe by previous if
+
+        // construct optimized position of each of the new selections
+        // E.g. when we have a filter (c1 + c2 > 2), c1's max depth is 10 and c2 is 11, we
+        // can push the filter to depth 10
+        let mut new_selections: BTreeMap<usize, Expr> = BTreeMap::new();
+        for (selection_depth, expr) in result.selections {
+            // get all columns on the filter expression
+            let mut selection_columns: HashSet<String> = HashSet::new();
+            utils::expr_to_column_names(&expr, &mut selection_columns)?;
+
+            // identify the depths that are filter-commutable with this selection
+            let mut new_depth = selection_depth;
+            for depth in selection_depth..max_depth {
+                if let Some(break_columns) = break_points.get(&depth) {
+                    if selection_columns
+                        .intersection(break_columns)
+                        .peekable()
+                        .peek()
+                        .is_none()
+                    {
+                        new_depth += 1
+                    } else {
+                        // non-commutable: can't advance any further
+                        break;
+                    }
+                } else {
+                    new_depth += 1
+                }
+            }
+
+            // re-write the new selections based on all projections that it crossed.
+            // E.g. in `Selection: #b\n  Projection: #a > 1 as b`, we can swap them, but the selection must be "#a > 1"
+            let mut new_expression = expr.clone();
+            for depth_i in selection_depth..new_depth {
+                if let Some(projection) = result.projections.get(&depth_i) {
+                    new_expression = rewrite(&new_expression, projection)?;
+                }
+            }
+
+            // AND filter expressions that would be placed on the same depth
+            if let Some(existing_expression) = new_selections.get(&new_depth) {
+                new_expression = and(existing_expression, &new_expression)
+            }
+            new_selections.insert(new_depth, new_expression);
+        }
+
+        optimize_plan(plan, &new_selections, 0)
+    }
+}
+
+/// The result of a plan analysis suitable to perform a filter push down optimization
+// BTreeMap are ordered, which ensures stability in ordered operations.
+// Also, most inserts here are at the end
+struct AnalysisResult {
+    /// maps the depths of non filter-commutative nodes to their columns
+    /// depths not in here indicate that the node is commutative
+    pub break_points: BTreeMap<usize, HashSet<String>>,
+    /// maps the depths of filter nodes to expressions
+    pub selections: BTreeMap<usize, Expr>,
+    /// maps the depths of projection nodes to their expressions
+    pub projections: BTreeMap<usize, HashMap<String, Expr>>,
+}
+
+/// Recursively traverses the logical plan looking for depths that break filter pushdown
+fn analyze_plan(plan: &LogicalPlan, depth: usize) -> Result<AnalysisResult> {
+    match plan {
+        LogicalPlan::Selection { input, expr } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+            result.selections.insert(depth, expr.clone());
+            Ok(result)
+        }
+        LogicalPlan::Projection {
+            input,
+            expr,
+            schema,
+        } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // collect projection.
+            let mut projection = HashMap::new();
+            schema.fields().iter().enumerate().for_each(|(i, field)| {
+                // strip alias, as they should not be part of selections
+                let expr = match &expr[i] {
+                    Expr::Alias(expr, _) => expr.as_ref().clone(),
+                    expr => expr.clone(),
+                };
+
+                projection.insert(field.name().clone(), expr);
+            });
+            result.projections.insert(depth, projection);
+            Ok(result)
+        }
+        LogicalPlan::Aggregate {
+            input, aggr_expr, ..
+        } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // construct set of columns that `aggr_expr` depends on
+            let mut agg_columns = HashSet::new();
+            utils::exprlist_to_column_names(aggr_expr, &mut agg_columns)?;
+
+            // collect all columns that break at this depth:
+            // * columns whose aggregation expression depends on
+            // * the aggregation columns themselves
+            let mut columns = agg_columns.iter().cloned().collect::<HashSet<_>>();
+            let agg_columns = aggr_expr
+                .iter()
+                .map(|x| x.name(input.schema()))
+                .collect::<Result<HashSet<_>>>()?;
+            columns.extend(agg_columns);
+            result.break_points.insert(depth, columns);
+
+            Ok(result)
+        }
+        LogicalPlan::Sort { input, .. } => analyze_plan(&input, depth + 1),
+        LogicalPlan::Limit { input, .. } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // collect all columns that break at this depth
+            let columns = input
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.name().clone())
+                .collect::<HashSet<_>>();
+            result.break_points.insert(depth, columns);
+            Ok(result)
+        }
+        // all other plans add breaks to all their columns to indicate that filters can't proceed further.
+        _ => {
+            let columns = plan
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.name().clone())
+                .collect::<HashSet<_>>();
+            let mut break_points = BTreeMap::new();
+
+            break_points.insert(depth, columns);
+            Ok(AnalysisResult {
+                break_points,
+                selections: BTreeMap::new(),
+                projections: BTreeMap::new(),
+            })
+        }
+    }
+}
+
+impl FilterPushDown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Returns a re-written logical plan where all old filters are removed and the new ones are added.
+fn optimize_plan(
+    plan: &LogicalPlan,
+    new_selections: &BTreeMap<usize, Expr>,
+    depth: usize,
+) -> Result<LogicalPlan> {
+    // optimize the plan recursively:
+    let new_plan = match plan {
+        LogicalPlan::Selection { input, .. } => {
+            // ignore old selections
+            Ok(optimize_plan(&input, new_selections, depth + 1)?)
+        }
+        _ => {
+            // all other nodes are copied, optimizing recursively.
+            let expr = utils::expressions(plan);
+
+            let inputs = utils::inputs(plan);
+            let new_inputs = inputs
+                .iter()
+                .map(|plan| optimize_plan(plan, new_selections, depth + 1))
+                .collect::<Result<Vec<_>>>()?;
+
+            utils::from_plan(plan, &expr, &new_inputs)
+        }
+    }?;
+
+    // if a new selection is to be applied, apply it
+    if let Some(expr) = new_selections.get(&depth) {
+        return Ok(LogicalPlan::Selection {
+            expr: expr.clone(),
+            input: Box::new(new_plan),
+        });
+    } else {
+        Ok(new_plan)
+    }
+}
+
+/// replaces columns by its name on the projection.
+fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
+    let expressions = utils::expr_sub_expressions(&expr)?;
+
+    let expressions = expressions
+        .iter()
+        .map(|e| rewrite(e, &projection))
+        .collect::<Result<Vec<_>>>()?;
+
+    match expr {
+        Expr::Column(name) => {
+            if let Some(expr) = projection.get(name) {
+                return Ok(expr.clone());
+            }
+        }
+        _ => {}
+    }
+
+    utils::rewrite_expression(&expr, &expressions)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::logicalplan::col;
+    use crate::logicalplan::ScalarValue;
+    use crate::logicalplan::{aggregate_expr, lit, Expr, LogicalPlanBuilder, Operator};
+    use crate::test::*;
+    use arrow::datatypes::DataType;
+
+    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
+        let mut rule = FilterPushDown::new();
+        let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
+        let formatted_plan = format!("{:?}", optimized_plan);
+        assert_eq!(formatted_plan, expected);
+    }
+
+    #[test]
+    fn filter_before_projection() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before projection
+        let expected = "\
+            Projection: #a, #b\
+            \n  Selection: #a Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_after_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .limit(10)?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+            // selection is not pushed below the limit
+        let expected = "\
+            Selection: #a Eq Int64(1)\
+            \n  Limit: 10\
+            \n    Projection: #a, #b\
+            \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_jump_2_plans() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b"), col("c")])?
+            .project(vec![col("c"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before double projection
+        let expected = "\
+            Projection: #c, #b\
+            \n  Projection: #a, #b, #c\
+            \n    Selection: #a Eq Int64(1)\
+            \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_move_agg() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .aggregate(
+                vec![col("a")],
+                vec![aggregate_expr("SUM", col("b"), DataType::Int32)
+                    .alias("total_salary")],
+            )?
+            .filter(col("a").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+        // selection of key aggregation is commutative
+        let expected = "\
+            Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS total_salary]]\
+            \n  Selection: #a Gt Int64(10)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_keep_agg() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .aggregate(
+                vec![col("a")],
+                vec![aggregate_expr("SUM", col("b"), DataType::Int32).alias("b")],
+            )?
+            .filter(col("b").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+        // selection of aggregate is after aggregation since they are non-commutative
+        let expected = "\
+            Selection: #b Gt Int64(10)\
+            \n  Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS b]]\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written
+    #[test]
+    fn alias() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .filter(col("b").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before projection
+        let expected = "\
+            Projection: #a AS b, #c\
+            \n  Selection: #a Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    fn add(left: Expr, right: Expr) -> Expr {
+        Expr::BinaryExpr {
+            left: Box::new(left),
+            op: Operator::Plus,
+            right: Box::new(right),
+        }
+    }
+
+    fn multiply(left: Expr, right: Expr) -> Expr {
+        Expr::BinaryExpr {
+            left: Box::new(left),
+            op: Operator::Multiply,
+            right: Box::new(right),
+        }
+    }
+
+    /// verifies that a filter is pushed to before a projection with a complex expression, the filter expression is correctly re-written
+    #[test]
+    fn complex_expression() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![
+                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
+                col("c"),
+            ])?
+            .filter(col("b").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Selection: #b Eq Int64(1)\
+            \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n    TableScan: test projection=None"
+        );
+
+        // selection is before projection
+        let expected = "\
+            Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n  Selection: #a Multiply Int32(2) Plus #c Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that when a filter is pushed to after 2 projections, the filter expression is correctly re-written
+    #[test]
+    fn complex_plan() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![
+                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
+                col("c"),
+            ])?
+            // second projection where we rename columns, just to make it difficult
+            .project(vec![multiply(col("b"), lit(3)).alias("a"), col("c")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Selection: #a Eq Int64(1)\
+            \n  Projection: #b Multiply Int32(3) AS a, #c\
+            \n    Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n      TableScan: test projection=None"
+        );
+
+        // selection is before the projections
+        let expected = "\
+        Projection: #b Multiply Int32(3) AS a, #c\
+        \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+        \n    Selection: #a Multiply Int32(2) Plus #c Multiply Int32(3) Eq Int64(1)\
+        \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that when two filters apply after an aggregation that only allows one to be pushed, one is pushed
+    /// and the other not.
+    #[test]
+    fn multi_filter() -> Result<()> {
+        // the aggregation allows one filter to pass (b), and the other one to not pass (SUM(c))
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .aggregate(
+                vec![col("b")],
+                vec![aggregate_expr("SUM", col("c"), DataType::Int32)],
+            )?
+            .filter(col("b").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .filter(col("SUM(c)").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Selection: #SUM(c) Gt Int64(10)\
+            \n  Selection: #b Gt Int64(10)\
+            \n    Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
+            \n      Projection: #a AS b, #c\
+            \n        TableScan: test projection=None"
+        );
+
+        // selection is before the projections
+        let expected = "\
+        Selection: #SUM(c) Gt Int64(10)\
+        \n  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
+        \n    Projection: #a AS b, #c\
+        \n      Selection: #a Gt Int64(10)\
+        \n        TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    /// verifies that when two limits are in place, we jump neither
+    #[test]
+    fn double_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .limit(20)?
+            .limit(10)?
+            .project(vec![col("a"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection does not jump any of the limits
+        let expected = "\
+            Projection: #a, #b\
+            \n  Selection: #a Eq Int64(1)\
+            \n    Limit: 10\
+            \n      Limit: 20\
+            \n        Projection: #a, #b\
+            \n          TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that filters with the same columns are correctly placed
+    #[test]
+    fn filter_2_breaks_limits() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a")])?
+            .filter(col("a").lt_eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .limit(1)?
+            .project(vec![col("a")])?
+            .filter(col("a").gt_eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // Should be able to move both filters below the projections
+
+        // not part of the test
+        assert_eq!(
+            format!("{:?}", plan),
+            "Selection: #a GtEq Int64(1)\
+             \n  Projection: #a\
+             \n    Limit: 1\
+             \n      Selection: #a LtEq Int64(1)\
+             \n        Projection: #a\
+             \n          TableScan: test projection=None"
+        );
+
+        let expected = "\
+        Projection: #a\
+        \n  Selection: #a GtEq Int64(1)\
+        \n    Limit: 1\
+        \n      Projection: #a\

Review comment:
       You are right -- I apologize I misread the diff. This looks good to me







[GitHub] [arrow] houqp commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r470041733



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,467 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();
+        for (key, depth) in break_points {
+            match breaks.get(&key) {
+                Some(current_depth) => {
+                    if depth > *current_depth {
+                        breaks.insert(key, depth);
+                    }
+                }
+                None => {
+                    breaks.insert(key, depth);
+                }
+            }
+        }
+
+        // construct optimized position of each of the new selections
+        let mut new_selections: HashMap<usize, Expr> = HashMap::new();
+        for (selection_depth, expr) in selections {
+            let mut columns: HashSet<String> = HashSet::new();
+            utils::expr_to_column_names(&expr, &mut columns)?;
+
+            // compute the depths of each of the observed columns and the respective maximum
+            let depth = columns
+                .iter()
+                .filter_map(|column| breaks.get(column))
+                .max_by_key(|depth| **depth);
+
+            let new_depth = match depth {
+                None => selection_depth,
+                Some(d) => *d,
+            };
+
+            // re-write the new selections based on all projections that it crossed.
+            // E.g. in `Selection: #b\n  Projection: #a > 1 as b`, we can swap them, but the selection must be "#a > 1"
+            let mut new_expression = expr.clone();
+            for depth_i in selection_depth..new_depth {
+                if let Some(projection) = projections.get(&depth_i) {
+                    new_expression = rewrite(&new_expression, projection)?;
+                }
+            }
+
+            new_selections.insert(new_depth, new_expression);
+        }
+
+        optimize_plan(plan, &new_selections, 0)
+    }
+}
+
+/// Recursively traverses the logical plan looking for depths that break filter pushdown
+/// Returns a tuple:
+/// 0: map "column -> depth" of the depth that each column is found up to.
+/// 1: map "depth -> filter expression"
+/// 2: map "depth -> projection"
+fn analyze_plan(
+    plan: &LogicalPlan,
+    depth: usize,
+) -> Result<(
+    HashMap<String, usize>,
+    HashMap<usize, Expr>,
+    HashMap<usize, HashMap<String, Expr>>,
+)> {
+    match plan {
+        LogicalPlan::Selection { input, expr } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+            result.1.insert(depth, expr.clone());

Review comment:
       I find `result.{0,1,2}.insert` a little hard to follow when reading the code; I had to keep going back to the function comment to find out which index refers to which map. It would be better to create a named struct to store the return value.
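
       A minimal sketch of what such a struct could look like. The names `AnalysisResult` and `record_selection` are hypothetical, and `Expr` is replaced by a plain `String` stand-in so the snippet compiles on its own; the real code would use `crate::logicalplan::Expr`.

```rust
use std::collections::HashMap;

/// Stand-in for DataFusion's `Expr` (assumption: a plain string is enough for this sketch).
type Expr = String;

/// Hypothetical named replacement for the 3-tuple returned by `analyze_plan`.
#[derive(Debug, Default)]
struct AnalysisResult {
    /// column name -> depth recorded for that column (tuple index 0)
    break_points: HashMap<String, usize>,
    /// depth -> filter expression found at that depth (tuple index 1)
    selections: HashMap<usize, Expr>,
    /// depth -> (output column name -> expression that produces it) (tuple index 2)
    projections: HashMap<usize, HashMap<String, Expr>>,
}

/// Records a filter found at `depth`; the field name replaces `result.1.insert(..)`.
fn record_selection(result: &mut AnalysisResult, depth: usize, expr: Expr) {
    result.selections.insert(depth, expr);
}
```

       With named fields, a call site such as `result.selections.insert(depth, expr.clone())` states which map is being updated without a trip back to the doc comment.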

##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation for which filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. re-write the filter expression for every projection that it commutes with from its original depth to its max possible depth

Review comment:
       I think we missed step 4, selection push down? It would also be helpful to mention that push down starts from the first operation (highest depth) in the plan.
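
       For reference, step 2 of the list above (the depth a filter ends up at) can be sketched as follows. This mirrors the `max_by_key` logic in the earlier revision of `optimize`; the function name `target_depth` is hypothetical and only the standard library is used.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical helper: the depth a filter is placed at, given the columns it uses.
/// It is the largest depth recorded in `breaks` for any of those columns; if none
/// of the columns has an entry, the filter stays at `selection_depth`.
fn target_depth(
    columns: &HashSet<String>,
    breaks: &HashMap<String, usize>,
    selection_depth: usize,
) -> usize {
    columns
        .iter()
        .filter_map(|column| breaks.get(column))
        .max()
        .copied()
        .unwrap_or(selection_depth)
}
```

       The missing step 4 would then take the resulting "depth -> expression" map and re-insert each selection while rebuilding the plan, which is what `optimize_plan` does in the diff.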

##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation for which filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. re-write the filter expression for every projection that it commutes with from its original depth to its max possible depth
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();

Review comment:
       Yes, I think you are right: the key is guaranteed to be unique for each iteration. A hashmap is probably the wrong data structure to use for `break_points`. Alternatively, we can do the depth comparison directly in `analyze_plan` on every map insertion.
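
       A sketch of the second option, doing the comparison at insertion time with the standard `HashMap` entry API. The helper name `insert_break_point` is hypothetical; it keeps the same "largest depth wins" rule as the loop in `optimize`.

```rust
use std::collections::HashMap;

/// Hypothetical helper: record a break point for `column`, keeping the largest
/// depth seen so far for that column.
fn insert_break_point(breaks: &mut HashMap<String, usize>, column: &str, depth: usize) {
    breaks
        .entry(column.to_string())
        // an entry already exists: keep whichever depth is larger
        .and_modify(|current| *current = (*current).max(depth))
        // first time this column is seen: store the depth as-is
        .or_insert(depth);
}
```

       If `analyze_plan` called a helper like this on every insertion, the separate "compute max depth for each of the columns" loop in `optimize` would no longer be needed.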

##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation for which filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:

Review comment:
       100% on this, great write up :+1: 







[GitHub] [arrow] alamb commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r469914297



##########
File path: rust/datafusion/src/optimizer/utils.rs
##########
@@ -183,6 +183,162 @@ fn _get_supertype(l: &DataType, r: &DataType) -> Option<DataType> {
     }
 }
 
+/// returns all expressions in the logical plan.
+pub fn expressions(plan: &LogicalPlan) -> Vec<Expr> {
+    match plan {
+        LogicalPlan::Projection { expr, .. } => expr.clone(),
+        LogicalPlan::Selection { expr, .. } => vec![expr.clone()],
+        LogicalPlan::Aggregate {
+            group_expr,
+            aggr_expr,
+            ..
+        } => {
+            let mut result = group_expr.clone();
+            result.extend(aggr_expr.clone());
+            result
+        }
+        LogicalPlan::Sort { expr, .. } => expr.clone(),
+        // plans without expressions
+        LogicalPlan::TableScan { .. }
+        | LogicalPlan::InMemoryScan { .. }
+        | LogicalPlan::ParquetScan { .. }
+        | LogicalPlan::CsvScan { .. }
+        | LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::Limit { .. }
+        | LogicalPlan::CreateExternalTable { .. } => vec![],
+    }
+}
+
+/// returns all inputs in the logical plan
+pub fn inputs(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection { input, .. } => vec![input],
+        LogicalPlan::Selection { input, .. } => vec![input],
+        LogicalPlan::Aggregate { input, .. } => vec![input],
+        LogicalPlan::Sort { input, .. } => vec![input],
+        LogicalPlan::Limit { input, .. } => vec![input],
+        // plans without inputs
+        LogicalPlan::TableScan { .. }
+        | LogicalPlan::InMemoryScan { .. }
+        | LogicalPlan::ParquetScan { .. }
+        | LogicalPlan::CsvScan { .. }
+        | LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::CreateExternalTable { .. } => vec![],
+    }
+}
+
+/// Returns a new logical plan based on the original one with inputs and expressions replaced
+pub fn from_plan(
+    plan: &LogicalPlan,
+    expr: &Vec<Expr>,
+    inputs: &Vec<LogicalPlan>,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection { schema, .. } => Ok(LogicalPlan::Projection {
+            expr: expr.clone(),
+            input: Box::new(inputs[0].clone()),
+            schema: schema.clone(),
+        }),
+        LogicalPlan::Selection { .. } => Ok(LogicalPlan::Selection {
+            expr: expr[0].clone(),
+            input: Box::new(inputs[0].clone()),
+        }),
+        LogicalPlan::Aggregate {
+            group_expr, schema, ..
+        } => Ok(LogicalPlan::Aggregate {
+            group_expr: expr[0..group_expr.len()].to_vec(),
+            aggr_expr: expr[group_expr.len()..].to_vec(),
+            input: Box::new(inputs[0].clone()),
+            schema: schema.clone(),
+        }),
+        LogicalPlan::Sort { .. } => Ok(LogicalPlan::Sort {
+            expr: expr.clone(),
+            input: Box::new(inputs[0].clone()),
+        }),
+        LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit {
+            n: *n,
+            input: Box::new(inputs[0].clone()),
+        }),
+        LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::TableScan { .. }
+        | LogicalPlan::InMemoryScan { .. }
+        | LogicalPlan::ParquetScan { .. }
+        | LogicalPlan::CsvScan { .. }
+        | LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
+    }
+}
+
+/// Returns all expressions composing the expression.
+/// E.g. if the expression is "(a + 1) + 1", it returns ["a + 1", "1"] (as Expr objects)
+pub fn expr_expressions(expr: &Expr) -> Result<Vec<&Expr>> {
+    match expr {
+        Expr::BinaryExpr { left, right, .. } => Ok(vec![left, right]),
+        Expr::IsNull(e) => Ok(vec![e]),
+        Expr::IsNotNull(e) => Ok(vec![e]),
+        Expr::ScalarFunction { args, .. } => Ok(args.iter().collect()),
+        Expr::AggregateFunction { args, .. } => Ok(args.iter().collect()),
+        Expr::Cast { expr, .. } => Ok(vec![expr]),
+        Expr::Column(_) => Ok(vec![]),
+        Expr::Alias(expr, ..) => Ok(vec![expr]),
+        Expr::Literal(_) => Ok(vec![]),
+        Expr::Not(expr) => Ok(vec![expr]),
+        Expr::Sort { expr, .. } => Ok(vec![expr]),
+        Expr::Wildcard { .. } => Err(ExecutionError::General(
+            "Wildcard expressions are not valid in a logical query plan".to_owned(),
+        )),
+        Expr::Nested(expr) => Ok(vec![expr]),
+    }
+}
+
+/// returns a new expression where the sub-expressions of `expr` are replaced by the ones in `expressions`.
+/// This is used in conjunction with ``expr_expressions`` to re-write expressions.
+pub fn from_expression(expr: &Expr, expressions: &Vec<Expr>) -> Result<Expr> {

Review comment:
       Calling this `rewrite_expression` might make it clearer what this function is doing.
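
       To illustrate why the name fits, here is a self-contained sketch of how the pair of helpers is used to substitute columns through a projection. The tiny `Expr` enum and the function bodies are assumptions made for this example and are not DataFusion's actual definitions; `sub_expressions` plays the role of `expr_expressions` in the diff.

```rust
use std::collections::HashMap;

/// Minimal stand-in for DataFusion's `Expr`, only for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Binary { left: Box<Expr>, op: String, right: Box<Expr> },
}

/// Direct children of `expr`.
fn sub_expressions(expr: &Expr) -> Vec<&Expr> {
    match expr {
        Expr::Binary { left, right, .. } => vec![&**left, &**right],
        Expr::Column(_) | Expr::Literal(_) => vec![],
    }
}

/// Rebuilds `expr` with its children replaced by `children` (the proposed name).
fn rewrite_expression(expr: &Expr, children: &[Expr]) -> Expr {
    match expr {
        Expr::Binary { op, .. } => Expr::Binary {
            left: Box::new(children[0].clone()),
            op: op.clone(),
            right: Box::new(children[1].clone()),
        },
        // leaves have no children, so they are returned unchanged
        other => other.clone(),
    }
}

/// Replaces every column defined by `projection` with its defining expression.
fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Expr {
    if let Expr::Column(name) = expr {
        if let Some(replacement) = projection.get(name) {
            return replacement.clone();
        }
    }
    let children: Vec<Expr> = sub_expressions(expr)
        .into_iter()
        .map(|e| rewrite(e, projection))
        .collect();
    rewrite_expression(expr, &children)
}
```

       With a projection `#a AS b`, rewriting `#b Gt 10` through this sketch yields `#a Gt 10`, the same substitution the `alias` test in the PR checks.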

##########
File path: rust/datafusion/src/optimizer/utils.rs
##########
@@ -183,6 +183,162 @@ fn _get_supertype(l: &DataType, r: &DataType) -> Option<DataType> {
     }
 }
 
+/// returns all expressions in the logical plan.
+pub fn expressions(plan: &LogicalPlan) -> Vec<Expr> {
+    match plan {
+        LogicalPlan::Projection { expr, .. } => expr.clone(),
+        LogicalPlan::Selection { expr, .. } => vec![expr.clone()],
+        LogicalPlan::Aggregate {
+            group_expr,
+            aggr_expr,
+            ..
+        } => {
+            let mut result = group_expr.clone();
+            result.extend(aggr_expr.clone());
+            result
+        }
+        LogicalPlan::Sort { expr, .. } => expr.clone(),
+        // plans without expressions
+        LogicalPlan::TableScan { .. }
+        | LogicalPlan::InMemoryScan { .. }
+        | LogicalPlan::ParquetScan { .. }
+        | LogicalPlan::CsvScan { .. }
+        | LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::Limit { .. }
+        | LogicalPlan::CreateExternalTable { .. } => vec![],
+    }
+}
+
+/// returns all inputs in the logical plan
+pub fn inputs(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection { input, .. } => vec![input],
+        LogicalPlan::Selection { input, .. } => vec![input],
+        LogicalPlan::Aggregate { input, .. } => vec![input],
+        LogicalPlan::Sort { input, .. } => vec![input],
+        LogicalPlan::Limit { input, .. } => vec![input],
+        // plans without inputs
+        LogicalPlan::TableScan { .. }
+        | LogicalPlan::InMemoryScan { .. }
+        | LogicalPlan::ParquetScan { .. }
+        | LogicalPlan::CsvScan { .. }
+        | LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::CreateExternalTable { .. } => vec![],
+    }
+}
+
+/// Returns a new logical plan based on the original one with inputs and expressions replaced
+pub fn from_plan(
+    plan: &LogicalPlan,
+    expr: &Vec<Expr>,
+    inputs: &Vec<LogicalPlan>,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection { schema, .. } => Ok(LogicalPlan::Projection {
+            expr: expr.clone(),
+            input: Box::new(inputs[0].clone()),
+            schema: schema.clone(),
+        }),
+        LogicalPlan::Selection { .. } => Ok(LogicalPlan::Selection {
+            expr: expr[0].clone(),
+            input: Box::new(inputs[0].clone()),
+        }),
+        LogicalPlan::Aggregate {
+            group_expr, schema, ..
+        } => Ok(LogicalPlan::Aggregate {
+            group_expr: expr[0..group_expr.len()].to_vec(),
+            aggr_expr: expr[group_expr.len()..].to_vec(),
+            input: Box::new(inputs[0].clone()),
+            schema: schema.clone(),
+        }),
+        LogicalPlan::Sort { .. } => Ok(LogicalPlan::Sort {
+            expr: expr.clone(),
+            input: Box::new(inputs[0].clone()),
+        }),
+        LogicalPlan::Limit { n, .. } => Ok(LogicalPlan::Limit {
+            n: *n,
+            input: Box::new(inputs[0].clone()),
+        }),
+        LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::TableScan { .. }
+        | LogicalPlan::InMemoryScan { .. }
+        | LogicalPlan::ParquetScan { .. }
+        | LogicalPlan::CsvScan { .. }
+        | LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
+    }
+}
+
+/// Returns all expressions composing the expression.
+/// E.g. if the expression is "(a + 1) + 1", it returns ["a + 1", "1"] (as Expr objects)
+pub fn expr_expressions(expr: &Expr) -> Result<Vec<&Expr>> {

Review comment:
       ```suggestion
   pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<&Expr>> {
   ```

##########
File path: rust/datafusion/src/optimizer/utils.rs
##########
@@ -183,6 +183,162 @@ fn _get_supertype(l: &DataType, r: &DataType) -> Option<DataType> {
     }
 }
 
+/// Returns all expressions composing the expression.

Review comment:
       ```suggestion
   /// Returns direct children `Expression`s of `expr`.
   ```

##########
File path: rust/datafusion/src/optimizer/utils.rs
##########
@@ -183,6 +183,162 @@ fn _get_supertype(l: &DataType, r: &DataType) -> Option<DataType> {
     }
 }
 
+/// returns all expressions in the logical plan.

Review comment:
       ```suggestion
   /// returns all expressions (non-recursively) in the current logical plan node.
   ```

##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,467 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();
+        for (key, depth) in break_points {
+            match breaks.get(&key) {
+                Some(current_depth) => {
+                    if depth > *current_depth {
+                        breaks.insert(key, depth);
+                    }
+                }
+                None => {
+                    breaks.insert(key, depth);
+                }
+            }
+        }
+
+        // construct optimized position of each of the new selections
+        let mut new_selections: HashMap<usize, Expr> = HashMap::new();
+        for (selection_depth, expr) in selections {
+            let mut columns: HashSet<String> = HashSet::new();
+            utils::expr_to_column_names(&expr, &mut columns)?;
+
+            // compute the depths of each of the observed columns and the respective maximum
+            let depth = columns
+                .iter()
+                .filter_map(|column| breaks.get(column))
+                .max_by_key(|depth| **depth);
+
+            let new_depth = match depth {
+                None => selection_depth,
+                Some(d) => *d,
+            };
+
+            // re-write the new selections based on all projections that it crossed.
+            // E.g. in `Selection: #b\n  Projection: #a > 1 as b`, we can swap them, but the selection must be "#a > 1"
+            let mut new_expression = expr.clone();
+            for depth_i in selection_depth..new_depth {
+                if let Some(projection) = projections.get(&depth_i) {
+                    new_expression = rewrite(&new_expression, projection)?;
+                }
+            }
+
+            new_selections.insert(new_depth, new_expression);
+        }
+
+        optimize_plan(plan, &new_selections, 0)
+    }
+}
+
+/// Recursively traverses the logical plan looking for depths that break filter pushdown
+/// Returns a tuple:
+/// 0: map "column -> depth" of the depth that each column is found up to.
+/// 1: map "depth -> filter expression"
+/// 2: map "depth -> projection"
+fn analyze_plan(
+    plan: &LogicalPlan,
+    depth: usize,
+) -> Result<(
+    HashMap<String, usize>,
+    HashMap<usize, Expr>,
+    HashMap<usize, HashMap<String, Expr>>,
+)> {
+    match plan {
+        LogicalPlan::Selection { input, expr } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+            result.1.insert(depth, expr.clone());
+            Ok(result)
+        }
+        LogicalPlan::Projection {
+            input,
+            expr,
+            schema,
+        } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // collect projection.
+            let mut projection = HashMap::new();
+            schema.fields().iter().enumerate().for_each(|(i, field)| {
+                // strip alias, as they should not be part of selections
+                let expr = match &expr[i] {
+                    Expr::Alias(expr, _) => expr.as_ref().clone(),
+                    expr => expr.clone(),
+                };
+
+                projection.insert(field.name().clone(), expr);
+            });
+            result.2.insert(depth, projection);
+            Ok(result)
+        }
+        LogicalPlan::Aggregate {
+            input, aggr_expr, ..
+        } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            let mut accum = HashSet::new();
+            utils::exprlist_to_column_names(aggr_expr, &mut accum)?;
+
+            accum.iter().for_each(|x: &String| {
+                result.0.insert(x.clone(), depth);
+            });
+
+            Ok(result)
+        }
+        LogicalPlan::Sort { input, .. } => analyze_plan(&input, depth + 1),
+        LogicalPlan::Limit { input, .. } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // pick all fields in the input schema of this limit: none of them can be used after this limit.
+            input.schema().fields().iter().for_each(|x| {
+                result.0.insert(x.name().clone(), depth);
+            });
+            Ok(result)
+        }
+        // all other plans add breaks to all their columns to indicate that filters can't proceed further.
+        _ => {
+            let mut result = HashMap::new();
+            plan.schema().fields().iter().for_each(|x| {
+                result.insert(x.name().clone(), depth);
+            });
+            Ok((result, HashMap::new(), HashMap::new()))
+        }
+    }
+}
+
+impl FilterPushDown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Returns a re-written logical plan where all old filters are removed and the new ones are added.
+fn optimize_plan(
+    plan: &LogicalPlan,
+    new_selections: &HashMap<usize, Expr>,
+    depth: usize,
+) -> Result<LogicalPlan> {
+    // optimize the plan recursively:
+    let new_plan = match plan {
+        LogicalPlan::Selection { input, .. } => {
+            // ignore old selections
+            Ok(optimize_plan(&input, new_selections, depth + 1)?)
+        }
+        _ => {
+            // all other nodes are copied, optimizing recursively.
+            let expr = utils::expressions(plan);
+
+            let inputs = utils::inputs(plan);
+            let new_inputs = inputs
+                .iter()
+                .map(|plan| optimize_plan(plan, new_selections, depth + 1))
+                .collect::<Result<Vec<_>>>()?;
+
+            utils::from_plan(plan, &expr, &new_inputs)
+        }
+    }?;
+
+    // if a new selection is to be applied, apply it
+    if let Some(expr) = new_selections.get(&depth) {
+        return Ok(LogicalPlan::Selection {
+            expr: expr.clone(),
+            input: Box::new(new_plan),
+        });
+    } else {
+        Ok(new_plan)
+    }
+}
+
+/// replaces columns by its name on the projection.
+fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
+    let expressions = utils::expr_expressions(&expr)?;
+
+    let expressions = expressions
+        .iter()
+        .map(|e| rewrite(e, &projection))
+        .collect::<Result<Vec<_>>>()?;
+
+    match expr {
+        Expr::Column(name) => {
+            if let Some(expr) = projection.get(name) {
+                return Ok(expr.clone());
+            }
+        }
+        _ => {}
+    }
+
+    utils::from_expression(&expr, &expressions)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::logicalplan::col;
+    use crate::logicalplan::ScalarValue;
+    use crate::logicalplan::{aggregate_expr, lit, Expr, LogicalPlanBuilder, Operator};
+    use crate::test::*;
+    use arrow::datatypes::DataType;
+
+    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
+        let mut rule = FilterPushDown::new();
+        let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
+        let formatted_plan = format!("{:?}", optimized_plan);
+        assert_eq!(formatted_plan, expected);
+    }
+
+    #[test]
+    fn filter_before_projection() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before projection
+        let expected = "\
+            Projection: #a, #b\
+            \n  Selection: #a Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_after_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .limit(10)?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection stays above the limit, since a filter does not commute with a limit
+        let expected = "\
+            Selection: #a Eq Int64(1)\
+            \n  Limit: 10\
+            \n    Projection: #a, #b\
+            \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_jump_2_plans() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b"), col("c")])?
+            .project(vec![col("c"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before double projection
+        let expected = "\
+            Projection: #c, #b\
+            \n  Projection: #a, #b, #c\
+            \n    Selection: #a Eq Int64(1)\
+            \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_move_agg() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .aggregate(
+                vec![col("a")],
+                vec![aggregate_expr("SUM", col("b"), DataType::Int32)
+                    .alias("total_salary")],
+            )?
+            .filter(col("a").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+        // selection of key aggregation is commutative
+        let expected = "\
+            Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS total_salary]]\
+            \n  Selection: #a Gt Int64(10)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_keep_agg() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .aggregate(
+                vec![col("a")],
+                vec![aggregate_expr("SUM", col("b"), DataType::Int32).alias("b")],
+            )?
+            .filter(col("b").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+        // selection of aggregate is after aggregation since they are non-commutative
+        let expected = "\
+            Selection: #b Gt Int64(10)\
+            \n  Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS b]]\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written
+    #[test]
+    fn alias() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .filter(col("b").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before projection
+        let expected = "\
+            Projection: #a AS b, #c\
+            \n  Selection: #a Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    fn add(left: Expr, right: Expr) -> Expr {
+        Expr::BinaryExpr {
+            left: Box::new(left),
+            op: Operator::Plus,
+            right: Box::new(right),
+        }
+    }
+
+    fn multiply(left: Expr, right: Expr) -> Expr {
+        Expr::BinaryExpr {
+            left: Box::new(left),
+            op: Operator::Multiply,
+            right: Box::new(right),
+        }
+    }
+
+    /// verifies that a filter is pushed to before a projection with a complex expression, the filter expression is correctly re-written
+    #[test]
+    fn complex_expression() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![
+                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
+                col("c"),
+            ])?
+            .filter(col("b").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Selection: #b Eq Int64(1)\
+            \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n    TableScan: test projection=None"
+        );
+
+        // selection is before projection
+        let expected = "\
+            Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n  Selection: #a Multiply Int32(2) Plus #c Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that when a filter is pushed to after 2 projections, the filter expression is correctly re-written
+    #[test]
+    fn complex_plan() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![
+                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
+                col("c"),
+            ])?
+            // second projection where we rename columns, just to make it difficult
+            .project(vec![multiply(col("b"), lit(3)).alias("a"), col("c")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+
+        // not part of the test, just good to know:

Review comment:
       👍 

##########
File path: rust/datafusion/src/optimizer/type_coercion.rs
##########
@@ -43,138 +45,77 @@ impl<'a> TypeCoercionRule<'a> {
         Self { scalar_functions }
     }
 
-    /// Rewrite an expression list to include explicit CAST operations when required
-    fn rewrite_expr_list(&self, expr: &[Expr], schema: &Schema) -> Result<Vec<Expr>> {
-        Ok(expr
+    /// Rewrite an expression to include explicit CAST operations when required
+    fn rewrite_expr(&self, expr: &Expr, schema: &Schema) -> Result<Expr> {
+        let expressions = utils::expr_expressions(expr)?;
+
+        // recurse of the re-write
+        let mut expressions = expressions
             .iter()
             .map(|e| self.rewrite_expr(e, schema))
-            .collect::<Result<Vec<_>>>()?)
-    }
+            .collect::<Result<Vec<_>>>()?;
 
-    /// Rewrite an expression to include explicit CAST operations when required
-    fn rewrite_expr(&self, expr: &Expr, schema: &Schema) -> Result<Expr> {
+        // modify `expressions` by introducing casts when necessary
         match expr {
-            Expr::BinaryExpr { left, op, right } => {
-                let left = self.rewrite_expr(left, schema)?;
-                let right = self.rewrite_expr(right, schema)?;
-                let left_type = left.get_type(schema)?;
-                let right_type = right.get_type(schema)?;
-                if left_type == right_type {
-                    Ok(Expr::BinaryExpr {
-                        left: Box::new(left),
-                        op: op.clone(),
-                        right: Box::new(right),
-                    })
-                } else {
+            Expr::BinaryExpr { .. } => {
+                let left_type = expressions[0].get_type(schema)?;
+                let right_type = expressions[1].get_type(schema)?;
+                if left_type != right_type {
                     let super_type = utils::get_supertype(&left_type, &right_type)?;
-                    Ok(Expr::BinaryExpr {
-                        left: Box::new(left.cast_to(&super_type, schema)?),
-                        op: op.clone(),
-                        right: Box::new(right.cast_to(&super_type, schema)?),
-                    })
+
+                    expressions[0] = expressions[0].cast_to(&super_type, schema)?;
+                    expressions[1] = expressions[1].cast_to(&super_type, schema)?;
                 }
             }
-            Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(self.rewrite_expr(e, schema)?))),
-            Expr::IsNotNull(e) => {
-                Ok(Expr::IsNotNull(Box::new(self.rewrite_expr(e, schema)?)))
-            }
-            Expr::ScalarFunction {
-                name,
-                args,
-                return_type,
-            } => {
+            Expr::ScalarFunction { name, .. } => {
                 // cast the inputs of scalar functions to the appropriate type where possible
                 match self.scalar_functions.get(name) {
                     Some(func_meta) => {
-                        let mut func_args = Vec::with_capacity(args.len());
-                        for i in 0..args.len() {
+                        for i in 0..expressions.len() {
                             let field = &func_meta.args[i];
-                            let expr = self.rewrite_expr(&args[i], schema)?;
-                            let actual_type = expr.get_type(schema)?;
+                            let actual_type = expressions[i].get_type(schema)?;
                             let required_type = field.data_type();
-                            if &actual_type == required_type {
-                                func_args.push(expr)
-                            } else {
+                            if &actual_type != required_type {
                                 let super_type =
                                     utils::get_supertype(&actual_type, required_type)?;
-                                func_args.push(expr.cast_to(&super_type, schema)?);
-                            }
+                                expressions[i] =
+                                    expressions[i].cast_to(&super_type, schema)?
+                            };
                         }
-
-                        Ok(Expr::ScalarFunction {
-                            name: name.clone(),
-                            args: func_args,
-                            return_type: return_type.clone(),
-                        })
                     }
-                    _ => Err(ExecutionError::General(format!(
-                        "Invalid scalar function {}",
-                        name
-                    ))),
+                    _ => {
+                        return Err(ExecutionError::General(format!(
+                            "Invalid scalar function {}",
+                            name
+                        )))
+                    }
                 }
             }
-            Expr::AggregateFunction {
-                name,
-                args,
-                return_type,
-            } => Ok(Expr::AggregateFunction {
-                name: name.clone(),
-                args: args
-                    .iter()
-                    .map(|a| self.rewrite_expr(a, schema))
-                    .collect::<Result<Vec<_>>>()?,
-                return_type: return_type.clone(),
-            }),
-            Expr::Cast { .. } => Ok(expr.clone()),
-            Expr::Column(_) => Ok(expr.clone()),
-            Expr::Alias(expr, alias) => Ok(Expr::Alias(
-                Box::new(self.rewrite_expr(expr, schema)?),
-                alias.to_owned(),
-            )),
-            Expr::Literal(_) => Ok(expr.clone()),
-            Expr::Not(_) => Ok(expr.clone()),
-            Expr::Sort { .. } => Ok(expr.clone()),
-            Expr::Wildcard { .. } => Err(ExecutionError::General(
-                "Wildcard expressions are not valid in a logical query plan".to_owned(),
-            )),
-            Expr::Nested(e) => self.rewrite_expr(e, schema),
-        }
+            _ => {}
+        };
+        utils::from_expression(expr, &expressions)
     }
 }
 
 impl<'a> OptimizerRule for TypeCoercionRule<'a> {
     fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        match plan {
-            LogicalPlan::Projection { expr, input, .. } => {
-                LogicalPlanBuilder::from(&self.optimize(input)?)
-                    .project(self.rewrite_expr_list(expr, input.schema())?)?
-                    .build()
-            }
-            LogicalPlan::Selection { expr, input, .. } => {
-                LogicalPlanBuilder::from(&self.optimize(input)?)
-                    .filter(self.rewrite_expr(expr, input.schema())?)?
-                    .build()
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                ..
-            } => LogicalPlanBuilder::from(&self.optimize(input)?)
-                .aggregate(
-                    self.rewrite_expr_list(group_expr, input.schema())?,
-                    self.rewrite_expr_list(aggr_expr, input.schema())?,
-                )?
-                .build(),
-            LogicalPlan::TableScan { .. } => Ok(plan.clone()),
-            LogicalPlan::InMemoryScan { .. } => Ok(plan.clone()),
-            LogicalPlan::ParquetScan { .. } => Ok(plan.clone()),
-            LogicalPlan::CsvScan { .. } => Ok(plan.clone()),
-            LogicalPlan::EmptyRelation { .. } => Ok(plan.clone()),
-            LogicalPlan::Limit { .. } => Ok(plan.clone()),
-            LogicalPlan::Sort { .. } => Ok(plan.clone()),
-            LogicalPlan::CreateExternalTable { .. } => Ok(plan.clone()),
-        }
+        let inputs = utils::inputs(plan);
+        let expressions = utils::expressions(plan);
+
+        // apply the optimization to all inputs of the plan
+        let new_inputs = inputs
+            .iter()
+            .map(|plan| self.optimize(*plan))
+            .collect::<Result<Vec<_>>>()?;
+        // re-write all expressions on this plan.
+        // This assumes a single input, [0]. It won't work for join, subquery and union operations, which have more than one input.
+        // It is currently not an issue as we do not have any plan with more than one input.
+        let new_expressions = expressions

Review comment:
       Do you have to check for the case here where the `LogicalPlan` node has no inputs?
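
One way to picture the concern is the sketch below. It assumes a toy `Plan` enum (not DataFusion's `LogicalPlan`), and `input_columns` is a hypothetical helper: it looks up the first input with `first()` rather than indexing `inputs[0]`, so a leaf node produces `None` instead of a panic. Whether the code under review can actually reach the indexing for a leaf node depends on lines not quoted here.

```rust
// Toy plan type for illustration only; DataFusion's `LogicalPlan` has many more variants.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { columns: Vec<String> },
    Filter { predicate: String, input: Box<Plan> },
}

/// Direct inputs of `plan`; leaf nodes such as `Scan` have none.
fn inputs(plan: &Plan) -> Vec<&Plan> {
    match plan {
        Plan::Scan { .. } => vec![],
        Plan::Filter { input, .. } => vec![input.as_ref()],
    }
}

/// Column names produced by `plan`.
fn output_columns(plan: &Plan) -> Vec<String> {
    match plan {
        Plan::Scan { columns } => columns.clone(),
        // a filter does not change the columns of its input
        Plan::Filter { input, .. } => output_columns(input),
    }
}

/// Columns against which the expressions of `plan` are resolved: those of its
/// first input. Returns `None` for leaf nodes instead of panicking on `inputs[0]`.
fn input_columns(plan: &Plan) -> Option<Vec<String>> {
    inputs(plan).first().map(|input| output_columns(input))
}
```

A caller can then skip expression re-writing when `input_columns` returns `None`, which makes the leaf-node case explicit.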







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r471007087



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation `op` for which filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. re-write the filter expression for every projection that it commutes with from its original depth to its max possible depth
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();

Review comment:
       During the analyze pass, the hashmap is updated to hold the lowest depth for each column. I pushed another test to make sure that we do not mix up depths.
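
One way to picture "keeping the lowest depth" is the minimal sketch below. It assumes a map from column name to the depth at which that column stops filters. `record_break` is a hypothetical helper written for illustration, not a function from this PR.

```rust
use std::collections::HashMap;

/// Records that `column` stops filters at `depth`, keeping the lowest depth seen so far.
/// Hypothetical helper for illustration; it is not part of the PR.
fn record_break(breaks: &mut HashMap<String, usize>, column: &str, depth: usize) {
    breaks
        .entry(column.to_string())
        // the column was already seen: keep whichever depth is smaller
        .and_modify(|current| *current = (*current).min(depth))
        // first time the column is seen: store its depth as-is
        .or_insert(depth);
}
```

With depth 0 at the top of the plan, a filter moving down meets the lowest recorded depth first, which is why the smaller value is the one worth keeping in this sketch.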







[GitHub] [arrow] jorgecarleitao commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-674211939


   @alamb, I have added a comment describing the algorithm. Could you take a look and evaluate whether it helps in understanding the underlying code?
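
The rewrite step from that comment (a filter on `#b` that crosses `Projection: #a AS b` becomes a filter on `#a`) can be sketched roughly as follows. The `Expr` enum here is a toy type invented for this sketch, with far fewer variants than DataFusion's `Expr`, and this `rewrite` is a simplified stand-in rather than the PR's function.

```rust
use std::collections::HashMap;

/// Toy expression type for illustration only; not DataFusion's `Expr`.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Gt(Box<Expr>, Box<Expr>),
}

/// Replaces every column that the projection defines with its defining expression.
/// Columns the projection does not mention are left untouched.
fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Expr {
    match expr {
        Expr::Column(name) => projection
            .get(name)
            .cloned()
            .unwrap_or_else(|| expr.clone()),
        Expr::Literal(_) => expr.clone(),
        // recurse into both operands of the comparison
        Expr::Gt(left, right) => Expr::Gt(
            Box::new(rewrite(left, projection)),
            Box::new(rewrite(right, projection)),
        ),
    }
}
```

Applying `rewrite` once per crossed projection, from the filter's original depth down to its new depth, gives the expression that should be placed at the new location.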





[GitHub] [arrow] houqp commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-674481844


   Something that can be left for future optimization: we can also go the other direction, i.e. break `And` filters into individual boolean expressions so these filters can be partially pushed further down the plan.
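
A minimal sketch of that idea, assuming a toy expression type: `Expr`, `Predicate` and `split_conjunction` below are names invented for illustration and are not part of DataFusion or of this PR.

```rust
/// Toy expression type for illustration only; not DataFusion's `Expr`.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    /// An opaque boolean predicate, identified by a label.
    Predicate(String),
    And(Box<Expr>, Box<Expr>),
}

/// Flattens nested `And` expressions into their individual conjuncts, left to right.
fn split_conjunction(expr: &Expr, out: &mut Vec<Expr>) {
    match expr {
        Expr::And(left, right) => {
            split_conjunction(left, out);
            split_conjunction(right, out);
        }
        // anything that is not an `And` is a conjunct on its own
        other => out.push(other.clone()),
    }
}
```

Each conjunct could then be pushed down independently, so one that only touches a group-by key would not be held back by one that depends on an aggregate.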





[GitHub] [arrow] jorgecarleitao commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-674472125


   @alamb, @houqp @andygrove , I think that this is ready to re-review.
   
   I modified the result returned by analyze to ensure that we do not lose relevant information (which led to the error @alamb found).
   
   I also found and fixed another error related to the placement of two filters at the same depth, which caused filters to be dropped: their expressions are now `ANDed` instead, which has the added bonus of gobbling filters together whenever possible.
   
   All the changes are in new commits, in case it is easier for the review.
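
The fix for filters landing at the same depth can be pictured with the sketch below. It assumes a toy `Expr` type and a hypothetical `place_filter` helper, both written for illustration. The PR itself uses DataFusion's `Expr` and its `and` function.

```rust
use std::collections::BTreeMap;

/// Toy expression type for illustration only; not DataFusion's `Expr`.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    /// An opaque boolean predicate, identified by a label.
    Predicate(String),
    And(Box<Expr>, Box<Expr>),
}

/// Places `expr` at `depth`. If a filter is already stored at that depth,
/// the two are combined with `And` instead of the old one being overwritten.
fn place_filter(filters: &mut BTreeMap<usize, Expr>, depth: usize, expr: Expr) {
    let combined = match filters.remove(&depth) {
        Some(existing) => Expr::And(Box::new(existing), Box::new(expr)),
        None => expr,
    };
    filters.insert(depth, combined);
}
```

A plain `insert` would silently replace the earlier filter, which matches the dropped-filter symptom described above.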





[GitHub] [arrow] github-actions[bot] commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-667668376


   https://issues.apache.org/jira/browse/ARROW-9619





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r471887798



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,631 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::{and, LogicalPlan};
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column between breakpoints
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. re-write the filter expression for every projection that it commutes with from its original depth to its max possible depth
+4. recursively re-write the plan by deleting old filter expressions and adding new filter expressions on their max possible depth.
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn name(&self) -> &str {
+        return "filter_push_down";
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let result = analyze_plan(plan, 0)?;
+        let break_points = result.break_points.clone();
+
+        // get max depth over all breakpoints
+        let max_depth = break_points.keys().max();
+        if max_depth.is_none() {
+            // it is unlikely that the plan is correct without break points, as all scans
+            // add break points. We just return the plan and let others handle the error
+            return Ok(plan.clone());
+        }
+        let max_depth = *max_depth.unwrap(); // unwrap is safe by previous if
+
+        // construct optimized position of each of the new selections
+        // E.g. when we have a filter (c1 + c2 > 2), c1's max depth is 10 and c2 is 11, we
+        // can push the filter to depth 10
+        let mut new_selections: BTreeMap<usize, Expr> = BTreeMap::new();
+        for (selection_depth, expr) in result.selections {
+            // get all columns on the filter expression
+            let mut selection_columns: HashSet<String> = HashSet::new();
+            utils::expr_to_column_names(&expr, &mut selection_columns)?;
+
+            // identify the depths that are filter-commutable with this selection
+            let mut new_depth = selection_depth;
+            for depth in selection_depth..max_depth {
+                if let Some(break_columns) = break_points.get(&depth) {
+                    if selection_columns
+                        .intersection(break_columns)
+                        .peekable()
+                        .peek()
+                        .is_none()
+                    {
+                        new_depth += 1
+                    } else {
+                        // non-commutable: can't advance any further
+                        break;
+                    }
+                } else {
+                    new_depth += 1
+                }
+            }
+
+            // re-write the new selections based on all projections that it crossed.
+            // E.g. in `Selection: #b\n  Projection: #a > 1 as b`, we can swap them, but the selection must be "#a > 1"
+            let mut new_expression = expr.clone();
+            for depth_i in selection_depth..new_depth {
+                if let Some(projection) = result.projections.get(&depth_i) {
+                    new_expression = rewrite(&new_expression, projection)?;
+                }
+            }
+
+            // AND filter expressions that would be placed on the same depth
+            if let Some(existing_expression) = new_selections.get(&new_depth) {
+                new_expression = and(existing_expression, &new_expression)
+            }
+            new_selections.insert(new_depth, new_expression);
+        }
+
+        optimize_plan(plan, &new_selections, 0)
+    }
+}
+
+/// The result of a plan analysis suitable to perform a filter push down optimization
+// BTreeMap are ordered, which ensures stability in ordered operations.
+// Also, most inserts here are at the end
+struct AnalysisResult {
+    /// maps the depths of non filter-commutative nodes to their columns
+    /// depths not in here indicate that the node is commutative
+    pub break_points: BTreeMap<usize, HashSet<String>>,
+    /// maps the depths of filter nodes to expressions
+    pub selections: BTreeMap<usize, Expr>,
+    /// maps the depths of projection nodes to their expressions
+    pub projections: BTreeMap<usize, HashMap<String, Expr>>,
+}
+
+/// Recursively traverses the logical plan looking for depths that break filter pushdown
+fn analyze_plan(plan: &LogicalPlan, depth: usize) -> Result<AnalysisResult> {
+    match plan {
+        LogicalPlan::Selection { input, expr } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+            result.selections.insert(depth, expr.clone());
+            Ok(result)
+        }
+        LogicalPlan::Projection {
+            input,
+            expr,
+            schema,
+        } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // collect projection.
+            let mut projection = HashMap::new();
+            schema.fields().iter().enumerate().for_each(|(i, field)| {
+                // strip alias, as they should not be part of selections
+                let expr = match &expr[i] {
+                    Expr::Alias(expr, _) => expr.as_ref().clone(),
+                    expr => expr.clone(),
+                };
+
+                projection.insert(field.name().clone(), expr);
+            });
+            result.projections.insert(depth, projection);
+            Ok(result)
+        }
+        LogicalPlan::Aggregate {
+            input, aggr_expr, ..
+        } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // construct set of columns that `aggr_expr` depends on
+            let mut agg_columns = HashSet::new();
+            utils::exprlist_to_column_names(aggr_expr, &mut agg_columns)?;
+
+            // collect all columns that break at this depth:
+            // * columns that the aggregation expressions depend on
+            // * the aggregation columns themselves
+            let mut columns = agg_columns.iter().cloned().collect::<HashSet<_>>();
+            let agg_columns = aggr_expr
+                .iter()
+                .map(|x| x.name(input.schema()))
+                .collect::<Result<HashSet<_>>>()?;
+            columns.extend(agg_columns);
+            result.break_points.insert(depth, columns);
+
+            Ok(result)
+        }
+        LogicalPlan::Sort { input, .. } => analyze_plan(&input, depth + 1),
+        LogicalPlan::Limit { input, .. } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+
+            // collect all columns that break at this depth
+            let columns = input
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.name().clone())
+                .collect::<HashSet<_>>();
+            result.break_points.insert(depth, columns);
+            Ok(result)
+        }
+        // all other plans add breaks to all their columns to indicate that filters can't proceed further.
+        _ => {
+            let columns = plan
+                .schema()
+                .fields()
+                .iter()
+                .map(|f| f.name().clone())
+                .collect::<HashSet<_>>();
+            let mut break_points = BTreeMap::new();
+
+            break_points.insert(depth, columns);
+            Ok(AnalysisResult {
+                break_points,
+                selections: BTreeMap::new(),
+                projections: BTreeMap::new(),
+            })
+        }
+    }
+}
+
+impl FilterPushDown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+/// Returns a re-written logical plan where all old filters are removed and the new ones are added.
+fn optimize_plan(
+    plan: &LogicalPlan,
+    new_selections: &BTreeMap<usize, Expr>,
+    depth: usize,
+) -> Result<LogicalPlan> {
+    // optimize the plan recursively:
+    let new_plan = match plan {
+        LogicalPlan::Selection { input, .. } => {
+            // ignore old selections
+            Ok(optimize_plan(&input, new_selections, depth + 1)?)
+        }
+        _ => {
+            // all other nodes are copied, optimizing recursively.
+            let expr = utils::expressions(plan);
+
+            let inputs = utils::inputs(plan);
+            let new_inputs = inputs
+                .iter()
+                .map(|plan| optimize_plan(plan, new_selections, depth + 1))
+                .collect::<Result<Vec<_>>>()?;
+
+            utils::from_plan(plan, &expr, &new_inputs)
+        }
+    }?;
+
+    // if a new selection is to be applied, apply it
+    if let Some(expr) = new_selections.get(&depth) {
+        return Ok(LogicalPlan::Selection {
+            expr: expr.clone(),
+            input: Box::new(new_plan),
+        });
+    } else {
+        Ok(new_plan)
+    }
+}
+
+/// Replaces each column with the projection expression of the same name, if there is one.
+fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
+    let expressions = utils::expr_sub_expressions(&expr)?;
+
+    let expressions = expressions
+        .iter()
+        .map(|e| rewrite(e, &projection))
+        .collect::<Result<Vec<_>>>()?;
+
+    match expr {
+        Expr::Column(name) => {
+            if let Some(expr) = projection.get(name) {
+                return Ok(expr.clone());
+            }
+        }
+        _ => {}
+    }
+
+    utils::rewrite_expression(&expr, &expressions)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::logicalplan::col;
+    use crate::logicalplan::ScalarValue;
+    use crate::logicalplan::{aggregate_expr, lit, Expr, LogicalPlanBuilder, Operator};
+    use crate::test::*;
+    use arrow::datatypes::DataType;
+
+    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
+        let mut rule = FilterPushDown::new();
+        let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
+        let formatted_plan = format!("{:?}", optimized_plan);
+        assert_eq!(formatted_plan, expected);
+    }
+
+    #[test]
+    fn filter_before_projection() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before projection
+        let expected = "\
+            Projection: #a, #b\
+            \n  Selection: #a Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_after_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .limit(10)?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection stays after the limit, since limit is not filter-commutative
+        let expected = "\
+            Selection: #a Eq Int64(1)\
+            \n  Limit: 10\
+            \n    Projection: #a, #b\
+            \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_jump_2_plans() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b"), col("c")])?
+            .project(vec![col("c"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before double projection
+        let expected = "\
+            Projection: #c, #b\
+            \n  Projection: #a, #b, #c\
+            \n    Selection: #a Eq Int64(1)\
+            \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_move_agg() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .aggregate(
+                vec![col("a")],
+                vec![aggregate_expr("SUM", col("b"), DataType::Int32)
+                    .alias("total_salary")],
+            )?
+            .filter(col("a").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+        // selection of key aggregation is commutative
+        let expected = "\
+            Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS total_salary]]\
+            \n  Selection: #a Gt Int64(10)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn filter_keep_agg() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .aggregate(
+                vec![col("a")],
+                vec![aggregate_expr("SUM", col("b"), DataType::Int32).alias("b")],
+            )?
+            .filter(col("b").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+        // selection of aggregate is after aggregation since they are non-commutative
+        let expected = "\
+            Selection: #b Gt Int64(10)\
+            \n  Aggregate: groupBy=[[#a]], aggr=[[SUM(#b) AS b]]\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that when a filter is pushed to before a projection, the filter expression is correctly re-written
+    #[test]
+    fn alias() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .filter(col("b").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection is before projection
+        let expected = "\
+            Projection: #a AS b, #c\
+            \n  Selection: #a Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    fn add(left: Expr, right: Expr) -> Expr {
+        Expr::BinaryExpr {
+            left: Box::new(left),
+            op: Operator::Plus,
+            right: Box::new(right),
+        }
+    }
+
+    fn multiply(left: Expr, right: Expr) -> Expr {
+        Expr::BinaryExpr {
+            left: Box::new(left),
+            op: Operator::Multiply,
+            right: Box::new(right),
+        }
+    }
+
+    /// verifies that when a filter is pushed to before a projection with a complex expression, the filter expression is correctly re-written
+    #[test]
+    fn complex_expression() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![
+                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
+                col("c"),
+            ])?
+            .filter(col("b").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Selection: #b Eq Int64(1)\
+            \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n    TableScan: test projection=None"
+        );
+
+        // selection is before projection
+        let expected = "\
+            Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n  Selection: #a Multiply Int32(2) Plus #c Eq Int64(1)\
+            \n    TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that when a filter is pushed to after 2 projections, the filter expression is correctly re-written
+    #[test]
+    fn complex_plan() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![
+                add(multiply(col("a"), lit(2)), col("c")).alias("b"),
+                col("c"),
+            ])?
+            // second projection where we rename columns, just to make it difficult
+            .project(vec![multiply(col("b"), lit(3)).alias("a"), col("c")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Selection: #a Eq Int64(1)\
+            \n  Projection: #b Multiply Int32(3) AS a, #c\
+            \n    Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+            \n      TableScan: test projection=None"
+        );
+
+        // selection is before the projections
+        let expected = "\
+        Projection: #b Multiply Int32(3) AS a, #c\
+        \n  Projection: #a Multiply Int32(2) Plus #c AS b, #c\
+        \n    Selection: #a Multiply Int32(2) Plus #c Multiply Int32(3) Eq Int64(1)\
+        \n      TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that when two filters apply after an aggregation that only allows one to be pushed, one is pushed
+    /// and the other not.
+    #[test]
+    fn multi_filter() -> Result<()> {
+        // the aggregation allows one filter to pass (b), and the other one to not pass (SUM(c))
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a").alias("b"), col("c")])?
+            .aggregate(
+                vec![col("b")],
+                vec![aggregate_expr("SUM", col("c"), DataType::Int32)],
+            )?
+            .filter(col("b").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .filter(col("SUM(c)").gt(&Expr::Literal(ScalarValue::Int64(10))))?
+            .build()?;
+
+        // not part of the test, just good to know:
+        assert_eq!(
+            format!("{:?}", plan),
+            "\
+            Selection: #SUM(c) Gt Int64(10)\
+            \n  Selection: #b Gt Int64(10)\
+            \n    Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
+            \n      Projection: #a AS b, #c\
+            \n        TableScan: test projection=None"
+        );
+
+        // selection is before the projections
+        let expected = "\
+        Selection: #SUM(c) Gt Int64(10)\
+        \n  Aggregate: groupBy=[[#b]], aggr=[[SUM(#c)]]\
+        \n    Projection: #a AS b, #c\
+        \n      Selection: #a Gt Int64(10)\
+        \n        TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    /// verifies that when two limits are in place, we jump neither
+    #[test]
+    fn double_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a"), col("b")])?
+            .limit(20)?
+            .limit(10)?
+            .project(vec![col("a"), col("b")])?
+            .filter(col("a").eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // selection does not jump any of the limits
+        let expected = "\
+            Projection: #a, #b\
+            \n  Selection: #a Eq Int64(1)\
+            \n    Limit: 10\
+            \n      Limit: 20\
+            \n        Projection: #a, #b\
+            \n          TableScan: test projection=None";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
+    /// verifies that filters with the same columns are correctly placed
+    #[test]
+    fn filter_2_breaks_limits() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(vec![col("a")])?
+            .filter(col("a").lt_eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .limit(1)?
+            .project(vec![col("a")])?
+            .filter(col("a").gt_eq(&Expr::Literal(ScalarValue::Int64(1))))?
+            .build()?;
+        // Should be able to move both filters below the projections
+
+        // not part of the test
+        assert_eq!(
+            format!("{:?}", plan),
+            "Selection: #a GtEq Int64(1)\
+             \n  Projection: #a\
+             \n    Limit: 1\
+             \n      Selection: #a LtEq Int64(1)\
+             \n        Projection: #a\
+             \n          TableScan: test projection=None"
+        );
+
+        let expected = "\
+        Projection: #a\
+        \n  Selection: #a GtEq Int64(1)\
+        \n    Limit: 1\
+        \n      Projection: #a\

Review comment:
       Sorry, I did not understand your comment. I thought that both projections were left in the plan (line 590 and line 593).
   







[GitHub] [arrow] alamb commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-674923231


   > > Something that can be left for future optimization: we can also go the other direction, i.e. break `And` filters into individual boolean expressions so these filters can be partially pushed further down the plan.
   > 
   > Yep, good idea. As far as I have experienced, Spark does not do this, at least up to Spark 2.4.5.
   
   I filed https://issues.apache.org/jira/browse/ARROW-9771 to track this suggestion





[GitHub] [arrow] alamb commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r470975358



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,467 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();
+        for (key, depth) in break_points {
+            match breaks.get(&key) {
+                Some(current_depth) => {
+                    if depth > *current_depth {
+                        breaks.insert(key, depth);
+                    }
+                }
+                None => {
+                    breaks.insert(key, depth);
+                }
+            }
+        }
+
+        // construct optimized position of each of the new selections
+        let mut new_selections: HashMap<usize, Expr> = HashMap::new();
+        for (selection_depth, expr) in selections {
+            let mut columns: HashSet<String> = HashSet::new();
+            utils::expr_to_column_names(&expr, &mut columns)?;
+
+            // compute the depths of each of the observed columns and the respective maximum
+            let depth = columns
+                .iter()
+                .filter_map(|column| breaks.get(column))
+                .max_by_key(|depth| **depth);
+
+            let new_depth = match depth {
+                None => selection_depth,
+                Some(d) => *d,
+            };
+
+            // re-write the new selections based on all projections that it crossed.
+            // E.g. in `Selection: #b\n  Projection: #a > 1 as b`, we can swap them, but the selection must be "#a > 1"
+            let mut new_expression = expr.clone();
+            for depth_i in selection_depth..new_depth {
+                if let Some(projection) = projections.get(&depth_i) {
+                    new_expression = rewrite(&new_expression, projection)?;
+                }
+            }
+
+            new_selections.insert(new_depth, new_expression);
+        }
+
+        optimize_plan(plan, &new_selections, 0)
+    }
+}
+
+/// Recursively traverses the logical plan looking for depths that break filter pushdown
+/// Returns a tuple:
+/// 0: map "column -> depth" of the depth that each column is found up to.
+/// 1: map "depth -> filter expression"
+/// 2: map "depth -> projection"
+fn analyze_plan(
+    plan: &LogicalPlan,
+    depth: usize,
+) -> Result<(
+    HashMap<String, usize>,
+    HashMap<usize, Expr>,
+    HashMap<usize, HashMap<String, Expr>>,
+)> {
+    match plan {
+        LogicalPlan::Selection { input, expr } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+            result.1.insert(depth, expr.clone());

Review comment:
       I agree -- having these as a named struct would improve the readability in my opinion.
   
   The other alternative is to use fields on `self` as well
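
A minimal sketch of the named-struct alternative, so that call sites read `result.selections` rather than `result.1`. The struct name `AnalysisResult`, its methods, and the `Expr` alias are assumptions made for this illustration; they are not taken from the PR.

```rust
use std::collections::HashMap;

// Placeholder for the real expression type; an assumption for this sketch.
type Expr = String;

/// Hypothetical replacement for the `(break_points, selections, projections)` tuple.
#[derive(Debug, Default)]
struct AnalysisResult {
    /// column name -> depth recorded as a break point for that column
    break_points: HashMap<String, usize>,
    /// depth -> filter expression found at that depth
    selections: HashMap<usize, Expr>,
    /// depth -> (output name -> projected expression)
    projections: HashMap<usize, HashMap<String, Expr>>,
}

impl AnalysisResult {
    /// Records a break point, keeping only the maximum depth seen per column.
    fn add_break_point(&mut self, column: &str, depth: usize) {
        let entry = self.break_points.entry(column.to_string()).or_insert(depth);
        if depth > *entry {
            *entry = depth;
        }
    }

    /// Records the filter expression found at `depth`.
    fn add_selection(&mut self, depth: usize, expr: Expr) {
        self.selections.insert(depth, expr);
    }
}
```

With a struct like this, the `Selection` arm could call `result.add_selection(depth, expr.clone())`, and the maximum-depth bookkeeping currently done in `optimize` could live in `add_break_point`.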







[GitHub] [arrow] andygrove closed pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #7880:
URL: https://github.com/apache/arrow/pull/7880


   





[GitHub] [arrow] alamb commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-673117865


   I'll try to check it out carefully tomorrow morning (US Eastern time).
   
   On Wed, Aug 12, 2020 at 3:51 PM Jorge Leitao <no...@github.com>
   wrote:
   
   > Could any of you, @alamb @houqp @nevi-me @paddyhoran, help out here? I think
   > that this significantly speeds up querying for anything more complex, as
   > we run aggregations and projections on much less data.
   >
   





[GitHub] [arrow] jorgecarleitao commented on pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#issuecomment-674434146


   @alamb, thank you so much for taking the time to think through this and come up with an example. I agree with you that it is wrong. I will evaluate whether the current approach is able to cope with this, or whether we will have to scrap it and start from a different direction.
   
   I changed this PR back to draft as it is obviously out of spec.
   

