Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/15 05:12:27 UTC

[GitHub] [arrow] houqp commented on a change in pull request #7880: ARROW-9619: [Rust] [DataFusion] Add predicate push-down

houqp commented on a change in pull request #7880:
URL: https://github.com/apache/arrow/pull/7880#discussion_r470041733



##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,467 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();
+        for (key, depth) in break_points {
+            match breaks.get(&key) {
+                Some(current_depth) => {
+                    if depth > *current_depth {
+                        breaks.insert(key, depth);
+                    }
+                }
+                None => {
+                    breaks.insert(key, depth);
+                }
+            }
+        }
+
+        // construct optimized position of each of the new selections
+        let mut new_selections: HashMap<usize, Expr> = HashMap::new();
+        for (selection_depth, expr) in selections {
+            let mut columns: HashSet<String> = HashSet::new();
+            utils::expr_to_column_names(&expr, &mut columns)?;
+
+            // compute the depths of each of the observed columns and the respective maximum
+            let depth = columns
+                .iter()
+                .filter_map(|column| breaks.get(column))
+                .max_by_key(|depth| **depth);
+
+            let new_depth = match depth {
+                None => selection_depth,
+                Some(d) => *d,
+            };
+
+            // re-write the new selections based on all projections that it crossed.
+            // E.g. in `Selection: #b\n  Projection: #a > 1 as b`, we can swap them, but the selection must be "#a > 1"
+            let mut new_expression = expr.clone();
+            for depth_i in selection_depth..new_depth {
+                if let Some(projection) = projections.get(&depth_i) {
+                    new_expression = rewrite(&new_expression, projection)?;
+                }
+            }
+
+            new_selections.insert(new_depth, new_expression);
+        }
+
+        optimize_plan(plan, &new_selections, 0)
+    }
+}
+
+/// Recursively traverses the logical plan looking for depths that break filter pushdown
+/// Returns a tuple:
+/// 0: map "column -> depth" of the depth that each column is found up to.
+/// 1: map "depth -> filter expression"
+/// 2: map "depth -> projection"
+fn analyze_plan(
+    plan: &LogicalPlan,
+    depth: usize,
+) -> Result<(
+    HashMap<String, usize>,
+    HashMap<usize, Expr>,
+    HashMap<usize, HashMap<String, Expr>>,
+)> {
+    match plan {
+        LogicalPlan::Selection { input, expr } => {
+            let mut result = analyze_plan(&input, depth + 1)?;
+            result.1.insert(depth, expr.clone());

Review comment:
       I find `result.{0,1,2}.insert` a little hard to follow when reading the code; I had to keep going back to the function comment to find out which index refers to which map. It would be better to create a named struct to store the return value.
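
       For illustration only, a minimal sketch of what such a struct might look like. The names `AnalysisResult` and `record_selection` are hypothetical and not part of the PR, and `Expr` is replaced by a `String` stand-in so the snippet compiles on its own:

```rust
use std::collections::HashMap;

// Stand-in for DataFusion's `Expr`, only so this sketch compiles on its own.
type Expr = String;

/// Hypothetical named replacement for the 3-tuple returned by `analyze_plan`.
#[derive(Debug, Default)]
struct AnalysisResult {
    /// column name -> depth up to which the column is found
    break_points: HashMap<String, usize>,
    /// depth -> filter expression
    selections: HashMap<usize, Expr>,
    /// depth -> projection (output column name -> expression)
    projections: HashMap<usize, HashMap<String, Expr>>,
}

/// Example of how the `Selection` arm could record a filter by field name
/// instead of by tuple index.
fn record_selection(result: &mut AnalysisResult, depth: usize, expr: Expr) {
    result.selections.insert(depth, expr);
}
```

       With something along these lines, `result.1.insert(depth, expr.clone())` would read as `result.selections.insert(depth, expr.clone())`, and the caller in `optimize` could use the field names instead of destructuring a tuple.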

##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation for which filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. Re-writing the filter expression for every projection that it commutes with from its original depth to its max possible depth

Review comment:
       I think we missed step 4: selection push down? It would also be helpful to mention that push down starts from the first operation (highest depth) in the plan.
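
       To make the push-down step concrete, here is a toy sketch of the `#b` to `#a` example from the comment block. It is not the depth-map approach used in this PR: the `Plan` enum and `push_down` function are hypothetical, the projection is assumed to rename a single column and pass all others through, and the filter is restricted to `column > min`. The input is optimized first, so the deepest operations are handled before the ones above them:

```rust
/// Toy plan, hypothetical and far simpler than DataFusion's `LogicalPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    /// Renames column `from` to `to`; every other column passes through unchanged.
    Projection { from: String, to: String, input: Box<Plan> },
    /// Keeps rows where `column > min`.
    Selection { column: String, min: i64, input: Box<Plan> },
}

/// Pushes every selection below the projections underneath it,
/// re-writing the filtered column through each rename it crosses.
fn push_down(plan: Plan) -> Plan {
    match plan {
        Plan::Scan => Plan::Scan,
        Plan::Projection { from, to, input } => Plan::Projection {
            from,
            to,
            input: Box::new(push_down(*input)),
        },
        // Optimize the input first, then try to swap with what is directly below.
        Plan::Selection { column, min, input } => match push_down(*input) {
            Plan::Projection { from, to, input: inner } => {
                // Crossing the projection: a filter on the alias becomes a filter on its source.
                let column = if column == to { from.clone() } else { column };
                let pushed = push_down(Plan::Selection { column, min, input: inner });
                Plan::Projection { from, to, input: Box::new(pushed) }
            }
            // Nothing to commute with: the selection stays where it is.
            other => Plan::Selection { column, min, input: Box::new(other) },
        },
    }
}
```

       Applied to a selection on `b` over a projection `a AS b` over a scan, this yields the projection on top with the selection re-written to filter on `a`.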

##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation for which filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:
+
+1. Where are the filters located in the plan
+2. Where are non-commutable operations' columns located in the plan (break_points)
+3. Where are projections located in the plan
+
+With this information, we re-write the plan by:
+
+1. Computing the maximum possible depth of each column
+2. Computing the maximum possible depth of each filter expression based on the columns it depends on
+3. Re-writing the filter expression for every projection that it commutes with from its original depth to its max possible depth
+*/
+pub struct FilterPushDown {}
+
+impl OptimizerRule for FilterPushDown {
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let (break_points, selections, projections) = analyze_plan(plan, 0)?;
+
+        // compute max depth for each of the columns
+        let mut breaks: HashMap<String, usize> = HashMap::new();

Review comment:
       Yes, I think you are right: the key is guaranteed to be unique for each iteration. A hashmap is probably the wrong data structure to use for `break_points`. Alternatively, we can do the depth comparison directly in `analyze_plan` on every map insertion.
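
       As a rough sketch of the second option, the comparison could be folded into the insertion with the standard `HashMap` entry API. The helper name `insert_max_depth` is hypothetical and not part of the PR:

```rust
use std::collections::HashMap;

/// Records `depth` for `column`, keeping the larger value if one is already stored.
fn insert_max_depth(breaks: &mut HashMap<String, usize>, column: String, depth: usize) {
    breaks
        .entry(column)
        .and_modify(|current| *current = (*current).max(depth))
        .or_insert(depth);
}
```

       If `analyze_plan` inserted break points this way, the separate "compute max depth" loop in `optimize` would presumably no longer be needed.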

##########
File path: rust/datafusion/src/optimizer/filter_push_down.rs
##########
@@ -0,0 +1,505 @@
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
+
+use crate::error::Result;
+use crate::logicalplan::Expr;
+use crate::logicalplan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::optimizer::utils;
+use std::collections::{HashMap, HashSet};
+
+/// Filter Push Down optimizer rule pushes filter clauses down the plan
+///
+/// This optimization looks for the maximum depth of each column in the plan where a filter can be applied and
+/// re-writes the plan with filters on those locations.
+/// It performs two passes on the plan:
+/// 1. identify filters, which columns they use, and projections along the path
+/// 2. move filters down, re-writing the expressions using the projections
+/*
+A filter-commutative operation is an operation for which filter(op(data)) = op(filter(data)).
+An example of a filter-commutative operation is a projection; a counter-example is `limit`.
+
+The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
+can commute with a filter that depends on A only, but does not commute with a filter that depends
+on SUM(B).
+
+A location in this module is identified by a number, depth, which is 0 for the last operation
+and highest for the first operation (typically a scan).
+
+This optimizer commutes filters with filter-commutative operations to push the filters
+to the maximum possible depth, consequently re-writing the filter expressions by every
+projection that changes the filter's expression.
+
+    Selection: #b Gt Int64(10)
+        Projection: #a AS b
+
+is optimized to
+
+    Projection: #a AS b
+        Selection: #a Gt Int64(10)  <--- changed from #b to #a
+
+To perform such optimization, we first analyze the plan to identify three items:

Review comment:
       100% on this, great write up :+1: 



