You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/17 12:05:55 UTC

[GitHub] [arrow] alamb commented on a change in pull request #7975: ARROW-9758: [Rust] [DataFusion] Allow physical planner to be replaced

alamb commented on a change in pull request #7975:
URL: https://github.com/apache/arrow/pull/7975#discussion_r471433074



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -1452,11 +1109,34 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn custom_physical_planner() -> Result<()> {

Review comment:
       👍 

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -373,363 +355,12 @@ impl ExecutionContext {
     pub fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        batch_size: usize,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match logical_plan {
-            LogicalPlan::TableScan {
-                table_name,
-                projection,
-                ..
-            } => match self
-                .state
-                .lock()
-                .expect("failed to lock mutex")
-                .datasources
-                .lock()
-                .expect("failed to lock mutex")
-                .get(table_name)
-            {
-                Some(provider) => {
-                    let partitions = provider.scan(projection, batch_size)?;
-                    if partitions.is_empty() {
-                        Err(ExecutionError::General(
-                            "Table provider returned no partitions".to_string(),
-                        ))
-                    } else {
-                        let schema = match projection {
-                            None => provider.schema().clone(),
-                            Some(p) => Arc::new(Schema::new(
-                                p.iter()
-                                    .map(|i| provider.schema().field(*i).clone())
-                                    .collect(),
-                            )),
-                        };
-
-                        let exec = DatasourceExec::new(schema, partitions.clone());
-                        Ok(Arc::new(exec))
-                    }
-                }
-                _ => Err(ExecutionError::General(format!(
-                    "No table named {}",
-                    table_name
-                ))),
-            },
-            LogicalPlan::InMemoryScan {
-                data,
-                projection,
-                projected_schema,
-                ..
-            } => Ok(Arc::new(MemoryExec::try_new(
-                data,
-                Arc::new(projected_schema.as_ref().to_owned()),
-                projection.to_owned(),
-            )?)),
-            LogicalPlan::CsvScan {
-                path,
-                schema,
-                has_header,
-                delimiter,
-                projection,
-                ..
-            } => Ok(Arc::new(CsvExec::try_new(
-                path,
-                CsvReadOptions::new()
-                    .schema(schema.as_ref())
-                    .delimiter_option(*delimiter)
-                    .has_header(*has_header),
-                projection.to_owned(),
-                batch_size,
-            )?)),
-            LogicalPlan::ParquetScan {
-                path, projection, ..
-            } => Ok(Arc::new(ParquetExec::try_new(
-                path,
-                projection.to_owned(),
-                batch_size,
-            )?)),
-            LogicalPlan::Projection { input, expr, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-                let runtime_expr = expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_physical_expr(e, &input_schema),
-                            e.name(&input_schema),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-                Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input)?))
-            }
-            LogicalPlan::Aggregate {
-                input,
-                group_expr,
-                aggr_expr,
-                ..
-            } => {
-                // Initially need to perform the aggregate and then merge the partitions
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-
-                let groups = group_expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_physical_expr(e, &input_schema),
-                            e.name(&input_schema),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-                let aggregates = aggr_expr
-                    .iter()
-                    .map(|e| {
-                        tuple_err((
-                            self.create_aggregate_expr(e, &input_schema),
-                            e.name(&input_schema),
-                        ))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                let initial_aggr = HashAggregateExec::try_new(
-                    groups.clone(),
-                    aggregates.clone(),
-                    input,
-                )?;
-
-                let schema = initial_aggr.schema();
-                let partitions = initial_aggr.partitions()?;
-
-                if partitions.len() == 1 {
-                    return Ok(Arc::new(initial_aggr));
-                }
-
-                let merge = Arc::new(MergeExec::new(
-                    schema.clone(),
-                    partitions,
-                    self.state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
-                ));
-
-                // construct the expressions for the final aggregation
-                let (final_group, final_aggr) = initial_aggr.make_final_expr(
-                    groups.iter().map(|x| x.1.clone()).collect(),
-                    aggregates.iter().map(|x| x.1.clone()).collect(),
-                );
-
-                // construct a second aggregation, keeping the final column name equal to the first aggregation
-                // and the expressions corresponding to the respective aggregate
-                Ok(Arc::new(HashAggregateExec::try_new(
-                    final_group
-                        .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
-                        .collect(),
-                    final_aggr
-                        .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), aggregates[i].1.clone()))
-                        .collect(),
-                    merge,
-                )?))
-            }
-            LogicalPlan::Selection { input, expr, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-                let runtime_expr = self.create_physical_expr(expr, &input_schema)?;
-                Ok(Arc::new(SelectionExec::try_new(runtime_expr, input)?))
-            }
-            LogicalPlan::Sort { expr, input, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-
-                let sort_expr = expr
-                    .iter()
-                    .map(|e| match e {
-                        Expr::Sort {
-                            expr,
-                            asc,
-                            nulls_first,
-                        } => self.create_physical_sort_expr(
-                            expr,
-                            &input_schema,
-                            SortOptions {
-                                descending: !*asc,
-                                nulls_first: *nulls_first,
-                            },
-                        ),
-                        _ => Err(ExecutionError::ExecutionError(
-                            "Sort only accepts sort expressions".to_string(),
-                        )),
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                Ok(Arc::new(SortExec::try_new(
-                    sort_expr,
-                    input,
-                    self.state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
-                )?))
-            }
-            LogicalPlan::Limit { input, n, .. } => {
-                let input = self.create_physical_plan(input, batch_size)?;
-                let input_schema = input.as_ref().schema().clone();
-
-                Ok(Arc::new(GlobalLimitExec::new(
-                    input_schema.clone(),
-                    input.partitions()?,
-                    *n,
-                    self.state
-                        .lock()
-                        .expect("failed to lock mutex")
-                        .config
-                        .concurrency,
-                )))
-            }
-            LogicalPlan::Explain {
-                verbose,
-                plan,
-                stringified_plans,
-                schema,
-            } => {
-                let input = self.create_physical_plan(plan, batch_size)?;
-
-                let mut stringified_plans = stringified_plans
-                    .iter()
-                    .filter(|s| s.should_display(*verbose))
-                    .map(|s| s.clone())
-                    .collect::<Vec<_>>();
-
-                // add in the physical plan if requested
-                if *verbose {
-                    stringified_plans.push(StringifiedPlan::new(
-                        PlanType::PhysicalPlan,
-                        format!("{:#?}", input),
-                    ));
-                }
-                let schema_ref = Arc::new((**schema).clone());
-                Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
-            }
-            _ => Err(ExecutionError::General(
-                "Unsupported logical plan variant".to_string(),
-            )),
-        }
-    }
-
-    /// Create a physical expression from a logical expression
-    pub fn create_physical_expr(
-        &self,
-        e: &Expr,
-        input_schema: &Schema,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
-        match e {
-            Expr::Alias(expr, ..) => Ok(self.create_physical_expr(expr, input_schema)?),
-            Expr::Column(name) => {
-                // check that name exists
-                input_schema.field_with_name(&name)?;
-                Ok(Arc::new(Column::new(name)))
-            }
-            Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
-            Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new(
-                self.create_physical_expr(left, input_schema)?,
-                op.clone(),
-                self.create_physical_expr(right, input_schema)?,
-            ))),
-            Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new(
-                self.create_physical_expr(expr, input_schema)?,
-                input_schema,
-                data_type.clone(),
-            )?)),
-            Expr::ScalarFunction {
-                name,
-                args,
-                return_type,
-            } => match &self
-                .state
-                .lock()
-                .expect("failed to lock mutex")
-                .scalar_functions
-                .lock()
-                .expect("failed to lock mutex")
-                .get(name)
-            {
-                Some(f) => {
-                    let mut physical_args = vec![];
-                    for e in args {
-                        physical_args.push(self.create_physical_expr(e, input_schema)?);
-                    }
-                    Ok(Arc::new(ScalarFunctionExpr::new(
-                        name,
-                        Box::new(f.fun.clone()),
-                        physical_args,
-                        return_type,
-                    )))
-                }
-                _ => Err(ExecutionError::General(format!(
-                    "Invalid scalar function '{:?}'",
-                    name
-                ))),
-            },
-            other => Err(ExecutionError::NotImplemented(format!(
-                "Physical plan does not support logical expression {:?}",
-                other
-            ))),
-        }
-    }
-
-    /// Create an aggregate expression from a logical expression
-    pub fn create_aggregate_expr(
-        &self,
-        e: &Expr,
-        input_schema: &Schema,
-    ) -> Result<Arc<dyn AggregateExpr>> {
-        match e {
-            Expr::AggregateFunction { name, args, .. } => {
-                match name.to_lowercase().as_ref() {
-                    "sum" => Ok(Arc::new(Sum::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "avg" => Ok(Arc::new(Avg::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "max" => Ok(Arc::new(Max::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "min" => Ok(Arc::new(Min::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    "count" => Ok(Arc::new(Count::new(
-                        self.create_physical_expr(&args[0], input_schema)?,
-                    ))),
-                    other => Err(ExecutionError::NotImplemented(format!(
-                        "Unsupported aggregate function '{}'",
-                        other
-                    ))),
-                }
-            }
-            other => Err(ExecutionError::General(format!(
-                "Invalid aggregate expression '{:?}'",
-                other
-            ))),
-        }
-    }
-
-    /// Create an aggregate expression from a logical expression
-    pub fn create_physical_sort_expr(
-        &self,
-        e: &Expr,
-        input_schema: &Schema,
-        options: SortOptions,
-    ) -> Result<PhysicalSortExpr> {
-        Ok(PhysicalSortExpr {
-            expr: self.create_physical_expr(e, input_schema)?,
-            options: options,
-        })
+        let planner: Arc<dyn PhysicalPlanner> = match self.config().physical_planner {
+            Some(planner) => planner,
+            None => Arc::new(PhysicalPlannerImpl::default()),

Review comment:
       Maybe as part of a future PR, this could be renamed to `DefaultPhysicalPlanner` to hint more strongly that it can be replaced with another implementation. I'll put up a PR proposing that change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org