You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by jo...@apache.org on 2021/01/31 10:02:12 UTC

[arrow] branch master updated: ARROW-11395: [DataFusion] Support custom optimizers

This is an automated email from the ASF dual-hosted git repository.

jorgecarleitao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 77a46f2  ARROW-11395: [DataFusion] Support custom optimizers
77a46f2 is described below

commit 77a46f28fc322b398d0ba2084f2e9718f2314a3b
Author: Jorge C. Leitao <jo...@gmail.com>
AuthorDate: Sun Jan 31 11:00:23 2021 +0100

    ARROW-11395: [DataFusion] Support custom optimizers
    
    This PR adds support for custom [logical] optimization rules to be injected to the `ExecutionContext`, thereby allowing people to create their own optimizers and configure the context with them, so that they run through the logical plan during the optimization step.
    
    Another way of thinking about this is that `QueryPlanner` is now only responsible for converting a logical plan into an execution plan, while the `optimizerRule` is responsible for re-writing a logical plan into a logical plan:
    
    * `OptimizerRule`: `LogicalPlan -> LogicalPlan`
    * `QueryPlanner`: `LogicalPlan -> ExecutionPlan`
    
    The second commit on this PR is just a small simplification that helps people writing the rules without having to worry about the `Explain`. This is important because forgetting about it has major consequences to UX (explain does not work as intended)
    
    Closes #9333 from jorgecarleitao/split_optimizer
    
    Authored-by: Jorge C. Leitao <jo...@gmail.com>
    Signed-off-by: Jorge C. Leitao <jo...@gmail.com>
---
 rust/datafusion/src/execution/context.rs           | 41 +++++++------
 rust/datafusion/src/optimizer/filter_push_down.rs  |  4 +-
 .../src/optimizer/hash_build_probe_order.rs        |  2 +-
 rust/datafusion/src/optimizer/optimizer.rs         | 17 +-----
 .../src/optimizer/projection_push_down.rs          |  6 +-
 rust/datafusion/src/optimizer/utils.rs             | 42 +++++++++++--
 rust/datafusion/tests/user_defined_plan.rs         | 70 ++++++++--------------
 7 files changed, 92 insertions(+), 90 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 5600c55..1accff2 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -324,19 +324,15 @@ impl ExecutionContext {
 
     /// Optimize the logical plan by applying optimizer rules
     pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        // Apply standard rewrites and optimizations
+        let optimizers = &self.state.lock().unwrap().config.optimizers;
+
+        let mut new_plan = plan.clone();
         debug!("Logical plan:\n {:?}", plan);
-        let mut plan = ProjectionPushDown::new().optimize(&plan)?;
-        plan = FilterPushDown::new().optimize(&plan)?;
-        plan = HashBuildProbeOrder::new().optimize(&plan)?;
+        for optimizer in optimizers {
+            new_plan = optimizer.optimize(&new_plan)?;
+        }
         debug!("Optimized logical plan:\n {:?}", plan);
-
-        self.state
-            .lock()
-            .unwrap()
-            .config
-            .query_planner
-            .rewrite_logical_plan(plan)
+        Ok(new_plan)
     }
 
     /// Create a physical plan from a logical plan
@@ -454,13 +450,6 @@ impl FunctionRegistry for ExecutionContext {
 
 /// A planner used to add extensions to DataFusion logical and physical plans.
 pub trait QueryPlanner {
-    /// Given a `LogicalPlan`, create a new, modified `LogicalPlan`
-    /// plan. This method is run after built in `OptimizerRule`s. By
-    /// default returns the `plan` unmodified.
-    fn rewrite_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
-        Ok(plan)
-    }
-
     /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
     fn create_physical_plan(
         &self,
@@ -491,6 +480,8 @@ pub struct ExecutionConfig {
     pub concurrency: usize,
     /// Default batch size when reading data sources
     pub batch_size: usize,
+    /// Responsible for optimizing a logical plan
+    optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
     /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
     query_planner: Arc<dyn QueryPlanner + Send + Sync>,
 }
@@ -501,6 +492,11 @@ impl ExecutionConfig {
         Self {
             concurrency: num_cpus::get(),
             batch_size: 32768,
+            optimizers: vec![
+                Arc::new(ProjectionPushDown::new()),
+                Arc::new(FilterPushDown::new()),
+                Arc::new(HashBuildProbeOrder::new()),
+            ],
             query_planner: Arc::new(DefaultQueryPlanner {}),
         }
     }
@@ -529,6 +525,15 @@ impl ExecutionConfig {
         self.query_planner = query_planner;
         self
     }
+
+    /// Adds a new [`OptimizerRule`]
+    pub fn add_optimizer_rule(
+        mut self,
+        optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
+    ) -> Self {
+        self.optimizers.push(optimizer_rule);
+        self
+    }
 }
 
 /// Execution context for registering data sources and executing queries
diff --git a/rust/datafusion/src/optimizer/filter_push_down.rs b/rust/datafusion/src/optimizer/filter_push_down.rs
index 421bffa..b72ede1 100644
--- a/rust/datafusion/src/optimizer/filter_push_down.rs
+++ b/rust/datafusion/src/optimizer/filter_push_down.rs
@@ -396,7 +396,7 @@ impl OptimizerRule for FilterPushDown {
         "filter_push_down"
     }
 
-    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
         optimize(plan, State::default())
     }
 }
