You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/03 17:07:41 UTC
[arrow-datafusion] branch master updated: Move optimizer init to optimizer crate (#3692)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6b282ec95 Move optimizer init to optimizer crate (#3692)
6b282ec95 is described below
commit 6b282ec9585a1ca150c5760ba0536876706c6d35
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon Oct 3 11:07:34 2022 -0600
Move optimizer init to optimizer crate (#3692)
---
datafusion/core/src/execution/context.rs | 59 +++++--------------------
datafusion/optimizer/src/optimizer.rs | 60 ++++++++++++++++++++++++--
datafusion/optimizer/tests/integration-test.rs | 44 +------------------
3 files changed, 68 insertions(+), 95 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 2a805a5fc..7284d3e8a 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -33,10 +33,7 @@ use crate::{
MemTable, ViewTable,
},
logical_plan::{PlanType, ToStringifiedPlan},
- optimizer::{
- eliminate_filter::EliminateFilter, eliminate_limit::EliminateLimit,
- optimizer::Optimizer,
- },
+ optimizer::optimizer::Optimizer,
physical_optimizer::{
aggregate_statistics::AggregateStatistics,
hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
@@ -72,16 +69,7 @@ use crate::logical_plan::{
CreateMemoryTable, CreateView, DropTable, FunctionRegistry, LogicalPlan,
LogicalPlanBuilder, UNNAMED_TABLE,
};
-use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
-use crate::optimizer::filter_push_down::FilterPushDown;
-use crate::optimizer::limit_push_down::LimitPushDown;
use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
-use crate::optimizer::projection_push_down::ProjectionPushDown;
-use crate::optimizer::reduce_cross_join::ReduceCrossJoin;
-use crate::optimizer::reduce_outer_join::ReduceOuterJoin;
-use crate::optimizer::simplify_expressions::SimplifyExpressions;
-use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
-use crate::optimizer::subquery_filter_to_join::SubqueryFilterToJoin;
use datafusion_sql::{ResolvedTableReference, TableReference};
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
@@ -107,13 +95,6 @@ use chrono::{DateTime, Utc};
use datafusion_common::ScalarValue;
use datafusion_expr::logical_plan::DropView;
use datafusion_expr::{TableSource, TableType};
-use datafusion_optimizer::decorrelate_where_exists::DecorrelateWhereExists;
-use datafusion_optimizer::decorrelate_where_in::DecorrelateWhereIn;
-use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
-use datafusion_optimizer::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
-use datafusion_optimizer::scalar_subquery_to_join::ScalarSubqueryToJoin;
-use datafusion_optimizer::type_coercion::TypeCoercion;
-use datafusion_optimizer::unwrap_cast_in_comparison::UnwrapCastInComparison;
use datafusion_sql::{
parser::DFParser,
planner::{ContextProvider, SqlToRel},
@@ -1465,33 +1446,13 @@ impl SessionState {
.register_catalog(config.default_catalog.clone(), default_catalog);
}
- let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
- Arc::new(TypeCoercion::new()),
- Arc::new(SimplifyExpressions::new()),
- Arc::new(UnwrapCastInComparison::new()),
- Arc::new(DecorrelateWhereExists::new()),
- Arc::new(DecorrelateWhereIn::new()),
- Arc::new(ScalarSubqueryToJoin::new()),
- Arc::new(SubqueryFilterToJoin::new()),
- Arc::new(EliminateFilter::new()),
- Arc::new(ReduceCrossJoin::new()),
- Arc::new(CommonSubexprEliminate::new()),
- Arc::new(EliminateLimit::new()),
- Arc::new(ProjectionPushDown::new()),
- Arc::new(RewriteDisjunctivePredicate::new()),
- ];
- if config
- .config_options
- .read()
- .get_bool(OPT_FILTER_NULL_JOIN_KEYS)
- .unwrap_or_default()
- {
- rules.push(Arc::new(FilterNullJoinKeys::default()));
- }
- rules.push(Arc::new(ReduceOuterJoin::new()));
- rules.push(Arc::new(FilterPushDown::new()));
- rules.push(Arc::new(LimitPushDown::new()));
- rules.push(Arc::new(SingleDistinctToGroupBy::new()));
+ let optimizer_config = OptimizerConfig::new().filter_null_keys(
+ config
+ .config_options
+ .read()
+ .get_bool(OPT_FILTER_NULL_JOIN_KEYS)
+ .unwrap_or_default(),
+ );
let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
@@ -1518,7 +1479,7 @@ impl SessionState {
SessionState {
session_id,
- optimizer: Optimizer::new(rules),
+ optimizer: Optimizer::new(&optimizer_config),
physical_optimizers,
query_planner: Arc::new(DefaultQueryPlanner {}),
catalog_list,
@@ -1575,7 +1536,7 @@ impl SessionState {
mut self,
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
) -> Self {
- self.optimizer = Optimizer::new(rules);
+ self.optimizer = Optimizer::with_rules(rules);
self
}
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index e2ccd4944..5ef5cfdd5 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -17,6 +17,24 @@
//! Query optimizer traits
+use crate::common_subexpr_eliminate::CommonSubexprEliminate;
+use crate::decorrelate_where_exists::DecorrelateWhereExists;
+use crate::decorrelate_where_in::DecorrelateWhereIn;
+use crate::eliminate_filter::EliminateFilter;
+use crate::eliminate_limit::EliminateLimit;
+use crate::filter_null_join_keys::FilterNullJoinKeys;
+use crate::filter_push_down::FilterPushDown;
+use crate::limit_push_down::LimitPushDown;
+use crate::projection_push_down::ProjectionPushDown;
+use crate::reduce_cross_join::ReduceCrossJoin;
+use crate::reduce_outer_join::ReduceOuterJoin;
+use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
+use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
+use crate::simplify_expressions::SimplifyExpressions;
+use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
+use crate::subquery_filter_to_join::SubqueryFilterToJoin;
+use crate::type_coercion::TypeCoercion;
+use crate::unwrap_cast_in_comparison::UnwrapCastInComparison;
use chrono::{DateTime, Utc};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::logical_plan::LogicalPlan;
@@ -50,6 +68,8 @@ pub struct OptimizerConfig {
next_id: usize,
/// Option to skip rules that produce errors
skip_failing_rules: bool,
+ /// Whether the default rule list includes the `FilterNullJoinKeys` rule
+ filter_null_keys: bool,
}
impl OptimizerConfig {
@@ -59,9 +79,16 @@ impl OptimizerConfig {
query_execution_start_time: chrono::Utc::now(),
next_id: 0, // useful for generating things like unique subquery aliases
skip_failing_rules: true,
+ filter_null_keys: true,
}
}
+ /// Specify whether to enable the `FilterNullJoinKeys` rule (enabled by default)
+ pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
+ self.filter_null_keys = filter_null_keys;
+ self
+ }
+
/// Specify whether the optimizer should skip rules that produce
/// errors, or fail the query
pub fn with_query_execution_start_time(
@@ -107,8 +134,35 @@ pub struct Optimizer {
}
impl Optimizer {
+ /// Create a new optimizer using the recommended list of rules
+ pub fn new(config: &OptimizerConfig) -> Self {
+ let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
+ Arc::new(TypeCoercion::new()),
+ Arc::new(SimplifyExpressions::new()),
+ Arc::new(UnwrapCastInComparison::new()),
+ Arc::new(DecorrelateWhereExists::new()),
+ Arc::new(DecorrelateWhereIn::new()),
+ Arc::new(ScalarSubqueryToJoin::new()),
+ Arc::new(SubqueryFilterToJoin::new()),
+ Arc::new(EliminateFilter::new()),
+ Arc::new(ReduceCrossJoin::new()),
+ Arc::new(CommonSubexprEliminate::new()),
+ Arc::new(EliminateLimit::new()),
+ Arc::new(ProjectionPushDown::new()),
+ Arc::new(RewriteDisjunctivePredicate::new()),
+ ];
+ if config.filter_null_keys {
+ rules.push(Arc::new(FilterNullJoinKeys::default()));
+ }
+ rules.push(Arc::new(ReduceOuterJoin::new()));
+ rules.push(Arc::new(FilterPushDown::new()));
+ rules.push(Arc::new(LimitPushDown::new()));
+ rules.push(Arc::new(SingleDistinctToGroupBy::new()));
+ Self::with_rules(rules)
+ }
+
/// Create a new optimizer with the given rules
- pub fn new(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
+ pub fn with_rules(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
Self { rules }
}
@@ -172,7 +226,7 @@ mod tests {
#[test]
fn skip_failing_rule() -> Result<(), DataFusionError> {
- let opt = Optimizer::new(vec![Arc::new(BadRule {})]);
+ let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
let mut config = OptimizerConfig::new().with_skip_failing_rules(true);
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
@@ -184,7 +238,7 @@ mod tests {
#[test]
fn no_skip_failing_rule() -> Result<(), DataFusionError> {
- let opt = Optimizer::new(vec![Arc::new(BadRule {})]);
+ let opt = Optimizer::with_rules(vec![Arc::new(BadRule {})]);
let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs
index 7811e475c..86f55e698 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -18,25 +18,7 @@
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
-use datafusion_optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
-use datafusion_optimizer::decorrelate_where_exists::DecorrelateWhereExists;
-use datafusion_optimizer::decorrelate_where_in::DecorrelateWhereIn;
-use datafusion_optimizer::eliminate_filter::EliminateFilter;
-use datafusion_optimizer::eliminate_limit::EliminateLimit;
-use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
-use datafusion_optimizer::filter_push_down::FilterPushDown;
-use datafusion_optimizer::limit_push_down::LimitPushDown;
use datafusion_optimizer::optimizer::Optimizer;
-use datafusion_optimizer::projection_push_down::ProjectionPushDown;
-use datafusion_optimizer::reduce_cross_join::ReduceCrossJoin;
-use datafusion_optimizer::reduce_outer_join::ReduceOuterJoin;
-use datafusion_optimizer::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
-use datafusion_optimizer::scalar_subquery_to_join::ScalarSubqueryToJoin;
-use datafusion_optimizer::simplify_expressions::SimplifyExpressions;
-use datafusion_optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
-use datafusion_optimizer::subquery_filter_to_join::SubqueryFilterToJoin;
-use datafusion_optimizer::type_coercion::TypeCoercion;
-use datafusion_optimizer::unwrap_cast_in_comparison::UnwrapCastInComparison;
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
@@ -104,31 +86,6 @@ fn between_date64_plus_interval() -> Result<()> {
}
fn test_sql(sql: &str) -> Result<LogicalPlan> {
- // TODO should make align with rules in the context
- // https://github.com/apache/arrow-datafusion/issues/3524
- let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
- Arc::new(TypeCoercion::new()),
- Arc::new(SimplifyExpressions::new()),
- Arc::new(UnwrapCastInComparison::new()),
- Arc::new(DecorrelateWhereExists::new()),
- Arc::new(DecorrelateWhereIn::new()),
- Arc::new(ScalarSubqueryToJoin::new()),
- Arc::new(SubqueryFilterToJoin::new()),
- Arc::new(EliminateFilter::new()),
- Arc::new(CommonSubexprEliminate::new()),
- Arc::new(EliminateLimit::new()),
- Arc::new(ReduceCrossJoin::new()),
- Arc::new(ProjectionPushDown::new()),
- Arc::new(RewriteDisjunctivePredicate::new()),
- Arc::new(FilterNullJoinKeys::default()),
- Arc::new(ReduceOuterJoin::new()),
- Arc::new(FilterPushDown::new()),
- Arc::new(LimitPushDown::new()),
- Arc::new(SingleDistinctToGroupBy::new()),
- ];
-
- let optimizer = Optimizer::new(rules);
-
// parse the SQL
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
let ast: Vec<Statement> = Parser::parse_sql(&dialect, sql).unwrap();
@@ -141,6 +98,7 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
// optimize the logical plan
let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
+ let optimizer = Optimizer::new(&config);
optimizer.optimize(&plan, &mut config, &observe)
}