Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/09/02 14:29:57 UTC

[GitHub] [arrow] alamb opened a new pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

alamb opened a new pull request #8097:
URL: https://github.com/apache/arrow/pull/8097


   This PR is based on the prototype design in https://github.com/apache/arrow/pull/8020 and the discussion in the [Design Document](https://docs.google.com/document/d/1IHCGkCuUvnE9BavkykPULn6Ugxgqc1JShT4nz1vMi7g/edit#) and the [discussion on the mailing list](https://lists.apache.org/thread.html/rf8ae7d1147e93e3f6172bc2e4fa50a38abcb35f046cc5830e09da6cc%40%3Cdev.arrow.apache.org%3E). See also https://issues.apache.org/jira/browse/ARROW-9821.
   
   This PR adds:
   1. An `ExtensionPlanNode` trait for defining user-defined behavior in `LogicalPlan` nodes
   2. Support for planning (both logical and physical) for such plan nodes
   3. An end-to-end example and test that uses an `ExtensionPlanNode` to implement a simple "topK" operator as a custom plan node.
   
   The end-to-end example is intended both to serve as documentation and to ensure the API can be used for a non-trivial operator.
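For context on the operator used in the example: instead of fully sorting its input, a top-K node only needs to remember the K best rows seen so far (the test file's module docs make the same point). Below is a minimal std-only sketch of that idea, assuming a bounded min-heap; the function `top_k` and its `(customer_id, revenue)` row shape are hypothetical illustrations, not code from the PR.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: return the `k` rows with the largest revenue,
/// highest first. Only the current top-k candidates are kept, so memory
/// is O(k) rather than the O(n) a full sort needs.
pub fn top_k(rows: &[(&str, i64)], k: usize) -> Vec<(String, i64)> {
    // `Reverse` turns std's max-heap into a min-heap, so the smallest
    // retained row is always at the top and is cheap to evict.
    let mut heap: BinaryHeap<Reverse<(i64, String)>> = BinaryHeap::new();
    for &(customer, revenue) in rows {
        heap.push(Reverse((revenue, customer.to_string())));
        if heap.len() > k {
            heap.pop(); // evict the smallest of the k + 1 candidates
        }
    }
    // Ascending order of `Reverse(..)` is descending order of revenue.
    heap.into_sorted_vec()
        .into_iter()
        .map(|Reverse((revenue, customer))| (customer, revenue))
        .collect()
}
```

Each row costs O(log k) heap work, so one pass is O(n log k) time with O(k) memory, compared with O(n log n) time and O(n) memory for sort-then-limit.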
   
   Major differences from the prototype:
   1. Renamed `LogicalPlanNode` to `ExtensionPlanNode`, as I think that better reflects what it is
   2. I did not change the built-in `LimitNode` to use the new interface, but instead created an end-to-end demonstration
   3. Demonstrated how to provide a custom physical planner (per comment https://github.com/apache/arrow/pull/8020#discussion_r475953168 from @andygrove)
   4. The code for the `Extension` plan node is less of a mess (uses `Arc` instead of `Box`)
   5. Registered the new optimization passes so that the high-level `ExecutionContext::sql` API can be used rather than the low-level APIs.
   
   I am sorry for the relatively large PR -- most of the code is the new example with comments. 
   
   I can also break this into a few smaller PRs if that would be easier (the changes to `ExecutionConfig` and `DefaultPhysicalPlanner` could naturally be broken out).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#issuecomment-688881219


   🎉 





[GitHub] [arrow] andygrove commented on pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#issuecomment-688368606


   @alamb Apologies, I have not gotten to this yet due to work commitments, but I am going to make time today to review it.





[GitHub] [arrow] alamb commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r483275351



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -417,7 +445,16 @@ impl ExecutionConfig {
         mut self,
         physical_planner: Arc<dyn PhysicalPlanner>,
     ) -> Self {
-        self.physical_planner = Some(physical_planner);
+        self.physical_planner = physical_planner;
+        self
+    }
+
+    /// Optional source of additional optimization passes
+    pub fn with_optimizer_rule_source(

Review comment:
       @jorgecarleitao got it -- I may try the `Clone` approach later. One style of writing optimizers puts state on the optimizer object itself; another style, which I think you used for predicate pushdown, passes state as explicit arguments.
   
   The challenge with using function arguments is that I can't think of any way to write generic functions like `optimize_children` in that style: https://github.com/apache/arrow/pull/8097/files#diff-4a428d98d7a9b116e888c2abd108fbedR32-R43.
   
   I'll think about it this evening, and maybe something will pop into my head.
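A toy sketch of the first style, to show why state on the rule object makes a generic `optimize_children` helper easy to write. `Plan`, `Rule`, `optimize_children` and `CountNodes` are hypothetical, simplified stand-ins rather than DataFusion's `LogicalPlan` and `OptimizerRule`: because the rule takes `&mut self`, the helper only needs a `&mut dyn Rule` and never has to thread per-pass state through its own arguments.

```rust
/// Toy stand-in for a logical plan (hypothetical; not DataFusion's enum).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { table: String },
    Limit { n: usize, input: Box<Plan> },
    Sort { input: Box<Plan> },
}

/// Toy optimizer rule that may mutate its own state while it runs.
pub trait Rule {
    fn optimize(&mut self, plan: &Plan) -> Plan;
}

/// Generic helper: rebuild `plan` with each child rewritten by `rule`.
/// Any per-pass state lives on the rule, not in this function's signature.
pub fn optimize_children(rule: &mut dyn Rule, plan: &Plan) -> Plan {
    match plan {
        Plan::Scan { .. } => plan.clone(),
        Plan::Limit { n, input } => Plan::Limit {
            n: *n,
            input: Box::new(rule.optimize(input)),
        },
        Plan::Sort { input } => Plan::Sort {
            input: Box::new(rule.optimize(input)),
        },
    }
}

/// Example rule whose state (a visit counter) is stored on the object itself.
#[derive(Default)]
pub struct CountNodes {
    pub visited: usize,
}

impl Rule for CountNodes {
    fn optimize(&mut self, plan: &Plan) -> Plan {
        self.visited += 1;
        // Recurse through the generic helper; it calls back into `self`.
        optimize_children(self, plan)
    }
}
```

With the explicit-argument style, the helper would also have to accept and forward each rule's particular state type, which is what makes a single generic helper awkward.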







[GitHub] [arrow] alamb commented on pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#issuecomment-688254785


   @andygrove -- have you had a chance to think through the interactions with providing alternate planners and optimization rules? If not, no worries. I am just trying to decide whether I should implement @jorgecarleitao's suggestions now or wait for your input.
   





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r482968858



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -331,6 +355,16 @@ impl DefaultPhysicalPlanner {
                 let schema_ref = Arc::new(schema.as_ref().clone());
                 Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
             }
+            LogicalPlan::Extension { node } => {
+                let inputs = node
+                    .inputs()
+                    .into_iter()
+                    .map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
+                    .collect::<Result<Vec<_>>>()?;
+
+                self.extension_planner

Review comment:
       Sorry, I did not mean that we should do this in this PR; it does not need to. I just got too excited about this ^_^
   
   The use case:
   
   You publish the `TopK` extension as a (library) crate called `datafusion-topk`, and I publish a crate `datafusion-s3` with another extension.
   
   A user wants to use both extensions. To install them, they:
   
   1. add each crate to `Cargo.toml`
   2. initialize the default planner with both of them
   3. plan them
   4. execute them
   
   I.e. freaking easy!
   
   Broadly speaking, this enables an ecosystem of extensions/user-defined plans: people can share hand-crafted plans, which others can add as crate dependencies and register with their planner. 🤯
   
   This also reduces the pressure to place everything in DataFusion's codebase: if we offer an API to extend DataFusion in this way, people can distribute libraries with the extension/user-defined plan without having to go through the decision process of whether X belongs in DataFusion's core (e.g. a scan of format Y, or a scan over protocol Z).
   
   For me, this use case does require an easy way to achieve `2. initialize the default planner with both of them`. But again, this PR is definitely a major step in this direction!







[GitHub] [arrow] alamb commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r482891141



##########
File path: rust/datafusion/src/logical_plan/mod.rs
##########
@@ -755,8 +756,72 @@ impl fmt::Debug for Expr {
     }
 }
 
-/// The LogicalPlan represents different types of relations (such as Projection,
-/// Filter, etc) and can be created by the SQL query planner and the DataFrame API.
+/// This defines the interface for `LogicalPlan` nodes that can be
+/// used to extend DataFusion with custom relational operators.
+///
+/// See the example in
+/// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an
+/// example of how to use this extension API
+pub trait ExtensionPlanNode: Debug {

Review comment:
       I struggled with this naming -- I could go with `UserDefinedNode`, though I wonder if it should be `UserDefinedPlanNode` or something similar to distinguish it from nodes in `ExecutionPlan`s. @andygrove, any thoughts?

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -417,7 +445,16 @@ impl ExecutionConfig {
         mut self,
         physical_planner: Arc<dyn PhysicalPlanner>,
     ) -> Self {
-        self.physical_planner = Some(physical_planner);
+        self.physical_planner = physical_planner;
+        self
+    }
+
+    /// Optional source of additional optimization passes
+    pub fn with_optimizer_rule_source(

Review comment:
       I would like to implement the pattern of passing in `Arc<dyn OptimizerRule>` directly (in fact, I tried to implement it initially). The issue I found is that `ExecutionContext` is read-only (immutable) during planning, but the signature of `OptimizerRule` requires `&mut self` (i.e., a rule can change its own state).
   
   I think Rust is actually saving us from ourselves here: as discussed in https://issues.apache.org/jira/browse/ARROW-9888, the `ExecutionContext` can be shared between threads, so allowing one thread to mutate the `OptimizerRule`s could cause problems.
   
   What do you think about making `optimizer_rules` `Vec<dyn OptimizerRule + Clone>` and then planning with `clone()`d copies?
   
   Actually, the more I think about this the more I like the `Clone` idea...
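A caveat on the `Clone` idea: `Clone` is not object-safe, so `dyn OptimizerRule + Clone` cannot be used as a trait object as written. A common Rust workaround is an object-safe `box_clone` method. The sketch below uses hypothetical, simplified names (`Rule`, `Tag`, `Config`, `fresh_rules`) with plans represented as strings, and shows a shared, read-only config handing each planning call its own mutable copies of the rules.

```rust
use std::sync::Arc;

/// Hypothetical rule trait: optimizing may mutate the rule's own state.
pub trait Rule: Send + Sync {
    fn optimize(&mut self, plan: &str) -> String;
    /// Object-safe substitute for `Clone`.
    fn box_clone(&self) -> Box<dyn Rule>;
}

/// Example rule with internal state (how many plans it has rewritten).
#[derive(Clone, Default)]
pub struct Tag {
    pub runs: usize,
}

impl Rule for Tag {
    fn optimize(&mut self, plan: &str) -> String {
        self.runs += 1;
        format!("{} [tagged]", plan)
    }
    fn box_clone(&self) -> Box<dyn Rule> {
        Box::new(self.clone())
    }
}

/// Shared, immutable configuration (e.g. held in an `Arc` across threads).
pub struct Config {
    pub rules: Vec<Box<dyn Rule>>,
}

impl Config {
    /// Give each planning call its own mutable copies of the rules,
    /// leaving the shared originals untouched.
    pub fn fresh_rules(&self) -> Vec<Box<dyn Rule>> {
        self.rules.iter().map(|r| r.box_clone()).collect()
    }
}
```

Mutation then happens only on the per-call copies, so sharing `Config` behind an `Arc` never requires `&mut` access to it.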

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -331,6 +355,16 @@ impl DefaultPhysicalPlanner {
                 let schema_ref = Arc::new(schema.as_ref().clone());
                 Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
             }
+            LogicalPlan::Extension { node } => {
+                let inputs = node
+                    .inputs()
+                    .into_iter()
+                    .map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
+                    .collect::<Result<Vec<_>>>()?;
+
+                self.extension_planner
+                    .plan_extension(node.as_ref(), inputs, ctx_state)

Review comment:
       good idea -- I will do so.







[GitHub] [arrow] andygrove commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r484555403



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -288,9 +288,17 @@ impl ExecutionContext {
 
     /// Optimize the logical plan by applying optimizer rules
     pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        let plan = ProjectionPushDown::new().optimize(&plan)?;
-        let plan = FilterPushDown::new().optimize(&plan)?;
-        let plan = TypeCoercionRule::new(self).optimize(&plan)?;
+        let mut plan = ProjectionPushDown::new().optimize(&plan)?;
+        plan = FilterPushDown::new().optimize(&plan)?;
+        plan = TypeCoercionRule::new(self).optimize(&plan)?;
+
+        // apply any user supplied rules
+        let mut rules = self.state.config.optimizer_rule_source.rules();

Review comment:
       I don't have any objections to this approach, but I think I would have done it slightly differently. I am wondering whether it makes sense to have one trait covering all the functionality that users can extend DataFusion with. Something like this ...
   
   ```rust
   trait UserDefinedQueryPlanner {
     fn optimize_logical_plan(&self, plan: &LogicalPlan) -> Result<LogicalPlan>;
     fn create_physical_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>>;
   }
   ```
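A self-contained sketch of the single-trait idea. `LogicalPlan`, `PhysicalPlan`, `Opaque`, `QueryPlanner` and `DefaultPlanner` here are toy placeholders, not DataFusion's definitions: giving the optimization hook a default body means a user-supplied planner only overrides the steps it actually needs to change.

```rust
use std::sync::Arc;

/// Toy logical plan (placeholder for DataFusion's `LogicalPlan` enum).
#[derive(Debug, Clone, PartialEq)]
pub enum LogicalPlan {
    Scan(String),
    Limit(usize, Box<LogicalPlan>),
}

/// Toy physical operator (placeholder for an execution plan trait).
pub trait PhysicalPlan: std::fmt::Debug {
    fn name(&self) -> String;
}

#[derive(Debug)]
pub struct Opaque(pub String);

impl PhysicalPlan for Opaque {
    fn name(&self) -> String {
        self.0.clone()
    }
}

/// One trait covering every user extension point.
pub trait QueryPlanner {
    /// Default: leave the logical plan unchanged.
    fn optimize_logical_plan(&self, plan: &LogicalPlan) -> Result<LogicalPlan, String> {
        Ok(plan.clone())
    }

    /// Required: turn a logical plan into something executable.
    fn create_physical_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn PhysicalPlan>, String>;
}

/// Example planner that keeps the default optimization hook.
pub struct DefaultPlanner;

impl QueryPlanner for DefaultPlanner {
    fn create_physical_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn PhysicalPlan>, String> {
        let name = match plan {
            LogicalPlan::Scan(table) => format!("ScanExec({})", table),
            LogicalPlan::Limit(n, _) => format!("LimitExec({})", n),
        };
        Ok(Arc::new(Opaque(name)))
    }
}
```

An extension that needs custom optimization would override `optimize_logical_plan` as well; both hooks then travel together in one object that could be stored in the execution config.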
   







[GitHub] [arrow] andygrove closed pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #8097:
URL: https://github.com/apache/arrow/pull/8097


   





[GitHub] [arrow] jorgecarleitao commented on pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#issuecomment-688881985


   🥳 🎉 





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r482703011



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -331,6 +355,16 @@ impl DefaultPhysicalPlanner {
                 let schema_ref = Arc::new(schema.as_ref().clone());
                 Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
             }
+            LogicalPlan::Extension { node } => {
+                let inputs = node
+                    .inputs()
+                    .into_iter()
+                    .map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
+                    .collect::<Result<Vec<_>>>()?;
+
+                self.extension_planner

Review comment:
       If I understand correctly, this implies that a single `ExtensionPlanner` needs to be able to handle all custom nodes.
   
   Thus, if two extensions are published separately (e.g. as Cargo crates), consumers can't simply add both; they would need to rewrite their extension planner to incorporate each of them.
   
   An alternative:
   
   * make the `extension_planner`'s interface return `Result<Option<Arc<dyn ExecutionPlan>>>`, where `None` means that the planner does not know how to plan that node
   * change the logic here to loop over the extension planners until one returns a non-`None` result
   * make extension planners return `None` for any node(s) they do not support
   * make `self.extension_planner` be `self.extension_planners`
   * make `with_extension_planner` be `with_extension_planners(Vec)` (or `add_extension_planner()`)
   
   This would allow extensions to be shared along with their respective planner (or planners, if they support multiple execution architectures), and users could simply add them to their list of extensions and pass that list to `with_extension_planners`.
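The proposal above can be sketched with std-only toy types. `UserNode`, `ExtensionPlanner`, `plan_with` and the two node/planner pairs are hypothetical simplifications (the real planner call also receives the already-planned inputs and the context state, and returns an `ExecutionPlan` rather than a `String`); the point is the `Option`-returning contract, under which independently published planners compose by simply being tried in order.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical user-defined logical node; `as_any` enables downcasting.
pub trait UserNode {
    fn as_any(&self) -> &dyn Any;
}

/// Hypothetical physical output; a String keeps the sketch small.
pub type Exec = String;

/// Returns Ok(None) when this planner does not know how to plan `node`.
pub trait ExtensionPlanner {
    fn plan_extension(&self, node: &dyn UserNode) -> Result<Option<Exec>, String>;
}

pub struct TopKNode { pub k: usize }
impl UserNode for TopKNode {
    fn as_any(&self) -> &dyn Any { self }
}

pub struct S3ScanNode { pub bucket: String }
impl UserNode for S3ScanNode {
    fn as_any(&self) -> &dyn Any { self }
}

/// Planner that would ship with a hypothetical `datafusion-topk` crate.
pub struct TopKPlanner;
impl ExtensionPlanner for TopKPlanner {
    fn plan_extension(&self, node: &dyn UserNode) -> Result<Option<Exec>, String> {
        Ok(node.as_any().downcast_ref::<TopKNode>().map(|n| format!("TopKExec(k={})", n.k)))
    }
}

/// Planner that would ship with a hypothetical `datafusion-s3` crate.
pub struct S3Planner;
impl ExtensionPlanner for S3Planner {
    fn plan_extension(&self, node: &dyn UserNode) -> Result<Option<Exec>, String> {
        Ok(node.as_any().downcast_ref::<S3ScanNode>().map(|n| format!("S3ScanExec({})", n.bucket)))
    }
}

/// Try each registered planner in order until one returns Some.
pub fn plan_with(planners: &[Arc<dyn ExtensionPlanner>], node: &dyn UserNode) -> Result<Exec, String> {
    for planner in planners {
        if let Some(exec) = planner.plan_extension(node)? {
            return Ok(exec);
        }
    }
    Err("no extension planner could plan this node".to_string())
}
```

Because each planner declines unknown nodes with `Ok(None)`, the order of the list only matters when two planners claim the same node type.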

##########
File path: rust/datafusion/tests/user_defined_plan.rs
##########
@@ -0,0 +1,500 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       Savage! 💯💯

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -331,6 +355,16 @@ impl DefaultPhysicalPlanner {
                 let schema_ref = Arc::new(schema.as_ref().clone());
                 Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
             }
+            LogicalPlan::Extension { node } => {
+                let inputs = node
+                    .inputs()
+                    .into_iter()
+                    .map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
+                    .collect::<Result<Vec<_>>>()?;
+
+                self.extension_planner
+                    .plan_extension(node.as_ref(), inputs, ctx_state)

Review comment:
       I think we should have a check around here to verify that the logical and physical nodes have the same output `schema`.
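A minimal sketch of such a check. It uses toy `Field`/`Schema` structs in place of Arrow's types, and the helper name `check_same_schema` is hypothetical; the idea is simply to compare the schema the logical node declares with the schema the planned physical node reports, and to fail planning with a descriptive error if they differ.

```rust
/// Toy field/schema types standing in for Arrow's `Field`/`Schema`.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Schema {
    pub fields: Vec<Field>,
}

/// Hypothetical helper: error out if the physical node's output schema
/// differs from the schema declared by the logical node it came from.
pub fn check_same_schema(logical: &Schema, physical: &Schema) -> Result<(), String> {
    if logical == physical {
        Ok(())
    } else {
        Err(format!(
            "extension planner produced schema {:?}, but the logical node declares {:?}",
            physical, logical
        ))
    }
}
```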

##########
File path: rust/datafusion/src/logical_plan/mod.rs
##########
@@ -755,8 +756,72 @@ impl fmt::Debug for Expr {
     }
 }
 
-/// The LogicalPlan represents different types of relations (such as Projection,
-/// Filter, etc) and can be created by the SQL query planner and the DataFrame API.
+/// This defines the interface for `LogicalPlan` nodes that can be
+/// used to extend DataFusion with custom relational operators.
+///
+/// See the example in
+/// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an
+/// example of how to use this extension API
+pub trait ExtensionPlanNode: Debug {

Review comment:
       Another idea: `UserDefinedNode` (UDN), to be aligned with user defined function (UDF) that most people are familiar with and already know what it means.

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -417,7 +445,16 @@ impl ExecutionConfig {
         mut self,
         physical_planner: Arc<dyn PhysicalPlanner>,
     ) -> Self {
-        self.physical_planner = Some(physical_planner);
+        self.physical_planner = physical_planner;
+        self
+    }
+
+    /// Optional source of additional optimization passes
+    pub fn with_optimizer_rule_source(

Review comment:
       Along the same lines as my comment about planning, one idea is to make `optimizer_rule_source` a vector of rules and instantiate `ExecutionConfig` with a `Vec` of those rules, instead of having an extra `OptimizerRuleSource` to hold them.
   
   The rules themselves are `Arc<dyn OptimizerRule>`, and users write them by implementing the trait.
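A sketch of this suggestion with toy types: the config holds user rules directly as a `Vec<Arc<dyn Rule>>`, and optimization folds the plan through them in registration order. The names (`Rule`, `Config`, `with_rule`, `optimize`, `TopKRule`) are hypothetical, plans are plain strings for brevity, and the rules take `&self` to sidestep the `&mut self` issue raised earlier in this thread.

```rust
use std::sync::Arc;

/// Hypothetical rule trait; plans are plain strings for brevity.
pub trait Rule: Send + Sync {
    fn optimize(&self, plan: String) -> Result<String, String>;
}

/// Config holding user rules directly, with a builder-style setter.
#[derive(Default, Clone)]
pub struct Config {
    pub rules: Vec<Arc<dyn Rule>>,
}

impl Config {
    pub fn with_rule(mut self, rule: Arc<dyn Rule>) -> Self {
        self.rules.push(rule);
        self
    }

    /// Apply each rule in registration order, stopping at the first error.
    pub fn optimize(&self, plan: String) -> Result<String, String> {
        self.rules.iter().try_fold(plan, |plan, rule| rule.optimize(plan))
    }
}

/// Example rule rewriting the toy string `Limit(Sort(x))` into `TopK(x)`.
pub struct TopKRule;

impl Rule for TopKRule {
    fn optimize(&self, plan: String) -> Result<String, String> {
        let rewritten = plan
            .strip_prefix("Limit(Sort(")
            .and_then(|rest| rest.strip_suffix("))"))
            .map(|inner| format!("TopK({})", inner));
        // Leave plans that do not match the pattern unchanged.
        Ok(rewritten.unwrap_or(plan))
    }
}
```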







[GitHub] [arrow] alamb commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r484812881



##########
File path: rust/datafusion/src/logical_plan/mod.rs
##########
@@ -755,8 +756,72 @@ impl fmt::Debug for Expr {
     }
 }
 
-/// The LogicalPlan represents different types of relations (such as Projection,
-/// Filter, etc) and can be created by the SQL query planner and the DataFrame API.
+/// This defines the interface for `LogicalPlan` nodes that can be
+/// used to extend DataFusion with custom relational operators.
+///
+/// See the example in
+/// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an
+/// example of how to use this extension API
+pub trait ExtensionPlanNode: Debug {

Review comment:
       I like `UserDefinedLogicalNode` -- I will make that change.







[GitHub] [arrow] andygrove commented on pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#issuecomment-686126907


   @alamb I took a first pass through this and it is looking good overall. I need a bit more time to think through some of the interaction with providing alternate planners and optimization rules and will try and make time for this tomorrow.





[GitHub] [arrow] alamb commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r484891048



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -383,24 +383,56 @@ impl ScalarFunctionRegistry for ExecutionContext {
     }
 }
 
+/// A planner used to add extensions to DataFusion logical and physical plans.

Review comment:
       FYI @andygrove, here is the single trait you suggested (though I renamed it `QueryPlanner` rather than `UserDefinedQueryPlanner`).

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -331,6 +355,16 @@ impl DefaultPhysicalPlanner {
                 let schema_ref = Arc::new(schema.as_ref().clone());
                 Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
             }
+            LogicalPlan::Extension { node } => {
+                let inputs = node
+                    .inputs()
+                    .into_iter()
+                    .map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
+                    .collect::<Result<Vec<_>>>()?;
+
+                self.extension_planner

Review comment:
       I like this idea so much that I filed a ticket to make it happen: https://issues.apache.org/jira/browse/ARROW-9940. I'll try to look into it later this week (I am effectively starting to create my own extensions for an internal project at work, which will be a good test case).

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -288,9 +288,17 @@ impl ExecutionContext {
 
     /// Optimize the logical plan by applying optimizer rules
     pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        let plan = ProjectionPushDown::new().optimize(&plan)?;
-        let plan = FilterPushDown::new().optimize(&plan)?;
-        let plan = TypeCoercionRule::new(self).optimize(&plan)?;
+        let mut plan = ProjectionPushDown::new().optimize(&plan)?;
+        plan = FilterPushDown::new().optimize(&plan)?;
+        plan = TypeCoercionRule::new(self).optimize(&plan)?;
+
+        // apply any user supplied rules
+        let mut rules = self.state.config.optimizer_rule_source.rules();

Review comment:
       I like this approach -- I will give it a try

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -417,7 +445,16 @@ impl ExecutionConfig {
         mut self,
         physical_planner: Arc<dyn PhysicalPlanner>,
     ) -> Self {
-        self.physical_planner = Some(physical_planner);
+        self.physical_planner = physical_planner;
+        self
+    }
+
+    /// Optional source of additional optimization passes
+    pub fn with_optimizer_rule_source(

Review comment:
       As it turns out, @andygrove's suggestion for a single planner trait has removed the need for this trait entirely 🎉
   
   

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -375,15 +380,39 @@ impl ScalarFunctionRegistry for ExecutionContext {
     }
 }
 
+/// Provides OptimizerRule instances to

Review comment:
       As it turns out your suggestion for a single planner trait has removed the need for this trait entirely 🎉 

##########
File path: rust/datafusion/src/logical_plan/mod.rs
##########
@@ -755,8 +756,72 @@ impl fmt::Debug for Expr {
     }
 }
 
-/// The LogicalPlan represents different types of relations (such as Projection,
-/// Filter, etc) and can be created by the SQL query planner and the DataFrame API.
+/// This defines the interface for `LogicalPlan` nodes that can be
+/// used to extend DataFusion with custom relational operators.
+///
+/// See the example in
+/// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an
+/// example of how to use this extension API
+pub trait ExtensionPlanNode: Debug {
+    /// Return a reference to self as Any, to support dynamic downcasting
+    fn as_any(&self) -> &dyn Any;
+
+    /// Return the logical plan's inputs
+    fn inputs(&self) -> Vec<&LogicalPlan>;

Review comment:
       I think `children` is a better name.
   
   I filed a ticket to make the names consistent: https://issues.apache.org/jira/browse/ARROW-9939. I'll try to get a PR up later today or maybe tomorrow.

##########
File path: rust/datafusion/tests/user_defined_plan.rs
##########
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains an end to end demonstration of creating
+//! a user defined operator in DataFusion.
+//!
+//! Specifically, it shows how to define a `TopKNode` that implements
+//! `UserDefinedLogicalNode`, add an `OptimizerRule` to rewrite a
+//! `LogicalPlan` to use that node, create an `ExecutionPlan`, and
+//! finally produce results.
+//!
+//! # TopK Background:
+//!
+//! A "Top K" node is a common query optimization which is used for
+//! queries such as "find the top 3 customers by revenue". The
+//! (simplified) SQL for such a query might be:
+//!
+//! ```sql
+//! CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT)
+//!   STORED AS CSV location 'tests/customer.csv';
+//!
+//! SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+//! ```
+//!
+//! And a naive plan would be:
+//!
+//! ```
+//! > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+//! +--------------+----------------------------------------+
+//! | plan_type    | plan                                   |
+//! +--------------+----------------------------------------+
+//! | logical_plan | Limit: 3                               |
+//! |              |   Sort: #revenue DESC NULLS FIRST      |
+//! |              |     Projection: #customer_id, #revenue |
+//! |              |       TableScan: sales projection=None |
+//! +--------------+----------------------------------------+
+//! ```
+//!
+//! While this plan produces the correct answer, the careful reader
+//! will note it fully sorts the input before discarding everything
+//! other than the top 3 elements.
+//!
+//! The same answer can be produced by simply keeping track of the top
+//! N elements, reducing the total amount of required buffer memory.
+//!
+
+use arrow::{
+    array::{Int64Array, StringArray},
+    datatypes::SchemaRef,
+    error::ArrowError,
+    record_batch::{RecordBatch, RecordBatchReader},
+    util::pretty::pretty_format_batches,
+};
+use datafusion::{
+    error::{ExecutionError, Result},
+    execution::context::ExecutionContextState,
+    execution::context::QueryPlanner,
+    logical_plan::{Expr, LogicalPlan, UserDefinedLogicalNode},
+    optimizer::{optimizer::OptimizerRule, utils::optimize_explain},
+    physical_plan::{
+        planner::{DefaultPhysicalPlanner, ExtensionPlanner},
+        Distribution, ExecutionPlan, Partitioning, PhysicalPlanner,
+    },
+    prelude::{ExecutionConfig, ExecutionContext},
+};
+use fmt::Debug;
+use std::{
+    any::Any,
+    collections::BTreeMap,
+    fmt,
+    sync::{Arc, Mutex},
+};
+
+/// Execute the specified sql and return the resulting record batches
+/// pretty printed as a String.
+fn exec_sql(ctx: &mut ExecutionContext, sql: &str) -> Result<String> {
+    let df = ctx.sql(sql)?;
+    let batches = df.collect()?;
+    pretty_format_batches(&batches).map_err(|e| ExecutionError::ArrowError(e))
+}
+
+/// Create a test table.
+fn setup_table(mut ctx: ExecutionContext) -> Result<ExecutionContext> {
+    let sql = "CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) STORED AS CSV location 'tests/customer.csv'";
+
+    let expected = vec!["++", "++"];
+
+    let s = exec_sql(&mut ctx, sql)?;
+    let actual = s.lines().collect::<Vec<_>>();
+
+    assert_eq!(expected, actual, "Creating table");
+    Ok(ctx)
+}
+
+const QUERY: &str =
+    "SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3";
+
+// Run the query using the specified execution context and compare it
+// to the known result
+fn run_and_compare_query(mut ctx: ExecutionContext, description: &str) -> Result<()> {
+    let expected = vec![
+        "+-------------+---------+",
+        "| customer_id | revenue |",
+        "+-------------+---------+",
+        "| paul        | 300     |",
+        "| jorge       | 200     |",
+        "| andy        | 150     |",
+        "+-------------+---------+",
+    ];
+
+    let s = exec_sql(&mut ctx, QUERY)?;
+    let actual = s.lines().collect::<Vec<_>>();
+
+    assert_eq!(
+        expected,
+        actual,
+        "output mismatch for {}. Expected:\n{}\nActual:\n{}",
+        description,
+        expected.join("\n"),
+        s
+    );
+    Ok(())
+}
+
+#[test]
+// Run the query using default planners and optimizer
+fn normal_query() -> Result<()> {
+    let ctx = setup_table(ExecutionContext::new())?;
+    run_and_compare_query(ctx, "Default context")
+}
+
+#[test]
+// Run the query using topk optimization
+fn topk_query() -> Result<()> {
+    // Note the only difference from `normal_query` is the topk context
+    let ctx = setup_table(make_topk_context())?;
+    run_and_compare_query(ctx, "Topk context")
+}
+
+#[test]
+// Run EXPLAIN PLAN and show the plan was in fact rewritten
+fn topk_plan() -> Result<()> {
+    let mut ctx = setup_table(make_topk_context())?;
+
+    let expected = vec![
+        "| logical_plan after topk                 | TopK: k=3                                      |",
+        "|                                         |   Projection: #customer_id, #revenue           |",
+        "|                                         |     TableScan: sales projection=Some([0, 1])   |",
+    ].join("\n");
+
+    let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);
+    let actual_output = exec_sql(&mut ctx, &explain_query)?;
+
+    // normalize newlines (output on windows uses \r\n)
+    let actual_output = actual_output.replace("\r\n", "\n");
+
+    assert!(actual_output.contains(&expected) , "Expected output not present in actual output\nExpected:\n---------\n{}\nActual:\n--------\n{}", expected, actual_output);
+    Ok(())
+}
+
+fn make_topk_context() -> ExecutionContext {
+    let config = ExecutionConfig::new().with_query_planner(Arc::new(TopKQueryPlanner {}));
+
+    ExecutionContext::with_config(config)
+}
+
+// ------ The implementation of the TopK code follows -----
+
+struct TopKQueryPlanner {}

Review comment:
       Here is the updated way to register extensions with DataFusion
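The module docs in this test note that the naive plan fully sorts its input before keeping only the top 3 rows, and that tracking just the top N elements gives the same answer with less buffered memory. The following is a minimal std-only sketch of that idea. The `top_k` and `sort_then_limit` helpers and the sample rows are hypothetical: they are not DataFusion code and not the contents of `tests/customer.csv`. A min-heap capped at `k` entries holds at most `k + 1` rows at any moment, while a full sort buffers all `n`:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: return the `k` rows with the largest revenue,
/// ordered by revenue descending, buffering at most `k + 1` rows.
fn top_k(rows: &[(&str, i64)], k: usize) -> Vec<(String, i64)> {
    // `Reverse` turns the max-heap into a min-heap, so the smallest
    // retained revenue sits on top and is evicted first.
    let mut heap: BinaryHeap<Reverse<(i64, String)>> = BinaryHeap::with_capacity(k + 1);
    for &(name, revenue) in rows {
        heap.push(Reverse((revenue, name.to_string())));
        if heap.len() > k {
            heap.pop(); // discard the current smallest row
        }
    }
    // Ascending order of `Reverse` values is descending order of revenue.
    heap.into_sorted_vec()
        .into_iter()
        .map(|Reverse((revenue, name))| (name, revenue))
        .collect()
}

/// The naive plan for comparison: buffer and sort every row, then truncate.
fn sort_then_limit(rows: &[(&str, i64)], k: usize) -> Vec<(String, i64)> {
    let mut all: Vec<(String, i64)> = rows.iter().map(|&(n, r)| (n.to_string(), r)).collect();
    all.sort_by(|a, b| b.1.cmp(&a.1)); // revenue descending
    all.truncate(k);
    all
}

fn main() {
    // Hypothetical rows with distinct revenues (ties could be ordered
    // differently by the two approaches).
    let rows = [("c1", 150), ("c2", 300), ("c3", 50), ("c4", 200), ("c5", 100)];
    let top = top_k(&rows, 3);
    assert_eq!(top, sort_then_limit(&rows, 3));
    println!("{:?}", top);
}
```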




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r483273475



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -331,6 +355,16 @@ impl DefaultPhysicalPlanner {
                 let schema_ref = Arc::new(schema.as_ref().clone());
                 Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
             }
+            LogicalPlan::Extension { node } => {
+                let inputs = node
+                    .inputs()
+                    .into_iter()
+                    .map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
+                    .collect::<Result<Vec<_>>>()?;
+
+                self.extension_planner

Review comment:
       @jorgecarleitao I see -- and that makes sense 👍 👍 
   







[GitHub] [arrow] andygrove commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r484556394



##########
File path: rust/datafusion/src/logical_plan/mod.rs
##########
@@ -755,8 +756,72 @@ impl fmt::Debug for Expr {
     }
 }
 
-/// The LogicalPlan represents different types of relations (such as Projection,
-/// Filter, etc) and can be created by the SQL query planner and the DataFrame API.
+/// This defines the interface for `LogicalPlan` nodes that can be
+/// used to extend DataFusion with custom relational operators.
+///
+/// See the example in
+/// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an
+/// example of how to use this extension API
+pub trait ExtensionPlanNode: Debug {
+    /// Return a reference to self as Any, to support dynamic downcasting
+    fn as_any(&self) -> &dyn Any;
+
+    /// Return the logical plan's inputs
+    fn inputs(&self) -> Vec<&LogicalPlan>;

Review comment:
       I've been thinking lately that we should start standardizing on `children` rather than `inputs`. What do others think about this? This doesn't have to happen in this PR though.
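The trait quoted above asks implementors for `as_any` "to support dynamic downcasting". Here is a std-only sketch of that pattern. It uses a hypothetical, trimmed-down `ExtensionNode` trait rather than DataFusion's, and its inputs are simplified to names. It shows how code holding only a trait object can recover the concrete node type:

```rust
use std::any::Any;
use std::fmt::Debug;

/// Hypothetical, trimmed-down stand-in for an extension node trait.
trait ExtensionNode: Debug {
    /// Return `self` as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Child plans, simplified here to their display names.
    fn inputs(&self) -> Vec<&str>;
}

#[derive(Debug)]
struct TopKNode {
    k: usize,
    input: String,
}

impl ExtensionNode for TopKNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn inputs(&self) -> Vec<&str> {
        vec![self.input.as_str()]
    }
}

#[derive(Debug)]
struct OtherNode;

impl ExtensionNode for OtherNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn inputs(&self) -> Vec<&str> {
        Vec::new()
    }
}

/// A planner holding only `&dyn ExtensionNode` recovers `k` if, and only
/// if, the node really is a `TopKNode`.
fn topk_limit(node: &dyn ExtensionNode) -> Option<usize> {
    node.as_any().downcast_ref::<TopKNode>().map(|n| n.k)
}

fn main() {
    let nodes: Vec<Box<dyn ExtensionNode>> = vec![
        Box::new(TopKNode { k: 3, input: "Projection".to_string() }),
        Box::new(OtherNode),
    ];
    for node in &nodes {
        println!("{:?}: k={:?} inputs={:?}", node, topk_limit(&**node), node.inputs());
    }
}
```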







[GitHub] [arrow] alamb commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r482115402



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -375,15 +380,39 @@ impl ScalarFunctionRegistry for ExecutionContext {
     }
 }
 
+/// Provides OptimizerRule instances to

Review comment:
       I am not a huge fan of this `OptimizerRuleSource` but it was the best I could come up with at this point

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -375,15 +380,39 @@ impl ScalarFunctionRegistry for ExecutionContext {
     }
 }
 
+/// Provides OptimizerRule instances to
+///
+/// Because `OptimizerRule`s themselves need to be mutable to
+/// conform to the `OptimizerRule` trait, and
+/// `ExecutionContext::optimize` only has `&self` (not `&mut self`),
+/// they must be instantiated every time a plan is optimized.
+pub trait OptimizerRuleSource {
+    /// Return the OptimizerRules to apply to a LogicalPlan. The rules
+    /// are applied in the order they are returned in the Vec.
+    fn rules(&self) -> Vec<Box<dyn OptimizerRule>>;
+}
+
+/// Supplies no additional optimizer rules
+struct DefaultOptimizerRuleSource {}
+
+impl OptimizerRuleSource for DefaultOptimizerRuleSource {
+    fn rules(&self) -> Vec<Box<dyn OptimizerRule>> {
+        vec![]
+    }
+}
+
 /// Configuration options for execution context
 #[derive(Clone)]
 pub struct ExecutionConfig {
     /// Number of concurrent threads for query execution.
     pub concurrency: usize,
     /// Default batch size when reading data sources
     pub batch_size: usize,
-    /// Optional physical planner to override the default physical planner
-    physical_planner: Option<Arc<dyn PhysicalPlanner>>,
+    /// Physical planner for converting from `LogicalPlan` to `ExecutionPlan`
+    physical_planner: Arc<dyn PhysicalPlanner>,

Review comment:
       I thought the code would be neater if there was always a physical_planner (which is an instance of the `DefaultPhysicalPlanner`)
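The pattern described here can be sketched without DataFusion. The config always owns an `Arc<dyn ...>` initialised to the default implementation, and the builder method simply replaces it, so no call site has to handle `None`. `Planner`, `DefaultPlanner`, `CustomPlanner`, and `Config` are hypothetical stand-ins for `PhysicalPlanner`, `DefaultPhysicalPlanner`, and `ExecutionConfig`:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a physical planner trait.
trait Planner: Send + Sync {
    fn name(&self) -> String;
}

struct DefaultPlanner;

impl Planner for DefaultPlanner {
    fn name(&self) -> String {
        "default".to_string()
    }
}

struct CustomPlanner;

impl Planner for CustomPlanner {
    fn name(&self) -> String {
        "custom".to_string()
    }
}

/// The planner field is never `None`: it starts as the default and a
/// builder method swaps it out, so callers need no `match` or `unwrap`.
#[derive(Clone)]
struct Config {
    planner: Arc<dyn Planner>,
}

impl Config {
    fn new() -> Self {
        Config { planner: Arc::new(DefaultPlanner) }
    }

    fn with_planner(mut self, planner: Arc<dyn Planner>) -> Self {
        self.planner = planner;
        self
    }
}

fn main() {
    let default_cfg = Config::new();
    let custom_cfg = default_cfg.clone().with_planner(Arc::new(CustomPlanner));
    println!("{} / {}", default_cfg.planner.name(), custom_cfg.planner.name());
}
```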

##########
File path: rust/datafusion/tests/user_defined_plan.rs
##########
@@ -0,0 +1,496 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains an end to end demonstration of creating
+//! a user defined operator in DataFusion.
+//!
+//! Specifically, it shows how to define a `TopKNode` that implements
+//! `ExtensionPlanNode`, add an OptimizerRule to rewrite a
+//! `LogicalPlan` to use that node, create an
+//! `ExecutionPlan` and finally produce results.
+//!
+//! # TopK Background:
+//!
+//! A "Top K" node is a common query optimization which is used for
+//! queries such as "find the top 3 customers by revenue". The
+//! (simplified) SQL for such a query might be:
+//!
+//! ```sql
+//! CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT)
+//!   STORED AS CSV location 'tests/customer.csv';
+//!
+//! SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+//! ```
+//!
+//! And a naive plan would be:
+//!
+//! ```
+//! > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+//! +--------------+----------------------------------------+
+//! | plan_type    | plan                                   |
+//! +--------------+----------------------------------------+
+//! | logical_plan | Limit: 3                               |
+//! |              |   Sort: #revenue DESC NULLS FIRST      |
+//! |              |     Projection: #customer_id, #revenue |
+//! |              |       TableScan: sales projection=None |
+//! +--------------+----------------------------------------+
+//! ```
+//!
+//! While this plan produces the correct answer, the careful reader
+//! will note it fully sorts the input before discarding everything
+//! other than the top 3 elements.
+//!
+//! The same answer can be produced by simply keeping track of the top
+//! N elements, reducing the total amount of required buffer memory.
+//!
+
+use arrow::{
+    array::{Int64Array, StringArray},
+    datatypes::{Schema, SchemaRef},
+    error::ArrowError,
+    record_batch::{RecordBatch, RecordBatchReader},
+    util::pretty::pretty_format_batches,
+};
+use datafusion::{
+    error::{ExecutionError, Result},
+    execution::context::{ExecutionContextState, OptimizerRuleSource},
+    logical_plan::{Expr, ExtensionPlanNode, LogicalPlan},
+    optimizer::{optimizer::OptimizerRule, utils::optimize_explain},
+    physical_plan::{
+        planner::{DefaultPhysicalPlanner, ExtensionPlanner},
+        Distribution, ExecutionPlan, Partitioning,
+    },
+    prelude::{ExecutionConfig, ExecutionContext},
+};
+use fmt::Debug;
+use std::{
+    any::Any,
+    collections::BTreeMap,
+    fmt,
+    sync::{Arc, Mutex},
+};
+
+/// Execute the specified sql and return the resulting record batches
+/// pretty printed as a String.
+fn exec_sql(ctx: &mut ExecutionContext, sql: &str) -> Result<String> {
+    let df = ctx.sql(sql)?;
+    let batches = df.collect()?;
+    pretty_format_batches(&batches).map_err(|e| ExecutionError::ArrowError(e))
+}
+
+/// Create a test table.
+fn setup_table(mut ctx: ExecutionContext) -> Result<ExecutionContext> {
+    let sql = "CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) STORED AS CSV location 'tests/customer.csv'";
+
+    let expected = vec!["++", "++"];
+
+    let s = exec_sql(&mut ctx, sql)?;
+    let actual = s.lines().collect::<Vec<_>>();
+
+    assert_eq!(expected, actual, "Creating table");
+    Ok(ctx)
+}
+
+const QUERY: &str =
+    "SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3";
+
+// Run the query using the specified execution context and compare it
+// to the known result
+fn run_and_compare_query(mut ctx: ExecutionContext, description: &str) -> Result<()> {
+    let expected = vec![
+        "+-------------+---------+",
+        "| customer_id | revenue |",
+        "+-------------+---------+",
+        "| paul        | 300     |",
+        "| jorge       | 200     |",
+        "| andy        | 150     |",
+        "+-------------+---------+",
+    ];
+
+    let s = exec_sql(&mut ctx, QUERY)?;
+    let actual = s.lines().collect::<Vec<_>>();
+
+    assert_eq!(
+        expected,
+        actual,
+        "output mismatch for {}. Expected:\n{}\nActual:\n{}",
+        description,
+        expected.join("\n"),
+        s
+    );
+    Ok(())
+}
+
+#[test]
+// Run the query using default planners and optimizer
+fn normal_query() -> Result<()> {
+    let ctx = setup_table(ExecutionContext::new())?;
+    run_and_compare_query(ctx, "Default context")
+}
+
+#[test]
+// Run the query using topk optimization
+fn topk_query() -> Result<()> {
+    // Note the only difference from `normal_query` is the topk context
+    let ctx = setup_table(make_topk_context())?;
+    run_and_compare_query(ctx, "Topk context")
+}
+
+#[test]
+// Run EXPLAIN PLAN and show the plan was in fact rewritten
+fn topk_plan() -> Result<()> {
+    let mut ctx = setup_table(make_topk_context())?;
+
+    let expected = vec![
+        "| logical_plan after topk                 | TopK: k=3                                      |",
+        "|                                         |   Projection: #customer_id, #revenue           |",
+        "|                                         |     TableScan: sales projection=Some([0, 1])   |",
+    ].join("\n");
+
+    let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);
+    let actual_output = exec_sql(&mut ctx, &explain_query)?;
+
+    assert!(actual_output.contains(&expected) , "Expected output not present in actual output\nExpected:\n---------\n{}\nActual:\n--------\n{}", expected, actual_output);
+    Ok(())
+}
+
+fn make_topk_context() -> ExecutionContext {
+    let physical_planner = Arc::new(DefaultPhysicalPlanner::with_extension_planner(

Review comment:
       Here is the example of how to register `OptimizerRule`s and PhysicalPlan rules
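The registered `OptimizerRule` is what turns the naive `Limit: 3` over `Sort: #revenue DESC` plan from the module docs into the `TopK: k=3` plan that `topk_plan` expects. Below is a toy version of that rewrite. It assumes a hypothetical `Plan` enum in place of DataFusion's `LogicalPlan`. It also assumes the rule fires only when a `Limit` sits directly on a descending `Sort`:

```rust
/// Hypothetical, simplified logical plan (not DataFusion's `LogicalPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    TableScan { table: String },
    Projection { columns: Vec<String>, input: Box<Plan> },
    Sort { column: String, descending: bool, input: Box<Plan> },
    Limit { n: usize, input: Box<Plan> },
    TopK { k: usize, column: String, input: Box<Plan> },
}

/// Sketch of a TopK rewrite rule: `Limit(n, Sort(col DESC, x))` becomes
/// `TopK(k = n, col, x)`. Children are rewritten recursively; every other
/// node is rebuilt unchanged.
fn topk_rule(plan: Plan) -> Plan {
    match plan {
        Plan::Limit { n, input } => match *input {
            Plan::Sort { column, descending: true, input: sort_input } => Plan::TopK {
                k: n,
                column,
                input: Box::new(topk_rule(*sort_input)),
            },
            other => Plan::Limit { n, input: Box::new(topk_rule(other)) },
        },
        Plan::Projection { columns, input } => Plan::Projection {
            columns,
            input: Box::new(topk_rule(*input)),
        },
        Plan::Sort { column, descending, input } => Plan::Sort {
            column,
            descending,
            input: Box::new(topk_rule(*input)),
        },
        Plan::TopK { k, column, input } => Plan::TopK {
            k,
            column,
            input: Box::new(topk_rule(*input)),
        },
        scan @ Plan::TableScan { .. } => scan,
    }
}

/// The naive plan shape from the module docs: Limit 3 over Sort revenue DESC.
fn naive_plan() -> Plan {
    let scan = Plan::TableScan { table: "sales".to_string() };
    let projection = Plan::Projection {
        columns: vec!["customer_id".to_string(), "revenue".to_string()],
        input: Box::new(scan),
    };
    let sort = Plan::Sort {
        column: "revenue".to_string(),
        descending: true,
        input: Box::new(projection),
    };
    Plan::Limit { n: 3, input: Box::new(sort) }
}

fn main() {
    println!("{:#?}", topk_rule(naive_plan()));
}
```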

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -42,14 +44,30 @@ use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalP
 use arrow::compute::SortOptions;
 use arrow::datatypes::Schema;
 
+/// This trait permits the `DefaultPhysicalPlanner` to create plans for
+/// user defined `ExtensionPlanNode`s
+pub trait ExtensionPlanner {

Review comment:
       @andygrove -- here is my proposed change to support physical planning of extension nodes in response to https://github.com/apache/arrow/pull/8020#discussion_r475953168. Rather than doing physical planning directly in the `ExtensionNode` itself, this is a mechanism to register planning capabilities into the `DefaultPhysicalPlanner`
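Here is a minimal model of this registration approach. It mirrors the `LogicalPlan::Extension` arm of `create_physical_plan` quoted in this thread. The default planner plans an extension node's inputs itself. It then hands the node and its physical inputs to whichever extension planner was registered. Every type here (`LogicalPlan`, `ExtensionPlanner`, `DefaultPlanner`, `TopKPlanner`) is a hypothetical std-only stand-in, and physical plans are modelled as strings:

```rust
use std::sync::Arc;

/// Hypothetical logical plan with one built-in variant and an extension slot.
enum LogicalPlan {
    Scan(String),
    Extension { name: String, inputs: Vec<LogicalPlan> },
}

/// Physical plans are modelled as strings to keep the sketch small.
type PhysicalPlan = String;

/// Hypothetical extension planner: receives the extension node (here just
/// its name) together with its already-planned physical inputs.
trait ExtensionPlanner {
    fn plan_extension(&self, name: &str, inputs: Vec<PhysicalPlan>) -> Result<PhysicalPlan, String>;
}

struct DefaultPlanner {
    extension_planner: Arc<dyn ExtensionPlanner>,
}

impl DefaultPlanner {
    fn create_physical_plan(&self, plan: &LogicalPlan) -> Result<PhysicalPlan, String> {
        match plan {
            LogicalPlan::Scan(table) => Ok(format!("ScanExec({})", table)),
            LogicalPlan::Extension { name, inputs } => {
                // Plan the children with the default planner first...
                let inputs = inputs
                    .iter()
                    .map(|input| self.create_physical_plan(input))
                    .collect::<Result<Vec<_>, _>>()?;
                // ...then delegate the extension node to the registered planner.
                self.extension_planner.plan_extension(name, inputs)
            }
        }
    }
}

struct TopKPlanner;

impl ExtensionPlanner for TopKPlanner {
    fn plan_extension(&self, name: &str, inputs: Vec<PhysicalPlan>) -> Result<PhysicalPlan, String> {
        match name {
            "TopK" => Ok(format!("TopKExec({})", inputs.join(", "))),
            other => Err(format!("no physical planner for extension node {}", other)),
        }
    }
}

fn main() {
    let planner = DefaultPlanner { extension_planner: Arc::new(TopKPlanner) };
    let plan = LogicalPlan::Extension {
        name: "TopK".to_string(),
        inputs: vec![LogicalPlan::Scan("sales".to_string())],
    };
    println!("{:?}", planner.create_physical_plan(&plan));
}
```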

##########
File path: rust/datafusion/src/logical_plan/mod.rs
##########
@@ -890,11 +955,16 @@ pub enum LogicalPlan {
         /// The output schema of the explain (2 columns of text)
         schema: Box<Schema>,
     },
+    /// Extension operator defined outside of DataFusion
+    Extension {
+        /// The runtime extension operator
+        node: Arc<dyn ExtensionPlanNode>,
+    },
 }
 
 impl LogicalPlan {
     /// Get a reference to the logical plan's schema
-    pub fn schema(&self) -> &Box<Schema> {
+    pub fn schema(&self) -> &Schema {

Review comment:
       Returning a `&Box<>` requires implementers to store their `Schema` objects in a `Box` -- using a `&Schema` provides additional flexibility
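Here is a small std-only illustration of that flexibility. A hypothetical `Schema` struct and `HasSchema` trait stand in for Arrow's `Schema` and the `schema()` accessor. With a `&Schema` return type, implementors can store the schema in a `Box`, an `Arc`, or inline, and deref coercion handles the first two:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an Arrow `Schema`.
#[derive(Debug, PartialEq)]
struct Schema {
    fields: Vec<String>,
}

/// Returning `&Schema` (rather than `&Box<Schema>`) lets each
/// implementor decide how it stores its schema.
trait HasSchema {
    fn schema(&self) -> &Schema;
}

/// Keeps the schema in a `Box`, as the built-in variants did.
struct BoxedNode {
    schema: Box<Schema>,
}

impl HasSchema for BoxedNode {
    fn schema(&self) -> &Schema {
        &self.schema // `&Box<Schema>` deref-coerces to `&Schema`
    }
}

/// Shares one schema through an `Arc`; a `&Box<Schema>` return type
/// could not be satisfied without copying it into a `Box`.
struct SharedNode {
    schema: Arc<Schema>,
}

impl HasSchema for SharedNode {
    fn schema(&self) -> &Schema {
        &self.schema
    }
}

/// Stores the schema inline with no indirection at all.
struct InlineNode {
    schema: Schema,
}

impl HasSchema for InlineNode {
    fn schema(&self) -> &Schema {
        &self.schema
    }
}

fn main() {
    let fields = || vec!["customer_id".to_string(), "revenue".to_string()];
    let nodes: Vec<Box<dyn HasSchema>> = vec![
        Box::new(BoxedNode { schema: Box::new(Schema { fields: fields() }) }),
        Box::new(SharedNode { schema: Arc::new(Schema { fields: fields() }) }),
        Box::new(InlineNode { schema: Schema { fields: fields() } }),
    ];
    for node in &nodes {
        println!("{:?}", node.schema());
    }
}
```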







[GitHub] [arrow] andygrove commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r484556134



##########
File path: rust/datafusion/src/logical_plan/mod.rs
##########
@@ -755,8 +756,72 @@ impl fmt::Debug for Expr {
     }
 }
 
-/// The LogicalPlan represents different types of relations (such as Projection,
-/// Filter, etc) and can be created by the SQL query planner and the DataFrame API.
+/// This defines the interface for `LogicalPlan` nodes that can be
+/// used to extend DataFusion with custom relational operators.
+///
+/// See the example in
+/// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an
+/// example of how to use this extension API
+pub trait ExtensionPlanNode: Debug {

Review comment:
       I like the idea of standardizing on `UserDefined*` for extension points. Everything we do here for logical plans, we will end up doing for physical plans too, I expect. Given that, we should add `Logical` to the name, so `UserDefinedLogicalNode` perhaps?







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r482975446



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -417,7 +445,16 @@ impl ExecutionConfig {
         mut self,
         physical_planner: Arc<dyn PhysicalPlanner>,
     ) -> Self {
-        self.physical_planner = Some(physical_planner);
+        self.physical_planner = physical_planner;
+        self
+    }
+
+    /// Optional source of additional optimization passes
+    pub fn with_optimizer_rule_source(

Review comment:
       I like it with `Clone` also.
   
   Another idea (not for this PR) is to make `OptimizerRule` a plain function, i.e. replace the trait with `Fn(Plan) -> Result<Plan>`. `TypeCoercer` lost its state in one of my recent PRs. In the end, the way we have been using this trait is:
   
   1. Initialize the optimizer without arguments
   2. Run the optimizer with a single argument (the `LogicalPlan`)
   3. Remove the optimizer
   
   If we really need to give the optimizer more information, we could just pass the `ExecutionContextState` as a second argument of `optimize`.
   
   This way, we do not need to worry about memory management, which we do not need in practice: optimizations are stateless unless we want some introspection into what they did, in which case logging seems adequate.
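To make this suggestion concrete, here is a minimal std-only sketch of optimizer rules as plain function values, `Box<dyn Fn(Plan) -> Result<Plan>>`, applied in order. The toy `Plan` (a list of operator names), the `optimize` driver, and both rules are hypothetical illustrations, not DataFusion types:

```rust
/// Hypothetical toy plan: operator names from the root down.
type Plan = Vec<String>;
type Result<T> = std::result::Result<T, String>;

/// An optimizer rule as a stateless function value instead of a trait
/// whose `optimize` takes `&mut self`.
type Rule = Box<dyn Fn(Plan) -> Result<Plan>>;

/// Apply the rules in order, threading the plan through each one and
/// stopping at the first error.
fn optimize(plan: Plan, rules: &[Rule]) -> Result<Plan> {
    rules.iter().try_fold(plan, |plan, rule| rule(plan))
}

/// Hypothetical rule: remove "Noop" operators.
fn drop_noops(plan: Plan) -> Result<Plan> {
    Ok(plan.into_iter().filter(|op| op.as_str() != "Noop").collect())
}

/// Hypothetical rule: fuse a "Limit" directly above a "Sort" into "TopK".
fn fuse_limit_sort(plan: Plan) -> Result<Plan> {
    let mut out = Vec::with_capacity(plan.len());
    let mut ops = plan.into_iter().peekable();
    while let Some(op) = ops.next() {
        if op == "Limit" && ops.peek().map(String::as_str) == Some("Sort") {
            ops.next(); // consume the "Sort" as well
            out.push("TopK".to_string());
        } else {
            out.push(op);
        }
    }
    Ok(out)
}

/// Build a toy plan from string slices.
fn plan_of(ops: &[&str]) -> Plan {
    ops.iter().map(|op| op.to_string()).collect()
}

fn main() {
    let rules: Vec<Rule> = vec![Box::new(drop_noops), Box::new(fuse_limit_sort)];
    let plan = plan_of(&["Limit", "Noop", "Sort", "Projection", "TableScan"]);
    println!("{:?}", optimize(plan, &rules));
}
```

Because these rules hold no state, nothing has to be re-instantiated per `optimize` call. The order of the slice still matters: here `drop_noops` must run before `fuse_limit_sort` for the `Limit`/`Sort` pair to become adjacent.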
   







[GitHub] [arrow] alamb commented on pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#issuecomment-688782487


   > @alamb Apologies .. I did not get to this yet due to work commitments, but I am going to make time today to review this.
   
   No problem @andygrove -- I *totally* understand and thank you for making the time on a holiday to review it. 





[GitHub] [arrow] andygrove commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r484555554



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -375,15 +380,39 @@ impl ScalarFunctionRegistry for ExecutionContext {
     }
 }
 
+/// Provides OptimizerRule instances to

Review comment:
       I think I would prefer `Provider` to `Source` in this context. Also, would be good to document whether these are logical or physical optimizer rules.
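Here is a std-only sketch of the rule-provider pattern under discussion. It uses the `Provider` naming suggested here and documents that the rules operate on logical plans. `OptimizerRuleProvider`, `StripNoops`, `MyRules`, and `Context` are hypothetical names. The point is the shape described in the `OptimizerRuleSource` doc comment: a context that only has `&self` asks the provider for fresh `Box<dyn OptimizerRule>` instances each time it optimizes. It then applies them, with `&mut self`, in the order returned:

```rust
/// Hypothetical toy plan: operator names from the root down.
type Plan = Vec<String>;

/// Hypothetical logical-plan rule whose `optimize` takes `&mut self`,
/// e.g. so it can keep statistics while it runs.
trait OptimizerRule {
    fn optimize(&mut self, plan: Plan) -> Plan;
}

/// Hypothetical provider of logical optimizer rules. It returns fresh
/// instances on every call, so a caller that only has `&self` can still
/// run rules that need `&mut self`.
trait OptimizerRuleProvider {
    fn rules(&self) -> Vec<Box<dyn OptimizerRule>>;
}

/// Removes "Noop" operators and counts how many it removed.
struct StripNoops {
    removed: usize,
}

impl OptimizerRule for StripNoops {
    fn optimize(&mut self, plan: Plan) -> Plan {
        let before = plan.len();
        let out: Plan = plan.into_iter().filter(|op| op.as_str() != "Noop").collect();
        self.removed += before - out.len(); // mutable per-run state
        out
    }
}

struct MyRules;

impl OptimizerRuleProvider for MyRules {
    fn rules(&self) -> Vec<Box<dyn OptimizerRule>> {
        vec![Box::new(StripNoops { removed: 0 })]
    }
}

/// Stand-in for an execution context whose `optimize` takes `&self`.
struct Context {
    rule_provider: Box<dyn OptimizerRuleProvider>,
}

impl Context {
    fn optimize(&self, mut plan: Plan) -> Plan {
        // New rule instances per call, applied in the order returned.
        for mut rule in self.rule_provider.rules() {
            plan = rule.optimize(plan);
        }
        plan
    }
}

fn main() {
    let ctx = Context { rule_provider: Box::new(MyRules) };
    let plan = vec!["Noop".to_string(), "TableScan".to_string()];
    println!("{:?}", ctx.optimize(plan));
}
```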







[GitHub] [arrow] alamb commented on a change in pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r482887593



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -331,6 +355,16 @@ impl DefaultPhysicalPlanner {
                 let schema_ref = Arc::new(schema.as_ref().clone());
                 Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans)))
             }
+            LogicalPlan::Extension { node } => {
+                let inputs = node
+                    .inputs()
+                    .into_iter()
+                    .map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
+                    .collect::<Result<Vec<_>>>()?;
+
+                self.extension_planner

Review comment:
       > this implies that a single ExtensionPlanner needs to be able to handle all custom nodes.
   
   This is correct. I had not thought about having two different sets of extensions registered at the same time. Do you have any specific examples of situations where users might do that? 
   
   As you point out, given the design in this PR, users of DataFusion would have to do some extra work to combine extensions -- namely they would effectively have to write their own extension that invoked the others.
   
   Given there is a workaround for multiple extensions (even though it is not very user-friendly), my personal preference is to begin by supporting only a single extension for now. 
   
   I think your suggestion on how to implement multiple extension planners is an excellent one and I will use it if we decide to support multiple sets of extensions. 
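The workaround mentioned here is to write one extension planner that invokes the others. It can be sketched as a composite that tries each wrapped planner until one recognises the node. This assumes a hypothetical planner signature that returns `Option`, with `None` meaning "not my node". The `ExtensionPlanner` trait in the PR may differ, and all names below are illustrative:

```rust
/// Hypothetical extension planner: returns `None` for nodes it does not
/// recognise, so several planners can be tried in turn.
trait ExtensionPlanner {
    fn plan_extension(&self, node_name: &str, inputs: &[String]) -> Option<String>;
}

struct TopKPlanner;

impl ExtensionPlanner for TopKPlanner {
    fn plan_extension(&self, node_name: &str, inputs: &[String]) -> Option<String> {
        if node_name == "TopK" {
            Some(format!("TopKExec({})", inputs.join(", ")))
        } else {
            None
        }
    }
}

struct SamplePlanner;

impl ExtensionPlanner for SamplePlanner {
    fn plan_extension(&self, node_name: &str, inputs: &[String]) -> Option<String> {
        if node_name == "Sample" {
            Some(format!("SampleExec({})", inputs.join(", ")))
        } else {
            None
        }
    }
}

/// A single registered planner that delegates to several others and
/// uses the result of the first one that recognises the node.
struct CompositePlanner {
    planners: Vec<Box<dyn ExtensionPlanner>>,
}

impl ExtensionPlanner for CompositePlanner {
    fn plan_extension(&self, node_name: &str, inputs: &[String]) -> Option<String> {
        self.planners
            .iter()
            .find_map(|planner| planner.plan_extension(node_name, inputs))
    }
}

fn main() {
    let planner = CompositePlanner {
        planners: vec![Box::new(TopKPlanner), Box::new(SamplePlanner)],
    };
    let inputs = vec!["ScanExec(sales)".to_string()];
    println!("{:?}", planner.plan_extension("Sample", &inputs));
}
```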







[GitHub] [arrow] github-actions[bot] commented on pull request #8097: ARROW-9821: [Rust][DataFusion] Support for User Defined ExtensionNodes in the LogicalPlan

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#issuecomment-685777205


   https://issues.apache.org/jira/browse/ARROW-9821

