You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/25 21:15:57 UTC
[arrow-datafusion] branch master updated: refactor: add optimizer struct (#2616)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 894be6719 refactor: add optimizer struct (#2616)
894be6719 is described below
commit 894be6719373be85fa777028fe3ec534536660e3
Author: jakevin <30...@users.noreply.github.com>
AuthorDate: Thu May 26 05:15:53 2022 +0800
refactor: add optimizer struct (#2616)
---
datafusion/core/src/execution/context.rs | 59 ++++++++++--------------------
datafusion/core/src/optimizer/optimizer.rs | 43 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 40 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 619ac13b1..272cdc6da 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -33,14 +33,15 @@ use crate::{
MemTable, ViewTable,
},
logical_plan::{PlanType, ToStringifiedPlan},
- optimizer::eliminate_filter::EliminateFilter,
- optimizer::eliminate_limit::EliminateLimit,
+ optimizer::{
+ eliminate_filter::EliminateFilter, eliminate_limit::EliminateLimit,
+ optimizer::Optimizer,
+ },
physical_optimizer::{
aggregate_statistics::AggregateStatistics,
hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
},
};
-use log::{debug, trace};
use parking_lot::RwLock;
use std::string::String;
use std::sync::Arc;
@@ -1189,7 +1190,7 @@ pub struct SessionState {
/// Uuid for the session
pub session_id: String,
/// Responsible for optimizing a logical plan
- pub optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
+ pub optimizer: Optimizer,
/// Responsible for optimizing a physical execution plan
pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
@@ -1255,7 +1256,7 @@ impl SessionState {
SessionState {
session_id,
- optimizers: vec![
+ optimizer: Optimizer::new(vec![
// Simplify expressions first to maximize the chance
// of applying other optimizations
Arc::new(SimplifyExpressions::new()),
@@ -1267,7 +1268,7 @@ impl SessionState {
Arc::new(FilterPushDown::new()),
Arc::new(LimitPushDown::new()),
Arc::new(SingleDistinctToGroupBy::new()),
- ],
+ ]),
physical_optimizers: vec![
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
@@ -1328,9 +1329,9 @@ impl SessionState {
/// Replace the optimizer rules
pub fn with_optimizer_rules(
mut self,
- optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
+ rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
) -> Self {
- self.optimizers = optimizers;
+ self.optimizer = Optimizer::new(rules);
self
}
@@ -1348,7 +1349,7 @@ impl SessionState {
mut self,
optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
) -> Self {
- self.optimizers.push(optimizer_rule);
+ self.optimizer.rules.push(optimizer_rule);
self
}
@@ -1363,16 +1364,21 @@ impl SessionState {
/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+ let execution_props = &mut self.execution_props.clone();
+
if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();
// optimize the child plan, capturing the output of each optimizer
- let plan =
- self.optimize_internal(e.plan.as_ref(), |optimized_plan, optimizer| {
+ let plan = self.optimizer.optimize(
+ e.plan.as_ref(),
+ execution_props,
+ |optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans.push(optimized_plan.to_stringified(plan_type));
- })?;
+ },
+ )?;
Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
@@ -1381,35 +1387,8 @@ impl SessionState {
schema: e.schema.clone(),
}))
} else {
- self.optimize_internal(plan, |_, _| {})
- }
- }
-
- /// Optimizes the logical plan by applying optimizer rules, and
- /// invoking observer function after each call
- fn optimize_internal<F>(
- &self,
- plan: &LogicalPlan,
- mut observer: F,
- ) -> Result<LogicalPlan>
- where
- F: FnMut(&LogicalPlan, &dyn OptimizerRule),
- {
- let execution_props = &mut self.execution_props.clone();
- let optimizers = &self.optimizers;
-
- let execution_props = execution_props.start_execution();
-
- let mut new_plan = plan.clone();
- debug!("Input logical plan:\n{}\n", plan.display_indent());
- trace!("Full input logical plan:\n{:?}", plan);
- for optimizer in optimizers {
- new_plan = optimizer.optimize(&new_plan, execution_props)?;
- observer(&new_plan, optimizer.as_ref());
+ self.optimizer.optimize(plan, execution_props, |_, _| {})
}
- debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
- trace!("Full Optimized logical plan:\n {:?}", plan);
- Ok(new_plan)
}
/// Creates a physical plan from a logical plan.
diff --git a/datafusion/core/src/optimizer/optimizer.rs b/datafusion/core/src/optimizer/optimizer.rs
index 5cf404794..cc00eb8f8 100644
--- a/datafusion/core/src/optimizer/optimizer.rs
+++ b/datafusion/core/src/optimizer/optimizer.rs
@@ -17,6 +17,10 @@
//! Query optimizer traits
+use std::sync::Arc;
+
+use log::{debug, trace};
+
use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::LogicalPlan;
@@ -35,3 +39,42 @@ pub trait OptimizerRule {
/// A human readable name for this optimizer rule
fn name(&self) -> &str;
}
+
+/// A rule-based optimizer: holds the list of `OptimizerRule`s applied to a logical plan.
+#[derive(Clone)]
+pub struct Optimizer {
+ /// All rules to apply, in the order they are stored in this vector
+ pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
+}
+
+impl Optimizer {
+ /// Create a new optimizer that applies `rules` in the order given
+ pub fn new(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
+ Self { rules }
+ }
+
+ /// Applies every rule in order to a clone of `plan`, invoking `observer` with the
+ /// resulting plan and the rule after each one; the first rule error is returned
+ pub fn optimize<F>(
+ &self,
+ plan: &LogicalPlan,
+ execution_props: &mut ExecutionProps,
+ mut observer: F,
+ ) -> Result<LogicalPlan>
+ where
+ F: FnMut(&LogicalPlan, &dyn OptimizerRule),
+ {
+ let execution_props = execution_props.start_execution();
+
+ let mut new_plan = plan.clone();
+ debug!("Input logical plan:\n{}\n", plan.display_indent());
+ trace!("Full input logical plan:\n{:?}", plan);
+ for rule in &self.rules {
+ new_plan = rule.optimize(&new_plan, execution_props)?;
+ observer(&new_plan, rule.as_ref());
+ }
+ debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
+ trace!("Full Optimized logical plan:\n {:?}", new_plan);
+ Ok(new_plan)
+ }
+}