You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@arrow.apache.org by al...@apache.org on 2021/06/11 18:50:33 UTC

[arrow-datafusion] branch master updated: reuse datafusion physical planner in ballista building from protobuf (#532)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ad70a1e  reuse datafusion physical planner in ballista building from protobuf (#532)
ad70a1e is described below

commit ad70a1e91667174436f2110a70e3e557c7069e9a
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Sat Jun 12 02:50:23 2021 +0800

    reuse datafusion physical planner in ballista building from protobuf (#532)
    
    * use logical planner in ballista building
    
    * simplify statement
    
    * fix unit test
    
    * fix per comment
---
 .../core/src/serde/physical_plan/from_proto.rs     | 142 ++++-----------------
 datafusion/src/physical_plan/planner.rs            | 116 ++++++++++++++---
 datafusion/src/physical_plan/windows.rs            |  44 +++++--
 3 files changed, 153 insertions(+), 149 deletions(-)

diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index b319d5b..d49d53c 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -36,7 +36,7 @@ use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
 };
 use datafusion::logical_plan::{DFSchema, Expr};
-use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
+use datafusion::physical_plan::aggregates::AggregateFunction;
 use datafusion::physical_plan::expressions::col;
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::PartitionMode;
@@ -45,7 +45,6 @@ use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
 use datafusion::physical_plan::window_functions::{
     BuiltInWindowFunction, WindowFunction,
 };
-use datafusion::physical_plan::windows::create_window_expr;
 use datafusion::physical_plan::windows::WindowAggExec;
 use datafusion::physical_plan::{
     coalesce_batches::CoalesceBatchesExec,
@@ -205,76 +204,27 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                         )
                     })?
                     .clone();
-
                 let physical_schema: SchemaRef =
                     SchemaRef::new((&input_schema).try_into()?);
-
-                let catalog_list =
-                    Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
-                let ctx_state = ExecutionContextState {
-                    catalog_list,
-                    scalar_functions: Default::default(),
-                    var_provider: Default::default(),
-                    aggregate_functions: Default::default(),
-                    config: ExecutionConfig::new(),
-                    execution_props: ExecutionProps::new(),
-                };
-
+                let ctx_state = ExecutionContextState::new();
                 let window_agg_expr: Vec<(Expr, String)> = window_agg
                     .window_expr
                     .iter()
                     .zip(window_agg.window_expr_name.iter())
                     .map(|(expr, name)| expr.try_into().map(|expr| (expr, name.clone())))
                     .collect::<Result<Vec<_>, _>>()?;
-
-                let mut physical_window_expr = vec![];
-
                 let df_planner = DefaultPhysicalPlanner::default();
