You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/07/14 20:04:46 UTC

[arrow-datafusion] branch master updated: Optimizer should have option to skip failing rules (#2909)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ad3df54f Optimizer should have option to skip failing rules (#2909)
8ad3df54f is described below

commit 8ad3df54f54acd801d81485d7bb9678bf4727e7c
Author: Andy Grove <ag...@apache.org>
AuthorDate: Thu Jul 14 14:04:41 2022 -0600

    Optimizer should have option to skip failing rules (#2909)
    
    * Projection::new
    
    * try_new and validation
    
    * better variable name
    
    * split into try_new and try_new_with_schema
    
    * fmt
    
    * skip optimizer rules that fail
    
    * unit tests
    
    * fix merge conflict
    
    * user configurable
    
    * Update datafusion/optimizer/src/optimizer.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Update datafusion/optimizer/src/optimizer.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Update datafusion/optimizer/src/optimizer.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * fix errors from applying suggestions
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/core/src/config.rs            | 13 ++++-
 datafusion/core/src/execution/context.rs |  8 ++-
 datafusion/optimizer/src/optimizer.rs    | 94 ++++++++++++++++++++++++++++++--
 docs/source/user-guide/configs.md        |  1 +
 4 files changed, 108 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index f732d6431..054c66bf5 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -43,6 +43,10 @@ pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";
 pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
     "datafusion.execution.coalesce_target_batch_size";
 
+/// Configuration option "datafusion.optimizer.skip_failed_rules"
+pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
+    "datafusion.optimizer.skip_failed_rules";
+
 /// Definition of a configuration option
 pub struct ConfigDefinition {
     /// key used to identifier this configuration option
@@ -156,11 +160,18 @@ impl BuiltInConfigs {
                  format!("Target batch size when coalescing batches. Uses in conjunction with the \
             configuration setting '{}'.", OPT_COALESCE_BATCHES),
                  4096,
+            ),
+            ConfigDefinition::new_bool(
+                OPT_OPTIMIZER_SKIP_FAILED_RULES,
+                "When set to true, the logical plan optimizer will produce warning \
+                messages if any optimization rules produce errors and then proceed to the next \
+                rule. When set to false, any rules that produce errors will cause the query to fail.",
+                true
             )],
         }
     }
 