@@ -438,7 +438,7 @@ mod tests {
     use arrow::datatypes::SchemaRef;
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
-        let mut rule = FilterPushDown::new();
+        let rule = FilterPushDown::new();
         let optimized_plan = rule.optimize(plan).expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
diff --git a/rust/datafusion/src/optimizer/hash_build_probe_order.rs b/rust/datafusion/src/optimizer/hash_build_probe_order.rs
index 6d6a884..5d63926 100644
--- a/rust/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/rust/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -98,7 +98,7 @@ impl OptimizerRule for HashBuildProbeOrder {
         "hash_build_probe_order"
     }
 
-    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
         match plan {
             // Main optimization rule, swaps order of left and right
             // based on number of rows in each table
diff --git a/rust/datafusion/src/optimizer/optimizer.rs b/rust/datafusion/src/optimizer/optimizer.rs
index 3001519..34c566f 100644
--- a/rust/datafusion/src/optimizer/optimizer.rs
+++ b/rust/datafusion/src/optimizer/optimizer.rs
@@ -17,7 +17,6 @@
 
 //! Query optimizer traits
 
-use super::utils;
 use crate::error::Result;
 use crate::logical_plan::LogicalPlan;
 
@@ -25,21 +24,7 @@ use crate::logical_plan::LogicalPlan;
 /// logical plan.
 pub trait OptimizerRule {
     /// Perform optimizations on the plan
-    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan>;
+    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan>;
     /// Produce a human readable name for this optimizer rule
     fn name(&self) -> &str;
-
-    /// Convenience rule for writing optimizers: recursively invoke
-    /// optimize on plan's children and then return a node of the same
-    /// type. Useful for optimizer rules which want to leave the type
-    /// of plan unchanged but still apply to the children.
-    fn optimize_children(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        let new_exprs = utils::expressions(&plan);
-        let new_inputs = utils::inputs(&plan)
-            .into_iter()
-            .map(|plan| self.optimize(plan))
-            .collect::<Result<Vec<_>>>()?;
-
-        utils::from_plan(plan, &new_exprs, &new_inputs)
-    }
 }
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index bb87ed8..948fe71 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -32,7 +32,7 @@ use utils::optimize_explain;
 pub struct ProjectionPushDown {}
 
 impl OptimizerRule for ProjectionPushDown {
-    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
         // set of all columns refered by the plan (and thus considered required by the root)
         let required_columns = plan
             .schema()
@@ -108,7 +108,7 @@ fn get_projected_schema(
 
 /// Recursively transverses the logical plan removing expressions and that are not needed.
 fn optimize_plan(
-    optimizer: &mut ProjectionPushDown,
+    optimizer: &ProjectionPushDown,
     plan: &LogicalPlan,
     required_columns: &HashSet<String>, // set of columns required up to this step
     has_projection: bool,
@@ -525,7 +525,7 @@ mod tests {
     }
 
     fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
-        let mut rule = ProjectionPushDown::new();
+        let rule = ProjectionPushDown::new();
         rule.optimize(plan)
     }
 }
diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs
index 68143b1..474aecc 100644
--- a/rust/datafusion/src/optimizer/utils.rs
+++ b/rust/datafusion/src/optimizer/utils.rs
@@ -95,7 +95,7 @@ pub fn expr_to_column_names(expr: &Expr, accum: &mut HashSet<String>) -> Result<
 /// Create a `LogicalPlan::Explain` node by running `optimizer` on the
 /// input plan and capturing the resulting plan string
 pub fn optimize_explain(
-    optimizer: &mut impl OptimizerRule,
+    optimizer: &impl OptimizerRule,
     verbose: bool,
     plan: &LogicalPlan,
     stringified_plans: &Vec<StringifiedPlan>,
@@ -119,6 +119,40 @@ pub fn optimize_explain(
     })
 }
 
+/// Convenience rule for writing optimizers: recursively invoke
+/// optimize on plan's children and then return a node of the same
+/// type. Useful for optimizer rules which want to leave the type
+/// of plan unchanged but still apply to the children.
+/// This also handles the case when the `plan` is a [`LogicalPlan::Explain`].
+pub fn optimize_children(
+    optimizer: &impl OptimizerRule,
+    plan: &LogicalPlan,
+) -> Result<LogicalPlan> {
+    if let LogicalPlan::Explain {
+        verbose,
+        plan,
+        stringified_plans,
+        schema,
+    } = plan
+    {
+        return optimize_explain(
+            optimizer,
+            *verbose,
+            &*plan,
+            stringified_plans,
+            &schema.as_ref().to_owned().into(),
+        );
+    }
+
+    let new_exprs = expressions(&plan);
+    let new_inputs = inputs(&plan)
+        .into_iter()
+        .map(|plan| optimizer.optimize(plan))
+        .collect::<Result<Vec<_>>>()?;
+
+    from_plan(plan, &new_exprs, &new_inputs)
+}
+
 /// returns all expressions (non-recursively) in the current logical plan node.
 pub fn expressions(plan: &LogicalPlan) -> Vec<Expr> {
     match plan {
@@ -446,7 +480,7 @@ mod tests {
     struct TestOptimizer {}
 
     impl OptimizerRule for TestOptimizer {
-        fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
             Ok(plan.clone())
         }
 
@@ -457,13 +491,13 @@ mod tests {
 
     #[test]
     fn test_optimize_explain() -> Result<()> {
-        let mut optimizer = TestOptimizer {};
+        let optimizer = TestOptimizer {};
 
         let empty_plan = LogicalPlanBuilder::empty(false).build()?;
         let schema = LogicalPlan::explain_schema();
 
         let optimized_explain = optimize_explain(
-            &mut optimizer,
+            &optimizer,
             true,
             &empty_plan,
             &vec![StringifiedPlan::new(PlanType::LogicalPlan, "...")],
diff --git a/rust/datafusion/tests/user_defined_plan.rs b/rust/datafusion/tests/user_defined_plan.rs
index 315a89e..51e1cd8 100644
--- a/rust/datafusion/tests/user_defined_plan.rs
+++ b/rust/datafusion/tests/user_defined_plan.rs
@@ -72,7 +72,7 @@ use datafusion::{
     execution::context::ExecutionContextState,
     execution::context::QueryPlanner,
     logical_plan::{Expr, LogicalPlan, UserDefinedLogicalNode},
-    optimizer::{optimizer::OptimizerRule, utils::optimize_explain},
+    optimizer::{optimizer::OptimizerRule, utils::optimize_children},
     physical_plan::{
         planner::{DefaultPhysicalPlanner, ExtensionPlanner},
         Distribution, ExecutionPlan, Partitioning, PhysicalPlanner, RecordBatchStream,
@@ -178,7 +178,9 @@ async fn topk_plan() -> Result<()> {
 }
 
 fn make_topk_context() -> ExecutionContext {
-    let config = ExecutionConfig::new().with_query_planner(Arc::new(TopKQueryPlanner {}));
+    let config = ExecutionConfig::new()
+        .with_query_planner(Arc::new(TopKQueryPlanner {}))
+        .add_optimizer_rule(Arc::new(TopKOptimizerRule {}));
 
     ExecutionContext::with_config(config)
 }
@@ -188,10 +190,6 @@ fn make_topk_context() -> ExecutionContext {
 struct TopKQueryPlanner {}
 
 impl QueryPlanner for TopKQueryPlanner {
-    fn rewrite_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
-        TopKOptimizerRule {}.optimize(&plan)
-    }
-
     /// Given a `LogicalPlan` created from above, create an
     /// `ExecutionPlan` suitable for execution
     fn create_physical_plan(
@@ -210,52 +208,32 @@ impl QueryPlanner for TopKQueryPlanner {
 struct TopKOptimizerRule {}
 impl OptimizerRule for TopKOptimizerRule {
     // Example rewrite pass to insert a user defined LogicalPlanNode
-    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        match plan {
-            // Note: this code simply looks for the pattern of a Limit followed by a
-            // Sort and replaces it by a TopK node. It does not handle many
-            // edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
-            LogicalPlan::Limit { ref n, ref input } => {
-                if let LogicalPlan::Sort {
-                    ref expr,
-                    ref input,
-                } = **input
-                {
-                    if expr.len() == 1 {
-                        // we found a sort with a single sort expr, replace with a a TopK
-                        return Ok(LogicalPlan::Extension {
-                            node: Arc::new(TopKPlanNode {
-                                k: *n,
-                                input: self.optimize(input.as_ref())?,
-                                expr: expr[0].clone(),
-                            }),
-                        });
-                    }
+    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        // Note: this code simply looks for the pattern of a Limit followed by a
+        // Sort and replaces it by a TopK node. It does not handle many
+        // edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
+        if let LogicalPlan::Limit { ref n, ref input } = plan {
+            if let LogicalPlan::Sort {
+                ref expr,
+                ref input,
+            } = **input
+            {
+                if expr.len() == 1 {
+                    // we found a sort with a single sort expr, replace with a a TopK
+                    return Ok(LogicalPlan::Extension {
+                        node: Arc::new(TopKPlanNode {
+                            k: *n,
+                            input: self.optimize(input.as_ref())?,
+                            expr: expr[0].clone(),
+                        }),
+                    });
                 }
             }
-            // Due to the way explain is implemented, in order to get
-            // explain functionality we need to explicitly handle it
-            // here.
-            LogicalPlan::Explain {
-                verbose,
-                plan,
-                stringified_plans,
-                schema,
-            } => {
-                return optimize_explain(
-                    self,
-                    *verbose,
-                    &*plan,
-                    stringified_plans,
-                    &schema.as_ref().to_owned().into(),
-                )
-            }
-            _ => {}
         }
 
         // If we didn't find the Limit/Sort combination, recurse as
         // normal and build the result.
-        self.optimize_children(plan)
+        optimize_children(self, plan)
     }
 
     fn name(&self) -> &str {