-
-                for (expr, name) in &window_agg_expr {
-                    match expr {
-                        Expr::WindowFunction {
-                            fun,
-                            args,
-                            partition_by,
-                            order_by,
-                            window_frame,
-                            ..
-                        } => {
-                            let arg = df_planner
-                                .create_physical_expr(
-                                    &args[0],
-                                    &physical_schema,
-                                    &ctx_state,
-                                )
-                                .map_err(|e| {
-                                    BallistaError::General(format!("{:?}", e))
-                                })?;
-                            if !partition_by.is_empty() {
-                                return Err(BallistaError::NotImplemented("Window function with partition by is not yet implemented".to_owned()));
-                            }
-                            if !order_by.is_empty() {
-                                return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
-                            }
-                            if window_frame.is_some() {
-                                return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
-                            }
-                            let window_expr = create_window_expr(
-                                &fun,
-                                &[arg],
-                                &physical_schema,
-                                name.to_owned(),
-                            )?;
-                            physical_window_expr.push(window_expr);
-                        }
-                        _ => {
-                            return Err(BallistaError::General(
-                                "Invalid expression for WindowAggrExec".to_string(),
-                            ));
-                        }
-                    }
-                }
-
+                let physical_window_expr = window_agg_expr
+                    .iter()
+                    .map(|(expr, name)| {
+                        df_planner.create_window_expr_with_name(
+                            expr,
+                            name.to_string(),
+                            &physical_schema,
+                            &ctx_state,
+                        )
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
                 Ok(Arc::new(WindowAggExec::try_new(
                     physical_window_expr,
                     input,
@@ -297,7 +247,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                         AggregateMode::FinalPartitioned
                     }
                 };
-
                 let group = hash_agg
                     .group_expr
                     .iter()
@@ -306,25 +255,13 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                         compile_expr(expr, &input.schema()).map(|e| (e, name.to_string()))
                     })
                     .collect::<Result<Vec<_>, _>>()?;
-
                 let logical_agg_expr: Vec<(Expr, String)> = hash_agg
                     .aggr_expr
                     .iter()
                     .zip(hash_agg.aggr_expr_name.iter())
                     .map(|(expr, name)| expr.try_into().map(|expr| (expr, name.clone())))
                     .collect::<Result<Vec<_>, _>>()?;
-
-                let catalog_list =
-                    Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
-                let ctx_state = ExecutionContextState {
-                    catalog_list,
-                    scalar_functions: Default::default(),
-                    var_provider: Default::default(),
-                    aggregate_functions: Default::default(),
-                    config: ExecutionConfig::new(),
-                    execution_props: ExecutionProps::new(),
-                };
-
+                let ctx_state = ExecutionContextState::new();
                 let input_schema = hash_agg
                     .input_schema
                     .as_ref()
@@ -336,37 +273,18 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     .clone();
                 let physical_schema: SchemaRef =
                     SchemaRef::new((&input_schema).try_into()?);
-
-                let mut physical_aggr_expr = vec![];
-
                 let df_planner = DefaultPhysicalPlanner::default();
-                for (expr, name) in &logical_agg_expr {
-                    match expr {
-                        Expr::AggregateFunction { fun, args, .. } => {
-                            let arg = df_planner
-                                .create_physical_expr(
-                                    &args[0],
-                                    &physical_schema,
-                                    &ctx_state,
-                                )
-                                .map_err(|e| {
-                                    BallistaError::General(format!("{:?}", e))
-                                })?;
-                            physical_aggr_expr.push(create_aggregate_expr(
-                                &fun,
-                                false,
-                                &[arg],
-                                &physical_schema,
-                                name.to_string(),
-                            )?);
-                        }
-                        _ => {
-                            return Err(BallistaError::General(
-                                "Invalid expression for HashAggregateExec".to_string(),
-                            ))
-                        }
-                    }
-                }
+                let physical_aggr_expr = logical_agg_expr
+                    .iter()
+                    .map(|(expr, name)| {
+                        df_planner.create_aggregate_expr_with_name(
+                            expr,
+                            name.to_string(),
+                            &physical_schema,
+                            &ctx_state,
+                        )
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
                 Ok(Arc::new(HashAggregateExec::try_new(
                     agg_mode,
                     group,
@@ -484,15 +402,7 @@ fn compile_expr(
     schema: &Schema,
 ) -> Result<Arc<dyn PhysicalExpr>, BallistaError> {
     let df_planner = DefaultPhysicalPlanner::default();
-    let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
-    let state = ExecutionContextState {
-        catalog_list,
-        scalar_functions: HashMap::new(),
-        var_provider: HashMap::new(),
-        aggregate_functions: HashMap::new(),
-        config: ExecutionConfig::new(),
-        execution_props: ExecutionProps::new(),
-    };
+    let state = ExecutionContextState::new();
     let expr: Expr = expr.try_into()?;
     df_planner
         .create_physical_expr(&expr, schema, &state)
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index d7451c7..d42948a 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -731,34 +731,82 @@ impl DefaultPhysicalPlanner {
         }
     }
 
-    /// Create a window expression from a logical expression
-    pub fn create_window_expr(
+    /// Create a window expression with a name from a logical expression
+    pub fn create_window_expr_with_name(
         &self,
         e: &Expr,
-        logical_input_schema: &DFSchema,
+        name: String,
         physical_input_schema: &Schema,
         ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn WindowExpr>> {
-        // unpack aliased logical expressions, e.g. "sum(col) over () as total"
-        let (name, e) = match e {
-            Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
-            _ => (e.name(logical_input_schema)?, e),
-        };
-
         match e {
-            Expr::WindowFunction { fun, args, .. } => {
+            Expr::WindowFunction {
+                fun,
+                args,
+                partition_by,
+                order_by,
+                window_frame,
+            } => {
                 let args = args
                     .iter()
                     .map(|e| {
                         self.create_physical_expr(e, physical_input_schema, ctx_state)
                     })
                     .collect::<Result<Vec<_>>>()?;
-                // if !order_by.is_empty() {
-                //     return Err(DataFusionError::NotImplemented(
-                //         "Window function with order by is not yet implemented".to_owned(),
-                //     ));
-                // }
-                windows::create_window_expr(fun, &args, physical_input_schema, name)
+                let partition_by = partition_by
+                    .iter()
+                    .map(|e| {
+                        self.create_physical_expr(e, physical_input_schema, ctx_state)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                let order_by = order_by
+                    .iter()
+                    .map(|e| match e {
+                        Expr::Sort {
+                            expr,
+                            asc,
+                            nulls_first,
+                        } => self.create_physical_sort_expr(
+                            expr,
+                            &physical_input_schema,
+                            SortOptions {
+                                descending: !*asc,
+                                nulls_first: *nulls_first,
+                            },
+                            &ctx_state,
+                        ),
+                        _ => Err(DataFusionError::Plan(
+                            "Sort only accepts sort expressions".to_string(),
+                        )),
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                if !partition_by.is_empty() {
+                    return Err(DataFusionError::NotImplemented(
+                            "window expression with non-empty partition by clause is not yet supported"
+                                .to_owned(),
+                        ));
+                }
+                if !order_by.is_empty() {
+                    return Err(DataFusionError::NotImplemented(
+                            "window expression with non-empty order by clause is not yet supported"
+                                .to_owned(),
+                        ));
+                }
+                if window_frame.is_some() {
+                    return Err(DataFusionError::NotImplemented(
+                            "window expression with window frame definition is not yet supported"
+                                .to_owned(),
+                        ));
+                }
+                windows::create_window_expr(
+                    fun,
+                    name,
+                    &args,
+                    &partition_by,
+                    &order_by,
+                    *window_frame,
+                    physical_input_schema,
+                )
             }
             other => Err(DataFusionError::Internal(format!(
                 "Invalid window expression '{:?}'",
@@ -767,20 +815,30 @@ impl DefaultPhysicalPlanner {
         }
     }
 
-    /// Create an aggregate expression from a logical expression
-    pub fn create_aggregate_expr(
+    /// Create a window expression from a logical expression or an alias
+    pub fn create_window_expr(
         &self,
         e: &Expr,
         logical_input_schema: &DFSchema,
         physical_input_schema: &Schema,
         ctx_state: &ExecutionContextState,
-    ) -> Result<Arc<dyn AggregateExpr>> {
-        // unpack aliased logical expressions, e.g. "sum(col) as total"
+    ) -> Result<Arc<dyn WindowExpr>> {
+        // unpack aliased logical expressions, e.g. "sum(col) over () as total"
         let (name, e) = match e {
             Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
             _ => (e.name(logical_input_schema)?, e),
         };
+        self.create_window_expr_with_name(e, name, physical_input_schema, ctx_state)
+    }
 
+    /// Create an aggregate expression with a name from a logical expression
+    pub fn create_aggregate_expr_with_name(
+        &self,
+        e: &Expr,
+        name: String,
+        physical_input_schema: &Schema,
+        ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn AggregateExpr>> {
         match e {
             Expr::AggregateFunction {
                 fun,
@@ -819,7 +877,23 @@ impl DefaultPhysicalPlanner {
         }
     }
 
-    /// Create an aggregate expression from a logical expression
+    /// Create an aggregate expression from a logical expression or an alias
+    pub fn create_aggregate_expr(
+        &self,
+        e: &Expr,
+        logical_input_schema: &DFSchema,
+        physical_input_schema: &Schema,
+        ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn AggregateExpr>> {
+        // unpack aliased logical expressions, e.g. "sum(col) as total"
+        let (name, e) = match e {
+            Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
+            _ => (e.name(logical_input_schema)?, e),
+        };
+        self.create_aggregate_expr_with_name(e, name, physical_input_schema, ctx_state)
+    }
+
+    /// Create a physical sort expression from a logical expression
     pub fn create_physical_sort_expr(
         &self,
         e: &Expr,
diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs
index 9a6b929..565a9ee 100644
--- a/datafusion/src/physical_plan/windows.rs
+++ b/datafusion/src/physical_plan/windows.rs
@@ -18,9 +18,11 @@
 //! Execution plan for window functions
 
 use crate::error::{DataFusionError, Result};
+
+use crate::logical_plan::window_frames::WindowFrame;
 use crate::physical_plan::{
     aggregates, common,
-    expressions::{Literal, NthValue, RowNumber},
+    expressions::{Literal, NthValue, PhysicalSortExpr, RowNumber},
     type_coercion::coerce,
     window_functions::signature_for_built_in,
     window_functions::BuiltInWindowFunctionExpr,
@@ -61,12 +63,18 @@ pub struct WindowAggExec {
 /// Create a physical expression for window function
 pub fn create_window_expr(
     fun: &WindowFunction,
+    name: String,
     args: &[Arc<dyn PhysicalExpr>],
+    // https://github.com/apache/arrow-datafusion/issues/299
+    _partition_by: &[Arc<dyn PhysicalExpr>],
+    // https://github.com/apache/arrow-datafusion/issues/360
+    _order_by: &[PhysicalSortExpr],
+    // https://github.com/apache/arrow-datafusion/issues/361
+    _window_frame: Option<WindowFrame>,
     input_schema: &Schema,
-    name: String,
 ) -> Result<Arc<dyn WindowExpr>> {
-    match fun {
-        WindowFunction::AggregateFunction(fun) => Ok(Arc::new(AggregateWindowExpr {
+    Ok(match fun {
+        WindowFunction::AggregateFunction(fun) => Arc::new(AggregateWindowExpr {
             aggregate: aggregates::create_aggregate_expr(
                 fun,
                 false,
@@ -74,11 +82,11 @@ pub fn create_window_expr(
                 input_schema,
                 name,
             )?,
-        })),
-        WindowFunction::BuiltInWindowFunction(fun) => Ok(Arc::new(BuiltInWindowExpr {
+        }),
+        WindowFunction::BuiltInWindowFunction(fun) => Arc::new(BuiltInWindowExpr {
             window: create_built_in_window_expr(fun, args, input_schema, name)?,
-        })),
-    }
+        }),
+    })
 }
 
 fn create_built_in_window_expr(
@@ -537,9 +545,12 @@ mod tests {
         let window_exec = Arc::new(WindowAggExec::try_new(
             vec![create_window_expr(
                 &WindowFunction::AggregateFunction(AggregateFunction::Count),
+                "count".to_owned(),
                 &[col("c3")],
+                &[],
+                &[],
+                Some(WindowFrame::default()),
                 schema.as_ref(),
-                "count".to_owned(),
             )?],
             input,
             schema.clone(),
@@ -567,21 +578,30 @@ mod tests {
             vec![
                 create_window_expr(
                     &WindowFunction::AggregateFunction(AggregateFunction::Count),
+                    "count".to_owned(),
                     &[col("c3")],
+                    &[],
+                    &[],
+                    Some(WindowFrame::default()),
                     schema.as_ref(),
-                    "count".to_owned(),
                 )?,
                 create_window_expr(
                     &WindowFunction::AggregateFunction(AggregateFunction::Max),
+                    "max".to_owned(),
                     &[col("c3")],
+                    &[],
+                    &[],
+                    Some(WindowFrame::default()),
                     schema.as_ref(),
-                    "max".to_owned(),
                 )?,
                 create_window_expr(
                     &WindowFunction::AggregateFunction(AggregateFunction::Min),
+                    "min".to_owned(),
                     &[col("c3")],
+                    &[],
+                    &[],
+                    Some(WindowFrame::default()),
                     schema.as_ref(),
-                    "min".to_owned(),
                 )?,
             ],
             input,