Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/09 17:30:53 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

jorgecarleitao opened a new pull request #7919:
URL: https://github.com/apache/arrow/pull/7919


   This PR makes the projection optimizer remove any projection or aggregation that is not used down the plan, thus improving speed and convenience.
   
   This builds on top of #7879; only the last commit is specific to this PR.
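
As a rough illustration of the idea, here is a minimal sketch on a toy plan type. `Plan`, its variants, and `prune` are hypothetical names invented for this example; DataFusion's `LogicalPlan` and the real `optimize_plan` carry expressions and schemas and are considerably richer.

```rust
use std::collections::HashSet;

/// Toy logical plan (hypothetical; much simpler than DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Table scan; `projection` lists the columns that are actually read.
    Scan {
        columns: Vec<String>,
        projection: Option<Vec<String>>,
    },
    /// Projection that keeps the named columns of its input.
    Projection { columns: Vec<String>, input: Box<Plan> },
}

/// Drops projected columns that nothing above requires, then pushes the
/// surviving column names down to the scan.
fn prune(plan: &Plan, required: &HashSet<String>) -> Plan {
    match plan {
        Plan::Projection { columns, input } => {
            // Keep only the columns that a parent node asked for.
            let kept: Vec<String> = columns
                .iter()
                .filter(|c| required.contains(*c))
                .cloned()
                .collect();
            // In this toy model a kept column reads the input column of the same name.
            let new_required: HashSet<String> = kept.iter().cloned().collect();
            let new_input = prune(input, &new_required);
            if kept.is_empty() {
                // Nothing left to project: drop the node entirely.
                new_input
            } else {
                Plan::Projection {
                    columns: kept,
                    input: Box::new(new_input),
                }
            }
        }
        Plan::Scan { columns, .. } => {
            // Read only the required columns, preserving table order.
            let projection: Vec<String> = columns
                .iter()
                .filter(|c| required.contains(*c))
                .cloned()
                .collect();
            Plan::Scan {
                columns: columns.clone(),
                projection: Some(projection),
            }
        }
    }
}
```

Calling `prune` on a plan whose outer projection keeps `a` and whose inner projection keeps `a, b` narrows the inner projection to `a` and sets the scan's projection to `a` as well.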
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#issuecomment-678662753


   I do have a nagging concern that the logic may not work if the query plan contains aliases that rename columns, but we can address that as a follow-up if/when it becomes an issue.
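
To make the alias concern concrete, here is a hypothetical sketch. The `Expr` enum, `output_name`, `input_columns`, and `required_below` are invented for illustration and make no claim about how DataFusion's `utils::expr_to_column_names` behaves. The point is that a projection is matched by its output name (the alias) while the input below it must be asked for the underlying column, so the column-collection step has to look through the alias.

```rust
use std::collections::HashSet;

/// Toy expression type (hypothetical; not DataFusion's `Expr`).
#[derive(Debug, Clone)]
enum Expr {
    Column(String),
    Alias(Box<Expr>, String),
}

impl Expr {
    /// Name this expression has in the projection's output schema.
    fn output_name(&self) -> String {
        match self {
            Expr::Column(name) => name.clone(),
            Expr::Alias(_, alias) => alias.clone(),
        }
    }
}

/// Collects the input column names an expression reads, looking through aliases.
fn input_columns(expr: &Expr, acc: &mut HashSet<String>) {
    match expr {
        Expr::Column(name) => {
            acc.insert(name.clone());
        }
        Expr::Alias(inner, _) => input_columns(inner, acc),
    }
}

/// Maps the output names required above a projection to the input
/// columns required below it.
fn required_below(exprs: &[Expr], required_above: &HashSet<String>) -> HashSet<String> {
    let mut below = HashSet::new();
    for expr in exprs {
        // Match on the output name (the alias, if there is one) ...
        if required_above.contains(&expr.output_name()) {
            // ... but require the underlying input columns.
            input_columns(expr, &mut below);
        }
    }
    below
}
```

With the projection `a AS b, c` and only `b` required above it, `required_below` yields `a`. A version that inserted the alias name instead would ask the scan for a column `b` that does not exist.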





[GitHub] [arrow] alamb commented on a change in pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#discussion_r470974242



##########
File path: rust/datafusion/src/optimizer/projection_push_down.rs
##########
@@ -196,12 +95,213 @@ fn get_projected_schema(
     // create the projected schema
     let mut projected_fields: Vec<Field> = Vec::with_capacity(projection.len());
     for i in &projection {
-        projected_fields.push(table_schema.fields()[*i].clone());
+        projected_fields.push(schema.fields()[*i].clone());
     }
 
     Ok((projection, Schema::new(projected_fields)))
 }
 
+/// Recursively transverses the logical plan removing expressions and that are not needed.
+fn optimize_plan(
+    plan: &LogicalPlan,
+    required_columns: &HashSet<String>, // set of columns required up to this step
+    has_projection: bool,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection {
+            input,
+            expr,
+            schema,
+        } => {
+            // projection:
+            // * remove any expression that is not required
+            // * construct the new set of required columns
+
+            let mut new_expr = Vec::new();
+            let mut new_fields = Vec::new();
+            let mut new_required_columns = HashSet::new();
+
+            // re-write schema and expressions removing un-used columns

Review comment:
       ```suggestion
               // Gather all columns needed for expressions in this Projection
   ```
   I don't think the schema is being modified here

##########
File path: rust/datafusion/src/optimizer/projection_push_down.rs
##########
@@ -196,12 +95,213 @@ fn get_projected_schema(
     // create the projected schema
     let mut projected_fields: Vec<Field> = Vec::with_capacity(projection.len());
     for i in &projection {
-        projected_fields.push(table_schema.fields()[*i].clone());
+        projected_fields.push(schema.fields()[*i].clone());
     }
 
     Ok((projection, Schema::new(projected_fields)))
 }
 
+/// Recursively transverses the logical plan removing expressions and that are not needed.
+fn optimize_plan(
+    plan: &LogicalPlan,
+    required_columns: &HashSet<String>, // set of columns required up to this step
+    has_projection: bool,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection {
+            input,
+            expr,
+            schema,
+        } => {
+            // projection:
+            // * remove any expression that is not required
+            // * construct the new set of required columns
+
+            let mut new_expr = Vec::new();
+            let mut new_fields = Vec::new();
+            let mut new_required_columns = HashSet::new();
+
+            // re-write schema and expressions removing un-used columns
+            schema
+                .fields()
+                .iter()
+                .enumerate()
+                .map(|(i, field)| {
+                    if required_columns.contains(field.name()) {
+                        new_expr.push(expr[i].clone());
+                        new_fields.push(field.clone());
+
+                        // gather the new set of required columns
+                        utils::expr_to_column_names(&expr[i], &mut new_required_columns)
+                    } else {
+                        Ok(())
+                    }
+                })
+                .collect::<Result<()>>()?;
+
+            let new_input = optimize_plan(&input, &new_required_columns, true)?;
+            if new_fields.len() == 0 {
+                // no need for an expression at all
+                Ok(new_input)
+            } else {
+                Ok(LogicalPlan::Projection {
+                    expr: new_expr,
+                    input: Box::new(new_input),
+                    schema: Box::new(Schema::new(new_fields)),
+                })
+            }
+        }
+        LogicalPlan::Aggregate {
+            schema,
+            input,
+            group_expr,
+            aggr_expr,
+            ..
+        } => {
+            // aggregate:
+            // * remove any aggregate expression that is not required
+            // * construct the new set of required columns
+
+            let mut new_required_columns = HashSet::new();
+            utils::exprlist_to_column_names(group_expr, &mut new_required_columns)?;
+
+            // re-write schema and expressions removing un-used columns
+            let mut new_aggr_expr = Vec::new();
+            let mut new_fields = Vec::new();
+            aggr_expr
+                .iter()
+                .map(|expr| {
+                    let name = &expr.name(&schema)?;
+                    let field = schema.field_with_name(name)?;
+
+                    if required_columns.contains(name) {
+                        new_aggr_expr.push(expr.clone());
+                        new_fields.push(field.clone());
+
+                        // add to the new set of required columns
+                        utils::expr_to_column_names(expr, &mut new_required_columns)
+                    } else {
+                        Ok(())
+                    }
+                })
+                .collect::<Result<()>>()?;
+            let new_schema = Schema::new(new_fields);
+
+            Ok(LogicalPlan::Aggregate {
+                group_expr: group_expr.clone(),
+                aggr_expr: new_aggr_expr,
+                input: Box::new(optimize_plan(&input, &new_required_columns, true)?),
+                schema: Box::new(new_schema),
+            })
+        }
+        // scans:
+        // * remove un-used columns from the scan projection
+        LogicalPlan::TableScan {
+            schema_name,
+            table_name,
+            table_schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &table_schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+
+            // return the table scan with projection
+            Ok(LogicalPlan::TableScan {
+                schema_name: schema_name.to_string(),
+                table_name: table_name.to_string(),
+                table_schema: table_schema.clone(),
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        LogicalPlan::InMemoryScan {
+            data,
+            schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+            Ok(LogicalPlan::InMemoryScan {
+                data: data.clone(),
+                schema: schema.clone(),
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        LogicalPlan::CsvScan {
+            path,
+            has_header,
+            delimiter,
+            schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+
+            Ok(LogicalPlan::CsvScan {
+                path: path.to_owned(),
+                has_header: *has_header,
+                schema: schema.clone(),
+                delimiter: *delimiter,
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        LogicalPlan::ParquetScan {
+            path,
+            schema,
+            projection,
+            ..
+        } => {
+            let (projection, projected_schema) = get_projected_schema(
+                &schema,
+                projection,
+                required_columns,
+                has_projection,
+            )?;
+
+            Ok(LogicalPlan::ParquetScan {
+                path: path.to_owned(),
+                schema: schema.clone(),
+                projection: Some(projection),
+                projected_schema: Box::new(projected_schema),
+            })
+        }
+        // all other nodes:
+        // * gather all used columns as required columns
+        _ => {
+            let expr = utils::expressions(plan);

Review comment:
       This is a nice fallback here: default to the safe path and keep all expressions.
   
   I wonder what you would think about explicitly listing out the remaining `LogicalPlan` variants here (rather than using `_`). The rationale is that anyone adding a new `LogicalPlan` variant would then be told by the compiler to consider how the projection push down optimization should apply to their new operator.
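
A minimal sketch of what that could look like. The `Plan` variants and the `Strategy` enum are hypothetical and chosen only to show the compiler behaviour; they are not the actual `LogicalPlan` definition.

```rust
/// Toy plan type (hypothetical variants, for illustration only).
enum Plan {
    Projection,
    Aggregate,
    Selection,
    Sort,
    Limit,
}

/// How a pruning pass treats a node.
#[derive(Debug, PartialEq)]
enum Strategy {
    PruneExpressions,
    KeepAll,
}

fn strategy(plan: &Plan) -> Strategy {
    match plan {
        Plan::Projection | Plan::Aggregate => Strategy::PruneExpressions,
        // Listed explicitly instead of `_`: adding a new variant to `Plan`
        // makes this match non-exhaustive, so the code stops compiling
        // until someone decides how the new node should be handled.
        Plan::Selection | Plan::Sort | Plan::Limit => Strategy::KeepAll,
    }
}
```

The trade-off is a slightly longer match arm in exchange for a compile-time reminder whenever the enum grows.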







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#discussion_r470420151



##########
File path: rust/datafusion/src/optimizer/projection_push_down.rs
##########
@@ -45,116 +48,12 @@ impl ProjectionPushDown {
     pub fn new() -> Self {
         Self {}
     }
-
-    fn optimize_plan(

Review comment:
       This change just copies the function to outside `ProjectionPushDown`, since it does not really depend on the state of `ProjectionPushDown`, i.e. it can be written as a normal function.







[GitHub] [arrow] jorgecarleitao commented on pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#issuecomment-673895968


   FYI @andygrove and @alamb
   
   Again, this is useful mostly to the `table` API. SQL statements are parsed into an optimized plan AFAIK.
   





[GitHub] [arrow] jorgecarleitao commented on pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#issuecomment-678659297


   @andygrove, is there anything else needed to move this forward?





[GitHub] [arrow] github-actions[bot] commented on pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#issuecomment-671079606


   https://issues.apache.org/jira/browse/ARROW-9678





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#discussion_r470420151



##########
File path: rust/datafusion/src/optimizer/projection_push_down.rs
##########
@@ -45,116 +48,12 @@ impl ProjectionPushDown {
     pub fn new() -> Self {
         Self {}
     }
-
-    fn optimize_plan(

Review comment:
       This just moves the function to outside `ProjectionPushDown`, since it does not really depend on the state of `ProjectionPushDown`, i.e. it can be written as a normal function.
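
For readers unfamiliar with the pattern, a small sketch of the refactor. The struct name follows the PR, but `optimize`, `keep_required`, and their signatures are hypothetical stand-ins for the real methods.

```rust
/// Stateless rule struct, loosely modeled on `ProjectionPushDown`
/// (the methods and their signatures here are hypothetical).
struct ProjectionPushDown {}

impl ProjectionPushDown {
    fn new() -> Self {
        Self {}
    }

    /// Entry point on the struct: simply delegates to the free function.
    fn optimize(&self, columns: &[&str], required: &[&str]) -> Vec<String> {
        keep_required(columns, required)
    }
}

/// Free function: it reads nothing from `self`, so it can live outside the `impl` block.
fn keep_required(columns: &[&str], required: &[&str]) -> Vec<String> {
    columns
        .iter()
        .filter(|c| required.contains(*c))
        .map(|c| (*c).to_string())
        .collect()
}
```

Keeping the helper outside the `impl` block also makes it callable without constructing the struct.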







[GitHub] [arrow] jorgecarleitao edited a comment on pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#issuecomment-673895968


   FYI @andygrove and @alamb
   
   Again, this is useful mostly to the `table` API. SQL statements are parsed into an optimized plan AFAIK.
   





[GitHub] [arrow] jorgecarleitao commented on pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#issuecomment-678665344


   Thank you, @andygrove! I captured that thought in ARROW-9830, with issue type "Test". :)





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7919:
URL: https://github.com/apache/arrow/pull/7919#discussion_r471010910



##########
File path: rust/datafusion/src/optimizer/projection_push_down.rs
##########
@@ -196,12 +95,213 @@ fn get_projected_schema(
     // create the projected schema
     let mut projected_fields: Vec<Field> = Vec::with_capacity(projection.len());
     for i in &projection {
-        projected_fields.push(table_schema.fields()[*i].clone());
+        projected_fields.push(schema.fields()[*i].clone());
     }
 
     Ok((projection, Schema::new(projected_fields)))
 }
 
+/// Recursively transverses the logical plan removing expressions and that are not needed.
+fn optimize_plan(
+    plan: &LogicalPlan,
+    required_columns: &HashSet<String>, // set of columns required up to this step
+    has_projection: bool,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Projection {
+            input,
+            expr,
+            schema,
+        } => {
+            // projection:
+            // * remove any expression that is not required
+            // * construct the new set of required columns
+
+            let mut new_expr = Vec::new();
+            let mut new_fields = Vec::new();
+            let mut new_required_columns = HashSet::new();
+
+            // re-write schema and expressions removing un-used columns

Review comment:
       Fixed







[GitHub] [arrow] andygrove closed pull request #7919: ARROW-9678: [Rust] [DataFusion] Improve projection push down to remove unused columns

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #7919:
URL: https://github.com/apache/arrow/pull/7919


   

