You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/25 21:15:57 UTC

[arrow-datafusion] branch master updated: refactor: add optimizer struct (#2616)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 894be6719 refactor: add optimizer struct (#2616)
894be6719 is described below

commit 894be6719373be85fa777028fe3ec534536660e3
Author: jakevin <30...@users.noreply.github.com>
AuthorDate: Thu May 26 05:15:53 2022 +0800

    refactor: add optimizer struct (#2616)
---
 datafusion/core/src/execution/context.rs   | 59 ++++++++++--------------------
 datafusion/core/src/optimizer/optimizer.rs | 43 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 40 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 619ac13b1..272cdc6da 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -33,14 +33,15 @@ use crate::{
         MemTable, ViewTable,
     },
     logical_plan::{PlanType, ToStringifiedPlan},
-    optimizer::eliminate_filter::EliminateFilter,
-    optimizer::eliminate_limit::EliminateLimit,
+    optimizer::{
+        eliminate_filter::EliminateFilter, eliminate_limit::EliminateLimit,
+        optimizer::Optimizer,
+    },
     physical_optimizer::{
         aggregate_statistics::AggregateStatistics,
         hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
     },
 };
-use log::{debug, trace};
 use parking_lot::RwLock;
 use std::string::String;
 use std::sync::Arc;
@@ -1189,7 +1190,7 @@ pub struct SessionState {
     /// Uuid for the session
     pub session_id: String,
     /// Responsible for optimizing a logical plan
-    pub optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
+    pub optimizer: Optimizer,
     /// Responsible for optimizing a physical execution plan
     pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
     /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
@@ -1255,7 +1256,7 @@ impl SessionState {
 
         SessionState {
             session_id,
-            optimizers: vec![
+            optimizer: Optimizer::new(vec![
                 // Simplify expressions first to maximize the chance
                 // of applying other optimizations
                 Arc::new(SimplifyExpressions::new()),
@@ -1267,7 +1268,7 @@ impl SessionState {
                 Arc::new(FilterPushDown::new()),
                 Arc::new(LimitPushDown::new()),
                 Arc::new(SingleDistinctToGroupBy::new()),
-            ],
+            ]),
             physical_optimizers: vec![
                 Arc::new(AggregateStatistics::new()),
                 Arc::new(HashBuildProbeOrder::new()),
@@ -1328,9 +1329,9 @@ impl SessionState {
     /// Replace the optimizer rules
     pub fn with_optimizer_rules(
         mut self,
-        optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
+        rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
     ) -> Self {
-        self.optimizers = optimizers;
+        self.optimizer = Optimizer::new(rules);
         self
     }
 
@@ -1348,7 +1349,7 @@ impl SessionState {
         mut self,
         optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
     ) -> Self {
-        self.optimizers.push(optimizer_rule);
+        self.optimizer.rules.push(optimizer_rule);
         self
     }
 
@@ -1363,16 +1364,21 @@ impl SessionState {
 
     /// Optimizes the logical plan by applying optimizer rules.
     pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        let execution_props = &mut self.execution_props.clone();
+
         if let LogicalPlan::Explain(e) = plan {
             let mut stringified_plans = e.stringified_plans.clone();
 
             // optimize the child plan, capturing the output of each optimizer
-            let plan =
-                self.optimize_internal(e.plan.as_ref(), |optimized_plan, optimizer| {
+            let plan = self.optimizer.optimize(
+                e.plan.as_ref(),
+                execution_props,
+                |optimized_plan, optimizer| {
                     let optimizer_name = optimizer.name().to_string();
                     let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
                     stringified_plans.push(optimized_plan.to_stringified(plan_type));
-                })?;
+                },
+            )?;
 
             Ok(LogicalPlan::Explain(Explain {
                 verbose: e.verbose,
@@ -1381,35 +1387,8 @@ impl SessionState {
                 schema: e.schema.clone(),
             }))
         } else {
-            self.optimize_internal(plan, |_, _| {})
-        }
-    }
-
-    /// Optimizes the logical plan by applying optimizer rules, and
-    /// invoking observer function after each call
-    fn optimize_internal<F>(
-        &self,
-        plan: &LogicalPlan,
-        mut observer: F,
-    ) -> Result<LogicalPlan>
-    where
-        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
-    {
-        let execution_props = &mut self.execution_props.clone();
-        let optimizers = &self.optimizers;
-
-        let execution_props = execution_props.start_execution();
-
-        let mut new_plan = plan.clone();
-        debug!("Input logical plan:\n{}\n", plan.display_indent());
-        trace!("Full input logical plan:\n{:?}", plan);
-        for optimizer in optimizers {
-            new_plan = optimizer.optimize(&new_plan, execution_props)?;
-            observer(&new_plan, optimizer.as_ref());
+            self.optimizer.optimize(plan, execution_props, |_, _| {})
         }
-        debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
-        trace!("Full Optimized logical plan:\n {:?}", plan);
-        Ok(new_plan)
     }
 
     /// Creates a physical plan from a logical plan.
diff --git a/datafusion/core/src/optimizer/optimizer.rs b/datafusion/core/src/optimizer/optimizer.rs
index 5cf404794..cc00eb8f8 100644
--- a/datafusion/core/src/optimizer/optimizer.rs
+++ b/datafusion/core/src/optimizer/optimizer.rs
@@ -17,6 +17,10 @@
 
 //! Query optimizer traits
 
+use std::sync::Arc;
+
+use log::{debug, trace};
+
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
 use crate::logical_plan::LogicalPlan;
@@ -35,3 +39,42 @@ pub trait OptimizerRule {
     /// A human readable name for this optimizer rule
     fn name(&self) -> &str;
 }
+
+/// A rule-based optimizer.
+#[derive(Clone)]
+pub struct Optimizer {
+    /// All rules to apply
+    pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
+}
+
+impl Optimizer {
+    /// Create a new optimizer with the given rules
+    pub fn new(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
+        Self { rules }
+    }
+
+    /// Optimizes the logical plan by applying optimizer rules, and
+    /// invoking observer function after each call
+    pub fn optimize<F>(
+        &self,
+        plan: &LogicalPlan,
+        execution_props: &mut ExecutionProps,
+        mut observer: F,
+    ) -> Result<LogicalPlan>
+    where
+        F: FnMut(&LogicalPlan, &dyn OptimizerRule),
+    {
+        let execution_props = execution_props.start_execution();
+
+        let mut new_plan = plan.clone();
+        debug!("Input logical plan:\n{}\n", plan.display_indent());
+        trace!("Full input logical plan:\n{:?}", plan);
+        for rule in &self.rules {
+            new_plan = rule.optimize(&new_plan, execution_props)?;
+            observer(&new_plan, rule.as_ref());
+        }
+        debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
+        trace!("Full Optimized logical plan:\n {:?}", plan);
+        Ok(new_plan)
+    }
+}