You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/15 12:55:11 UTC

[GitHub] [arrow] alamb commented on a change in pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

alamb commented on a change in pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#discussion_r470974242



##########
File path: rust/datafusion/src/optimizer/projection_push_down.rs
##########
@@ -196,12 +95,213 @@ fn get_projected_schema(
     // create the projected schema
     let mut projected_fields: Vec<Field> = Vec::with_capacity(projection.len());
     for i in &projection {
-        projected_fields.push(table_schema.fields()[*i].clone());
+        projected_fields.push(schema.fields()[*i].clone());
     }
 
     Ok((projection, Schema::new(projected_fields)))
 }
 
+/// Recursively transverses the logical plan removing expressions and that are not needed.
+fn optimize_plan(
+    plan: &LogicalPlan,
+    required_columns: &HashSet<String>, // set of columns required up to this step
+    has_projection: bool,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection {
+            input,
+            expr,
+            schema,
+        } => {
+            // projection:
+            // * remove any expression that is not required
+            // * construct the new set of required columns
+
+            let mut new_expr = Vec::new();
+            let mut new_fields = Vec::new();
+            let mut new_required_columns = HashSet::new();
+
+            // re-write schema and expressions removing un-used columns

Review comment:
       ```suggestion
               // Gather all columns needed for expressions in this Projection
   ```
   I don't think the schema is being modified here

##########
File path: rust/datafusion/src/optimizer/projection_push_down.rs
##########
@@ -196,12 +95,213 @@ fn get_projected_schema(
     // create the projected schema
     let mut projected_fields: Vec<Field> = Vec::with_capacity(projection.len());
     for i in &projection {
-        projected_fields.push(table_schema.fields()[*i].clone());
+        projected_fields.push(schema.fields()[*i].clone());
     }
 
     Ok((projection, Schema::new(projected_fields)))
 }
 
+/// Recursively transverses the logical plan removing expressions and that are not needed.
+fn optimize_plan(
+    plan: &LogicalPlan,
+    required_columns: &HashSet<String>, // set of columns required up to this step
+    has_projection: bool,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection {
+            input,
+            expr,
+            schema,
+        } => {
+            // projection:
+            // * remove any expression that is not required
+            // * construct the new set of required columns
+
+            let mut new_expr = Vec::new();
+            let mut new_fields = Vec::new();
+            let mut new_required_columns = HashSet::new();
+
+            // re-write schema and expressions removing un-used columns
+            schema
+                .fields()
+                .iter()
+                .enumerate()
+                .map(|(i, field)| {
+                    if required_columns.contains(field.name()) {
+                        new_expr.push(expr[i].clone());
+                        new_fields.push(field.clone());
+
+                        // gather the new set of required columns
+                        utils::expr_to_column_names(&expr[i], &mut new_required_columns)
+                    } else {
+                        Ok(())
+                    }
+                })
+                .collect::<Result<()>>()?;
+
+            let new_input = optimize_plan(&input, &new_required_columns, true)?;
+            if new_fields.len() == 0 {
+                // no need for an expression at all
+                Ok(new_input)
+            } else {
+                Ok(LogicalPlan::Projection {
+                    expr: new_expr,
+                    input: Box::new(new_input),
+                    schema: Box::new(Schema::new(new_fields)),
+                })
+            }
+        }
+        LogicalPlan::Aggregate {
+            schema,
+            input,
+            group_expr,
+            aggr_expr,
+            ..
+        } => {
+            // aggregate:
+            // * remove any aggregate expression that is not required
+            // * construct the new set of required columns
+
+            let mut new_required_columns = HashSet::new();
+            utils::exprlist_to_column_names(group_expr, &mut new_required_columns)?;
+
+            // re-write schema and expressions removing un-used columns
+            let mut new_aggr_expr = Vec::new();
+            let mut new_fields = Vec::new();
+            aggr_expr
+                .iter()
+                .map(|expr| {
+                    let name = &expr.name(&schema)?;
+                    let field = schema.field_with_name(name)?;
+
+                    if required_columns.contains(name) {
+                        new_aggr_expr.push(expr.clone());
+                        new_fields.push(field.clone());
+
+                        // add to the new set of required columns
+                        utils::expr_to_column_names(expr, &mut new_required_columns)
+                    } else {
+                        Ok(())
+                    }
+                })
+                .collect::<Result<()>>()?;
+            let new_schema = Schema::new(new_fields);
+
+            Ok(LogicalPlan::Aggregate {
+                group_expr: group_expr.clone(),
+                aggr_expr: new_aggr_expr,
+                input: Box::new(optimize_plan(&input, &new_required_columns, true)?),
+                schema: Box::new(new_schema),
+            })
+        }
+        // scans:
+        // * remove un-used columns from the scan projection
+        LogicalPlan::TableScan {
+            schema_name,
+            table_name,
+            table_schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &table_schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+
+            // return the table scan with projection
+            Ok(LogicalPlan::TableScan {
+                schema_name: schema_name.to_string(),
+                table_name: table_name.to_string(),
+                table_schema: table_schema.clone(),
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        LogicalPlan::InMemoryScan {
+            data,
+            schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+            Ok(LogicalPlan::InMemoryScan {
+                data: data.clone(),
+                schema: schema.clone(),
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        LogicalPlan::CsvScan {
+            path,
+            has_header,
+            delimiter,
+            schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+
+            Ok(LogicalPlan::CsvScan {
+                path: path.to_owned(),
+                has_header: *has_header,
+                schema: schema.clone(),
+                delimiter: *delimiter,
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        LogicalPlan::ParquetScan {
+            path,
+            schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+
+            Ok(LogicalPlan::ParquetScan {
+                path: path.to_owned(),
+                schema: schema.clone(),
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        // all other nodes:
+        // * gather all used columns as required columns
+        _ => {
+            let expr = utils::expressions(plan);

Review comment:
       This is a nice fallback here -- i.e., defaulting to the safe path and keeping all expressions.
   
   I wonder what you would think about explicitly listing out the LogicalPlan variants here (rather than using `_`) -- the rationale is that anyone adding a new LogicalPlan variant would then be told by the compiler that they need to consider how the projection_push_down optimization should be applied to their new operator.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org