You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/22 16:42:26 UTC

[arrow] branch master updated: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e7be07  ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns
5e7be07 is described below

commit 5e7be0712e871521c56fcc820ba3a5e428a4b091
Author: Jorge C. Leitao <jo...@gmail.com>
AuthorDate: Sat Aug 22 10:41:41 2020 -0600

    ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns
    
    This PR makes the projection optimizer remove any projection or aggregate expression whose output is not used by the rest of the plan, improving both speed and convenience.
    
    This builds on top of #7879; only the last commit is specific to this PR.
    
    Closes #7919 from jorgecarleitao/projection
    
    Authored-by: Jorge C. Leitao <jo...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 .../src/optimizer/projection_push_down.rs          | 437 +++++++++++++++------
 rust/datafusion/src/test/mod.rs                    |   8 +
 2 files changed, 322 insertions(+), 123 deletions(-)

diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index 956b533..4ec2fa0 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -27,17 +27,20 @@ use arrow::error::Result as ArrowResult;
 use std::collections::HashSet;
 use utils::optimize_explain;
 
-/// Projection Push Down optimizer rule ensures that only referenced columns are
-/// loaded into memory.
-///
-/// This optimizer does not modify expressions in the plan; it only changes the read projections
+/// Optimizer that removes projection and aggregate expressions whose output is never
+/// used, and narrows scans to the columns that are still required. This reduces both
+/// the columns read by scans and the expressions evaluated by the plan.
 pub struct ProjectionPushDown {}
 
 impl OptimizerRule for ProjectionPushDown {
     fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        // set of all columns refered from a scan.
-        let mut accum: HashSet<String> = HashSet::new();
-        self.optimize_plan(plan, &mut accum, false)
+        // every column of the root's output schema is required by the caller
+        let required_columns = plan
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| f.name().clone())
+            .collect::<HashSet<String>>();
+        optimize_plan(self, plan, &required_columns, false)
     }
 
     fn name(&self) -> &str {
@@ -50,122 +53,12 @@ impl ProjectionPushDown {
+    /// Creates a new instance of this optimizer rule.
     pub fn new() -> Self {
         Self {}
     }
-
-    fn optimize_plan(
-        &mut self,
-        plan: &LogicalPlan,
-        accum: &mut HashSet<String>,
-        has_projection: bool,
-    ) -> Result<LogicalPlan> {
-        match plan {
-            LogicalPlan::TableScan {
-                schema_name,
-                table_name,
-                table_schema,
-                projection,
-                ..
-            } => {
-                let (projection, projected_schema) = get_projected_schema(
-                    &table_schema,
-                    projection,
-                    accum,
-                    has_projection,
-                )?;
-
-                // return the table scan with projection
-                Ok(LogicalPlan::TableScan {
-                    schema_name: schema_name.to_string(),
-                    table_name: table_name.to_string(),
-                    table_schema: table_schema.clone(),
-                    projection: Some(projection),
-                    projected_schema: Box::new(projected_schema),
-                })
-            }
-            LogicalPlan::InMemoryScan {
-                data,
-                schema,
-                projection,
-                ..
-            } => {
-                let (projection, projected_schema) =
-                    get_projected_schema(&schema, projection, accum, has_projection)?;
-                Ok(LogicalPlan::InMemoryScan {
-                    data: data.clone(),
-                    schema: schema.clone(),
-                    projection: Some(projection),
-                    projected_schema: Box::new(projected_schema),
-                })
-            }
-            LogicalPlan::CsvScan {
-                path,
-                has_header,
-                delimiter,
-                schema,
-                projection,
-                ..
-            } => {
-                let (projection, projected_schema) =
-                    get_projected_schema(&schema, projection, accum, has_projection)?;
-
-                Ok(LogicalPlan::CsvScan {
-                    path: path.to_owned(),
-                    has_header: *has_header,
-                    schema: schema.clone(),
-                    delimiter: *delimiter,
-                    projection: Some(projection),
-                    projected_schema: Box::new(projected_schema),
-                })
-            }
-            LogicalPlan::ParquetScan {
-                path,
-                schema,
-                projection,
-                ..
-            } => {
-                let (projection, projected_schema) =
-                    get_projected_schema(&schema, projection, accum, has_projection)?;
-
-                Ok(LogicalPlan::ParquetScan {
-                    path: path.to_owned(),
-                    schema: schema.clone(),
-                    projection: Some(projection),
-                    projected_schema: Box::new(projected_schema),
-                })
-            }
-            LogicalPlan::Explain {
-                verbose,
-                plan,
-                stringified_plans,
-                schema,
-            } => optimize_explain(self, *verbose, &*plan, stringified_plans, &*schema),
-            // in all other cases, we construct a new plan based on the optimized inputs and re-written expressions
-            _ => {
-                let has_projection = match plan {
-                    LogicalPlan::Projection { .. } => true,
-                    _ => false,
-                };
-
-                let expr = utils::expressions(plan);
-                // collect all columns referenced by projection expressions
-                utils::exprlist_to_column_names(&expr, accum)?;
-
-                // apply the optimization to all inputs of the plan
-                let inputs = utils::inputs(plan);
-                let new_inputs = inputs
-                    .iter()
-                    .map(|plan| self.optimize_plan(plan, accum, has_projection))
-                    .collect::<Result<Vec<_>>>()?;
-
-                utils::from_plan(plan, &expr, &new_inputs)
-            }
-        }
-    }
 }
 
 fn get_projected_schema(
-    table_schema: &Schema,
+    schema: &Schema,
     projection: &Option<Vec<usize>>,
-    accum: &HashSet<String>,
+    required_columns: &HashSet<String>,
     has_projection: bool,
 ) -> Result<(Vec<usize>, Schema)> {
     if projection.is_some() {
@@ -179,9 +72,9 @@ fn get_projected_schema(
     //
     // we discard non-existing columns because some column names are not part of the schema,
     // e.g. when the column derives from an aggregation
-    let mut projection: Vec<usize> = accum
+    let mut projection: Vec<usize> = required_columns
         .iter()
-        .map(|name| table_schema.index_of(name))
+        .map(|name| schema.index_of(name))
         .filter_map(ArrowResult::ok)
         .collect();
 
@@ -192,7 +85,7 @@ fn get_projected_schema(
             projection.push(0);
         } else {
             // for table scan without projection, we default to return all columns
-            projection = table_schema
+            projection = schema
                 .fields()
                 .iter()
                 .enumerate()
@@ -207,12 +100,232 @@ fn get_projected_schema(
     // create the projected schema
     let mut projected_fields: Vec<Field> = Vec::with_capacity(projection.len());
     for i in &projection {
-        projected_fields.push(table_schema.fields()[*i].clone());
+        projected_fields.push(schema.fields()[*i].clone());
     }
 
     Ok((projection, Schema::new(projected_fields)))
 }
 
+/// Recursively traverses the logical plan, removing expressions that are not needed.
+///
+/// * `required_columns`: names of the columns that the ancestors of `plan` read from
+///   its output; expressions producing any other column may be dropped.
+/// * `has_projection`: `true` once a Projection or Aggregate has been visited on the
+///   way down; it is forwarded to `get_projected_schema` when narrowing a scan.
+fn optimize_plan(
+    optimizer: &mut ProjectionPushDown,
+    plan: &LogicalPlan,
+    required_columns: &HashSet<String>, // set of columns required up to this step
+    has_projection: bool,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection {
+            input,
+            expr,
+            schema,
+        } => {
+            // projection:
+            // * remove any expression whose output column is not required
+            // * construct the new set of required columns from the kept expressions
+            let mut new_expr = Vec::new();
+            let mut new_fields = Vec::new();
+            let mut new_required_columns = HashSet::new();
+
+            // the i-th field of the schema is the output of the i-th expression
+            for (field, e) in schema.fields().iter().zip(expr.iter()) {
+                if required_columns.contains(field.name()) {
+                    new_expr.push(e.clone());
+                    new_fields.push(field.clone());
+                    utils::expr_to_column_names(e, &mut new_required_columns)?;
+                }
+            }
+
+            let new_input =
+                optimize_plan(optimizer, input, &new_required_columns, true)?;
+            if new_fields.is_empty() {
+                // nothing produced by this projection is needed: drop the node entirely
+                Ok(new_input)
+            } else {
+                Ok(LogicalPlan::Projection {
+                    expr: new_expr,
+                    input: Box::new(new_input),
+                    schema: Box::new(Schema::new(new_fields)),
+                })
+            }
+        }
+        LogicalPlan::Aggregate {
+            schema,
+            input,
+            group_expr,
+            aggr_expr,
+            ..
+        } => {
+            // aggregate:
+            // * keep every group expression, since it defines the output rows
+            // * remove any aggregate expression that is not required
+            // * construct the new set of required columns
+            let mut new_required_columns = HashSet::new();
+            utils::exprlist_to_column_names(group_expr, &mut new_required_columns)?;
+
+            // The output schema lists the group columns before the aggregate columns.
+            // The group columns must be carried over: dropping them would leave a schema
+            // that no longer describes the group expressions this node still computes.
+            let mut new_fields = Vec::with_capacity(group_expr.len() + aggr_expr.len());
+            for expr in group_expr {
+                let name = expr.name(schema)?;
+                new_fields.push(schema.field_with_name(&name)?.clone());
+            }
+
+            // keep only the aggregate expressions whose output column is required,
+            // and gather the columns they reference
+            let mut new_aggr_expr = Vec::new();
+            for expr in aggr_expr {
+                let name = expr.name(schema)?;
+                let field = schema.field_with_name(&name)?;
+                if required_columns.contains(&name) {
+                    new_aggr_expr.push(expr.clone());
+                    new_fields.push(field.clone());
+                    utils::expr_to_column_names(expr, &mut new_required_columns)?;
+                }
+            }
+
+            let new_input =
+                optimize_plan(optimizer, input, &new_required_columns, true)?;
+            Ok(LogicalPlan::Aggregate {
+                group_expr: group_expr.clone(),
+                aggr_expr: new_aggr_expr,
+                input: Box::new(new_input),
+                schema: Box::new(Schema::new(new_fields)),
+            })
+        }
+        // scans:
+        // * narrow the scan projection to the required columns
+        LogicalPlan::TableScan {
+            schema_name,
+            table_name,
+            table_schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &table_schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+
+            // return the table scan with projection
+            Ok(LogicalPlan::TableScan {
+                schema_name: schema_name.to_string(),
+                table_name: table_name.to_string(),
+                table_schema: table_schema.clone(),
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        LogicalPlan::InMemoryScan {
+            data,
+            schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+            Ok(LogicalPlan::InMemoryScan {
+                data: data.clone(),
+                schema: schema.clone(),
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        LogicalPlan::CsvScan {
+            path,
+            has_header,
+            delimiter,
+            schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+
+            Ok(LogicalPlan::CsvScan {
+                path: path.to_owned(),
+                has_header: *has_header,
+                schema: schema.clone(),
+                delimiter: *delimiter,
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        LogicalPlan::ParquetScan {
+            path,
+            schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+
+            Ok(LogicalPlan::ParquetScan {
+                path: path.to_owned(),
+                schema: schema.clone(),
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        LogicalPlan::Explain {
+            verbose,
+            plan,
+            stringified_plans,
+            schema,
+        } => optimize_explain(optimizer, *verbose, &*plan, stringified_plans, &*schema),
+        // all other nodes:
+        // * add every column they reference to the required columns of their inputs
+        LogicalPlan::Limit { .. }
+        | LogicalPlan::Selection { .. }
+        | LogicalPlan::EmptyRelation { .. }
+        | LogicalPlan::Sort { .. }
+        | LogicalPlan::CreateExternalTable { .. } => {
+            let expr = utils::expressions(plan);
+            // collect all required columns by this plan
+            let mut new_required_columns = required_columns.clone();
+            utils::exprlist_to_column_names(&expr, &mut new_required_columns)?;
+
+            // apply the optimization to all inputs of the plan
+            let inputs = utils::inputs(plan);
+            let new_inputs = inputs
+                .iter()
+                .map(|plan| {
+                    optimize_plan(optimizer, plan, &new_required_columns, has_projection)
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            utils::from_plan(plan, &expr, &new_inputs)
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
 
@@ -355,6 +468,84 @@ mod tests {
         Ok(())
     }
 
+    /// Checks that columns never read after a projection are dropped from it
+    #[test]
+    fn table_unused_column() -> Result<()> {
+        let scan = test_table_scan()?;
+        assert_fields_eq(&scan, vec!["a", "b", "c"]);
+        assert_eq!(scan.schema().fields().len(), 3);
+
+        // "b" is projected but never consumed afterwards, so it should disappear
+        let query = LogicalPlanBuilder::from(&scan)
+            .project(vec![col("c"), col("a"), col("b")])?
+            .filter(col("c").gt(lit(1)))?
+            .aggregate(vec![col("c")], vec![max(col("a"))])?
+            .build()?;
+        assert_fields_eq(&query, vec!["c", "MAX(a)"]);
+
+        let expected = [
+            "Aggregate: groupBy=[[#c]], aggr=[[MAX(#a)]]",
+            "  Selection: #c Gt Int32(1)",
+            "    Projection: #c, #a",
+            "      TableScan: test projection=Some([0, 2])",
+        ]
+        .join("\n");
+        assert_optimized_plan_eq(&query, &expected);
+        Ok(())
+    }
+
+    /// Checks that a projection whose output is never consumed is removed
+    #[test]
+    fn table_unused_projection() -> Result<()> {
+        let scan = test_table_scan()?;
+        assert_fields_eq(&scan, vec!["a", "b", "c"]);
+        assert_eq!(scan.schema().fields().len(), 3);
+
+        // the outer projection only emits a literal, so the inner one is redundant
+        let query = LogicalPlanBuilder::from(&scan)
+            .project(vec![col("b")])?
+            .project(vec![lit(1).alias("a")])?
+            .build()?;
+        assert_fields_eq(&query, vec!["a"]);
+
+        let expected = [
+            "Projection: Int32(1) AS a",
+            "  TableScan: test projection=Some([0])",
+        ]
+        .join("\n");
+        assert_optimized_plan_eq(&query, &expected);
+        Ok(())
+    }
+
+    /// tests that it removes an aggregate expression that is never used downstream
+    #[test]
+    fn table_unused_aggregate() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        assert_eq!(3, table_scan.schema().fields().len());
+        assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
+
+        // we never use "MIN(b)" => remove it
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .aggregate(vec![col("a"), col("c")], vec![max(col("b")), min(col("b"))])?
+            .filter(col("c").gt(lit(1)))?
+            .project(vec![col("c"), col("a"), col("MAX(b)")])?
+            .build()?;
+
+        assert_fields_eq(&plan, vec!["c", "a", "MAX(b)"]);
+
+        let expected = "\
+        Projection: #c, #a, #MAX(b)\
+        \n  Selection: #c Gt Int32(1)\
+        \n    Aggregate: groupBy=[[#a, #c]], aggr=[[MAX(#b)]]\
+        \n      TableScan: test projection=Some([0, 1, 2])";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let optimized_plan = optimize(plan).expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index 5b4445a..ed8f8b6 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -230,3 +230,11 @@ pub fn max(expr: Expr) -> Expr {
         args: vec![expr],
     }
 }
+
+/// Test helper that builds a `MIN` aggregate function expression over `expr`,
+/// declared with a `Float64` return type.
+pub fn min(expr: Expr) -> Expr {
+    let args = vec![expr];
+    let name = String::from("MIN");
+    Expr::AggregateFunction {
+        name,
+        args,
+        return_type: DataType::Float64,
+    }
+}