Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/16 16:31:52 UTC

[GitHub] [arrow] andygrove opened a new pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

andygrove opened a new pull request #7975:
URL: https://github.com/apache/arrow/pull/7975


   This PR mainly moves the physical query planning logic out of `ExecutionContext` and into its own struct. There is a new `PhysicalPlanner` trait, so it is now possible to bring your own planner if the one in DataFusion doesn't meet your needs (for example, if you need to implement distributed query execution).
   
   Pseudocode example:
   
   ```rust
   let config = ExecutionConfig::new().with_physical_planner(Arc::new(MyPhysicalPlanner::new()));
   let ctx = ExecutionContext::new(config);
   ctx.sql("SELECT * FROM foo");
   ```
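
To make the idea of a replaceable planner concrete, here is a minimal, self-contained sketch. It does not use DataFusion itself: `LogicalPlan`, `ExecutionPlan`, `PlanNode`, `LocalPlanner`, `DistributedPlanner`, `PlanResult` and `sample_plan` are simplified stand-ins invented for illustration, and the method signatures are assumptions rather than the ones in this PR. Only the overall shape (a `PhysicalPlanner` trait, an optional planner on the config, and a built-in fallback) follows the description above.

```rust
use std::sync::Arc;

// Stand-in types for illustration only; these are NOT DataFusion's definitions.
enum LogicalPlan {
    TableScan { table_name: String },
    Limit { n: usize, input: Box<LogicalPlan> },
}

trait ExecutionPlan {
    fn describe(&self) -> String;
}

struct PlanNode(String);

impl ExecutionPlan for PlanNode {
    fn describe(&self) -> String {
        self.0.clone()
    }
}

type PlanResult = Result<Arc<dyn ExecutionPlan>, String>;

// Assumed shape of the replaceable planner: logical plan in, physical plan out.
trait PhysicalPlanner {
    fn create_physical_plan(&self, plan: &LogicalPlan) -> PlanResult;
}

// Hypothetical built-in planner, used when no custom planner is configured.
struct LocalPlanner;

impl PhysicalPlanner for LocalPlanner {
    fn create_physical_plan(&self, plan: &LogicalPlan) -> PlanResult {
        let text = match plan {
            LogicalPlan::TableScan { table_name } => format!("LocalScan({})", table_name),
            LogicalPlan::Limit { n, input } => {
                let child = self.create_physical_plan(input)?;
                format!("LocalLimit({}, {})", n, child.describe())
            }
        };
        Ok(Arc::new(PlanNode(text)))
    }
}

// Hypothetical user-supplied planner, e.g. for distributed execution.
struct DistributedPlanner {
    workers: usize,
}

impl PhysicalPlanner for DistributedPlanner {
    fn create_physical_plan(&self, plan: &LogicalPlan) -> PlanResult {
        let text = match plan {
            LogicalPlan::TableScan { table_name } => {
                format!("RemoteScan({}, workers={})", table_name, self.workers)
            }
            LogicalPlan::Limit { n, input } => {
                let child = self.create_physical_plan(input)?;
                format!("GlobalLimit({}, {})", n, child.describe())
            }
        };
        Ok(Arc::new(PlanNode(text)))
    }
}

#[derive(Default)]
struct ExecutionConfig {
    physical_planner: Option<Arc<dyn PhysicalPlanner>>,
}

impl ExecutionConfig {
    fn new() -> Self {
        Self::default()
    }

    // Builder-style setter mirroring the pseudocode above.
    fn with_physical_planner(mut self, planner: Arc<dyn PhysicalPlanner>) -> Self {
        self.physical_planner = Some(planner);
        self
    }
}

struct ExecutionContext {
    config: ExecutionConfig,
}

impl ExecutionContext {
    fn new(config: ExecutionConfig) -> Self {
        Self { config }
    }

    // Delegate to the configured planner, or to the built-in one if none is set.
    fn create_physical_plan(&self, plan: &LogicalPlan) -> PlanResult {
        match &self.config.physical_planner {
            Some(planner) => planner.create_physical_plan(plan),
            None => LocalPlanner.create_physical_plan(plan),
        }
    }
}

// Small example plan: LIMIT 10 over a scan of table "foo".
fn sample_plan() -> LogicalPlan {
    LogicalPlan::Limit {
        n: 10,
        input: Box::new(LogicalPlan::TableScan { table_name: "foo".to_string() }),
    }
}
```

With this layout the context never needs to know which planner it holds; swapping in `DistributedPlanner { workers: 4 }` through `with_physical_planner` changes the produced plan without touching `ExecutionContext`.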


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7975:
URL: https://github.com/apache/arrow/pull/7975#discussion_r471137546



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -373,363 +361,13 @@ impl ExecutionContext {
     pub fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        batch_size: usize,
+        _batch_size: usize,

Review comment:
       remove?







[GitHub] [arrow] github-actions[bot] commented on pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7975:
URL: https://github.com/apache/arrow/pull/7975#issuecomment-674549512


   https://issues.apache.org/jira/browse/ARROW-9758





[GitHub] [arrow] alamb commented on a change in pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7975:
URL: https://github.com/apache/arrow/pull/7975#discussion_r471437170



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -373,363 +355,12 @@ impl ExecutionContext {
     pub fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        batch_size: usize,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match logical_plan {
-            LogicalPlan::TableScan {
-                table_name,
-                projection,
-                ..
-            } => match self
-                .state
-                .lock()
-                .expect("failed to lock mutex")
-                .datasources
-                .lock()
-                .expect("failed to lock mutex")
-                .get(table_name)
-            {
-                Some(provider) => {
-                    let partitions = provider.scan(projection, batch_size)?;
-                    if partitions.is_empty() {
-                        Err(ExecutionError::General(
-                            "Table provider returned no partitions".to_string(),
-                        ))
-                    } else {
-                        let schema = match projection {
-                            None => provider.schema().clone(),
-                            Some(p) => Arc::new(Schema::new(
-                                p.iter()
-                                    .map(|i| provider.schema().field(*i).clone())
-                                    .collect(),
-                            )),
-                        };
-
-                        let exec = DatasourceExec::new(schema, partitions.clone());
-                        Ok(Arc::new(exec))
-                    }
-                }
-                _ => Err(ExecutionError::General(format!(
-                    "No table named {}",
-                    table_name
-                ))),
-            },
-            LogicalPlan::InMemoryScan {
-                data,
-                projection,
-                projected_schema,
-                ..
-            } => Ok(Arc::new(MemoryExec::try_new(
-                data,
-                Arc::new(projected_schema.as_ref().to_owned()),
-                projection.to_owned(),
-            )?)),
-            LogicalPlan::CsvScan {
-                path,
-                schema,
-                has_header,
-                delimiter,
-                projection,
-                ..
-            } => Ok(Arc::new(CsvExec::try_new(
-                path,
-                CsvReadOptions::new()
-                    .schema(schema.as_ref())
-                    .delimiter_option(*delimiter)
-                    .has_header(*has_header),
-                projection.to_owned(),
-                batch_size,
-            )?)),
-            LogicalPlan::ParquetScan {
-                path, projection, ..
-            } => Ok(Arc::new(ParquetExec::try_new(
-                path,
-                projection.to_owned(),
-                batch_size,
-            )?)),
-            LogicalPlan::Projection { input, expr, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-                let runtime_expr = expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_physical_expr(e, &input_schema),
-                            e.name(&input_schema),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-                Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                ..
-            } => {
-                // Initially need to perform the aggregate and then merge the partitions
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-
-                let groups = group_expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_physical_expr(e, &input_schema),
-                            e.name(&input_schema),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-                let aggregates = aggr_expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_aggregate_expr(e, &input_schema),
-                            e.name(&input_schema),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                let initial_aggr = HashAggregateExec::try_new(
-                    groups.clone(),
-                    aggregates.clone(),
-                    input,
-                )?;
-
-                let schema = initial_aggr.schema();
-                let partitions = initial_aggr.partitions()?;
-
-                if partitions.len() == 1 {
-                    return Ok(Arc::new(initial_aggr));
-                }
-
-                let merge = Arc::new(MergeExec::new(
-                    schema.clone(),
-                    partitions,
-                    self.state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
-                ));
-
-                // construct the expressions for the final aggregation
-                let (final_group, final_aggr) = initial_aggr.make_final_expr(
-                    groups.iter().map(|x| x.1.clone()).collect(),
-                    aggregates.iter().map(|x| x.1.clone()).collect(),
-                );
-
-                // construct a second aggregation, keeping the final column name equal to the first aggregation
-                // and the expressions corresponding to the respective aggregate
-                Ok(Arc::new(HashAggregateExec::try_new(
-                    final_group
-                        .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
-                        .collect(),
-                    final_aggr
-                        .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), aggregates[i].1.clone()))
-                        .collect(),
-                    merge,
-                )?))
-            }
-            LogicalPlan::Selection { input, expr, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-                let runtime_expr = self.create_physical_expr(expr, &input_schema)?;
-                Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?))
-            }
-            LogicalPlan::Sort { expr, input, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-
-                let sort_expr = expr
-                    .iter()
-                    .map(|e| match e {
-                        Expr::Sort {
-                            expr,
-                            asc,
-                            nulls_first,
-                        } => self.create_physical_sort_expr(
-                            expr,
-                            &input_schema,
-                            SortOptions {
-                                descending: !*asc,
-                                nulls_first: *nulls_first,
-                            },
-                        ),
-                        _ => Err(ExecutionError::ExecutionError(
-                            "Sort only accepts sort expressions".to_string(),
-                        )),
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                Ok(Arc::new(SortExec::try_new(
-                    sort_expr,
-                    input,
-                    self.state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
-                )?))
-            }
-            LogicalPlan::Limit { input, n, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-
-                Ok(Arc::new(GlobalLimitExec::new(
-                    input_schema.clone(),
-                    input.partitions()?,
-                    *n,
-                    self.state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
-                )))
-            }
-            LogicalPlan::Explain {
-                verbose,
-                plan,
-                stringified_plans,
-                schema,
-            } => {
-                let input = self.create_physical_plan(plan, batch_size)?;
-
-                let mut stringified_plans = stringified_plans
-                    .iter()
-                    .filter(|s| s.should_display(*verbose))
-                    .map(|s| s.clone())
-                    .collect::<Vec<_>>();
-
-                // add in the physical plan if requested
-                if *verbose {
-                    stringified_plans.push(StringifiedPlan::new(
-                        PlanType::PhysicalPlan,
-                        format!("{:#?}", input),
-                    ));
-                }
-                let schema_ref = Arc::new((**schema).clone());
-                Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
-            }
-            _ => Err(ExecutionError::General(
-                "Unsupported logical plan variant".to_string(),
-            )),
-        }
-    }
-
-    /// Create a physical expression from a logical expression
-    pub fn create_physical_expr(
-        &self,
-        e: &Expr,
-        input_schema: &Schema,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
-        match e {
-            Expr::Alias(expr, ..) => Ok(self.create_physical_expr(expr, input_schema)?),
-            Expr::Column(name) => {
-                // check that name exists
-                input_schema.field_with_name(&name)?;
-                Ok(Arc::new(Column::new(name)))
-            }
-            Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
-            Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
-                self.create_physical_expr(left, input_schema)?,
-                op.clone(),
-                self.create_physical_expr(right, input_schema)?,
-            ))),
-            Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new(
-                self.create_physical_expr(expr, input_schema)?,
-                input_schema,
-                data_type.clone(),
-            )?)),
-            Expr::ScalarFunction {
-                name,
-                args,
-                return_type,
-            } => match &self
-                .state
-                .lock()
-                .expect("failed to lock mutex")
-                .scalar_functions
-                .lock()
-                .expect("failed to lock mutex")
-                .get(name)
-            {
-                Some(f) => {
-                    let mut physical_args = vec![];
-                    for e in args {
-                        physical_args.push(self.create_physical_expr(e, input_schema)?);
-                    }
-                    Ok(Arc::new(ScalarFunctionExpr::new(
-                        name,
-                        Box::new(f.fun.clone()),
-                        physical_args,
-                        return_type,
-                    )))
-                }
-                _ => Err(ExecutionError::General(format!(
-                    "Invalid scalar function '{:?}'",
-                    name
-                ))),
-            },
-            other => Err(ExecutionError::NotImplemented(format!(
-                "Physical plan does not support logical expression {:?}",
-                other
-            ))),
-        }
-    }
-
-    /// Create an aggregate expression from a logical expression
-    pub fn create_aggregate_expr(
-        &self,
-        e: &Expr,
-        input_schema: &Schema,
-    ) -> Result<Arc<dyn AggregateExpr>> {
-        match e {
-            Expr::AggregateFunction { name, args, .. } => {
-                match name.to_lowercase().as_ref() {
-                    "sum" => Ok(Arc::new(Sum::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "avg" => Ok(Arc::new(Avg::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "max" => Ok(Arc::new(Max::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "min" => Ok(Arc::new(Min::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "count" => Ok(Arc::new(Count::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    other => Err(ExecutionError::NotImplemented(format!(
-                        "Unsupported aggregate function '{}'",
-                        other
-                    ))),
-                }
-            }
-            other => Err(ExecutionError::General(format!(
-                "Invalid aggregate expression '{:?}'",
-                other
-            ))),
-        }
-    }
-
-    /// Create an aggregate expression from a logical expression
-    pub fn create_physical_sort_expr(
-        &self,
-        e: &Expr,
-        input_schema: &Schema,
-        options: SortOptions,
-    ) -> Result<PhysicalSortExpr> {
-        Ok(PhysicalSortExpr {
-            expr: self.create_physical_expr(e, input_schema)?,
-            options: options,
-        })
+        let planner: Arc<dyn PhysicalPlanner> = match self.config().physical_planner {
+            Some(planner) => planner,
+            None => Arc::new(PhysicalPlannerImpl::default()),

Review comment:
       https://github.com/apache/arrow/pull/7980







[GitHub] [arrow] alamb commented on a change in pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7975:
URL: https://github.com/apache/arrow/pull/7975#discussion_r471433074



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -1452,11 +1109,34 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn custom_physical_planner() -> Result<()> {

Review comment:
       👍 

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -373,363 +355,12 @@ impl ExecutionContext {
     pub fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        batch_size: usize,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let planner: Arc<dyn PhysicalPlanner> = match self.config().physical_planner {
+            Some(planner) => planner,
+            None => Arc::new(PhysicalPlannerImpl::default()),

Review comment:
       Maybe as part of a future PR, this could be called `DefaultPhysicalPlanner` to hint more strongly that one can replace it with another. I'll throw up a PR proposing that change.
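
The fallback discussed in this comment can be sketched in isolation. This is a minimal, self-contained illustration and not the DataFusion code: `LogicalPlan` is a stand-in struct, the trait method returns a `String` instead of a real execution plan, and `resolve_planner` is a hypothetical helper. It uses the `DefaultPhysicalPlanner` name proposed above and assumes the config stores an optional planner behind an `Arc`.

```rust
use std::sync::Arc;

// Stand-in for DataFusion's LogicalPlan (assumption, for illustration only).
struct LogicalPlan {
    description: String,
}

// Simplified version of the trait from the PR; the real signature differs.
trait PhysicalPlanner {
    fn create_physical_plan(&self, plan: &LogicalPlan) -> String;
}

// Built-in planner, using the name suggested in the review comment.
#[derive(Default)]
struct DefaultPhysicalPlanner;

impl PhysicalPlanner for DefaultPhysicalPlanner {
    fn create_physical_plan(&self, plan: &LogicalPlan) -> String {
        format!("default: {}", plan.description)
    }
}

// Hypothetical user-supplied planner, e.g. for distributed execution.
struct MyPhysicalPlanner;

impl PhysicalPlanner for MyPhysicalPlanner {
    fn create_physical_plan(&self, plan: &LogicalPlan) -> String {
        format!("custom: {}", plan.description)
    }
}

// Simplified config: None means "use the built-in planner".
#[derive(Default)]
struct ExecutionConfig {
    physical_planner: Option<Arc<dyn PhysicalPlanner>>,
}

impl ExecutionConfig {
    fn with_physical_planner(mut self, planner: Arc<dyn PhysicalPlanner>) -> Self {
        self.physical_planner = Some(planner);
        self
    }
}

// Hypothetical helper: pick the configured planner or fall back to the default.
// Matching on a reference and cloning the Arc avoids moving out of the config.
fn resolve_planner(config: &ExecutionConfig) -> Arc<dyn PhysicalPlanner> {
    match &config.physical_planner {
        Some(planner) => Arc::clone(planner),
        None => Arc::new(DefaultPhysicalPlanner::default()),
    }
}
```

With this shape, `resolve_planner(&ExecutionConfig::default())` yields the built-in planner, while a config built with `with_physical_planner(Arc::new(MyPhysicalPlanner))` yields the custom one. Naming the built-in type `Default...` makes that fallback role visible at the call site.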







[GitHub] [arrow] andygrove closed pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #7975:
URL: https://github.com/apache/arrow/pull/7975


   





[GitHub] [arrow] andygrove commented on a change in pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #7975:
URL: https://github.com/apache/arrow/pull/7975#discussion_r471139144



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -373,363 +361,13 @@ impl ExecutionContext {
     pub fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        batch_size: usize,
+        _batch_size: usize,

Review comment:
       oops, thanks







[GitHub] [arrow] andygrove commented on pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7975:
URL: https://github.com/apache/arrow/pull/7975#issuecomment-674547945


   @jorgecarleitao @alamb 





[GitHub] [arrow] andygrove commented on pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7975:
URL: https://github.com/apache/arrow/pull/7975#issuecomment-674560250


   Thanks @jorgecarleitao. I added a test and removed the legacy `batch_size` param everywhere, now that it lives in the config. This really helped clean up the UX some more.
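
To illustrate the cleanup described here, a minimal sketch follows. It is not the DataFusion API: the `with_batch_size` builder, the default value of 4096, and the string-based plan are all assumptions made for this example. The point is only that the planning call no longer takes `batch_size` and reads it from the config instead.

```rust
// Hypothetical, simplified config; field and method names are assumptions.
#[derive(Debug, Clone)]
struct ExecutionConfig {
    batch_size: usize,
}

impl ExecutionConfig {
    fn new() -> Self {
        // Arbitrary default chosen for this sketch.
        Self { batch_size: 4096 }
    }

    // Builder-style setter (assumed name).
    fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}

// Simplified context that owns its config.
struct ExecutionContext {
    config: ExecutionConfig,
}

impl ExecutionContext {
    fn new(config: ExecutionConfig) -> Self {
        Self { config }
    }

    // No batch_size parameter: the value comes from the config.
    // The logical plan is a plain string here to keep the sketch self-contained.
    fn create_physical_plan(&self, logical_plan: &str) -> String {
        format!("{} (batch_size={})", logical_plan, self.config.batch_size)
    }
}
```

A caller sets the batch size once, for example `ExecutionContext::new(ExecutionConfig::new().with_batch_size(1024))`, and every later planning call picks it up without an extra argument.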