-    /// Generate documentation that can be included int he user guide
+    /// Generate documentation that can be included in the user guide
     pub fn generate_config_markdown() -> String {
         use std::fmt::Write as _;
         let configs = Self::new();
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 42de53a88..5cb45be2f 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -83,7 +83,7 @@ use crate::physical_optimizer::repartition::Repartition;
 
 use crate::config::{
     ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
-    OPT_FILTER_NULL_JOIN_KEYS,
+    OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
 };
 use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use crate::logical_plan::plan::Explain;
@@ -1371,7 +1371,11 @@ impl SessionState {
 
     /// Optimizes the logical plan by applying optimizer rules.
     pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        let mut optimizer_config = OptimizerConfig::new();
+        let mut optimizer_config = OptimizerConfig::new().with_skip_failing_rules(
+            self.config
+                .config_options
+                .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES),
+        );
         optimizer_config.query_execution_start_time =
             self.execution_props.query_execution_start_time;
 
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 1fee75df6..9d76cf5e7 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -20,7 +20,7 @@
 use chrono::{DateTime, Utc};
 use datafusion_common::Result;
 use datafusion_expr::logical_plan::LogicalPlan;
-use log::{debug, trace};
+use log::{debug, trace, warn};
 use std::sync::Arc;
 
 /// `OptimizerRule` transforms one ['LogicalPlan'] into another which
@@ -45,6 +45,8 @@ pub struct OptimizerConfig {
     /// to use a literal value instead
     pub query_execution_start_time: DateTime<Utc>,
     next_id: usize,
+    /// Option to skip rules that produce errors
+    skip_failing_rules: bool,
 }
 
 impl OptimizerConfig {
@@ -53,9 +55,16 @@ impl OptimizerConfig {
         Self {
             query_execution_start_time: chrono::Utc::now(),
             next_id: 0, // useful for generating things like unique subquery aliases
+            skip_failing_rules: true,
         }
     }
 
+    /// Specify whether the optimizer should skip rules that produce errors, or fail the query
+    pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
+        self.skip_failing_rules = b;
+        self
+    }
+
     pub fn next_id(&mut self) -> usize {
         self.next_id += 1;
         self.next_id
@@ -97,13 +106,88 @@ impl Optimizer {
         debug!("Input logical plan:\n{}\n", plan.display_indent());
         trace!("Full input logical plan:\n{:?}", plan);
         for rule in &self.rules {
-            new_plan = rule.optimize(&new_plan, optimizer_config)?;
-            observer(&new_plan, rule.as_ref());
-            debug!("After apply {} rule:\n", rule.name());
-            debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
+            let result = rule.optimize(&new_plan, optimizer_config);
+            match result {
+                Ok(plan) => {
+                    new_plan = plan;
+                    observer(&new_plan, rule.as_ref());
+                    debug!("After apply {} rule:\n", rule.name());
+                    debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
+                }
+                Err(ref e) => {
+                    if optimizer_config.skip_failing_rules {
+                        // Note to future readers: if you see this warning it signals a
+                        // bug in the DataFusion optimizer. Please consider filing a ticket
+                        // https://github.com/apache/arrow-datafusion
+                        warn!(
+                            "Skipping optimizer rule {} due to unexpected error: {}",
+                            rule.name(),
+                            e
+                        );
+                    } else {
+                        return result;
+                    }
+                }
+            }
         }
         debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
         trace!("Full Optimized logical plan:\n {:?}", new_plan);
         Ok(new_plan)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::optimizer::Optimizer;
+    use crate::{OptimizerConfig, OptimizerRule};
+    use datafusion_common::{DFSchema, DataFusionError};
+    use datafusion_expr::logical_plan::EmptyRelation;
+    use datafusion_expr::LogicalPlan;
+    use std::sync::Arc;
+
+    #[test]
+    fn skip_failing_rule() -> Result<(), DataFusionError> {
+        let opt = Optimizer::new(vec![Arc::new(BadRule {})]);
+        let mut config = OptimizerConfig::new().with_skip_failing_rules(true);
+        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(DFSchema::empty()),
+        });
+        opt.optimize(&plan, &mut config, &observe)?;
+        Ok(())
+    }
+
+    #[test]
+    fn no_skip_failing_rule() -> Result<(), DataFusionError> {
+        let opt = Optimizer::new(vec![Arc::new(BadRule {})]);
+        let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
+        let plan = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(DFSchema::empty()),
+        });
+        let result = opt.optimize(&plan, &mut config, &observe);
+        assert_eq!(
+            "Error during planning: rule failed",
+            format!("{}", result.err().unwrap())
+        );
+        Ok(())
+    }
+
+    fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
+
+    struct BadRule {}
+
+    impl OptimizerRule for BadRule {
+        fn optimize(
+            &self,
+            _plan: &LogicalPlan,
+            _optimizer_config: &mut OptimizerConfig,
+        ) -> datafusion_common::Result<LogicalPlan> {
+            Err(DataFusionError::Plan("rule failed".to_string()))
+        }
+
+        fn name(&self) -> &str {
+            "bad rule"
+        }
+    }
+}
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 41f6df850..6794067bf 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -43,3 +43,4 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.explain.logical_plan_only            | Boolean | false   | When set to true, the explain statement will only print logical plans.                                                                                                                                                                                                                                                                                        |
 | datafusion.explain.physical_plan_only           | Boolean | false   | When set to true, the explain statement will only print physical plans.                                                                                                                                                                                                                                                                                       |
 | datafusion.optimizer.filter_null_join_keys      | Boolean | false   | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                                                                               |
+| datafusion.optimizer.skip_failed_rules          | Boolean | true    | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail.                                                                                                                         |