Posted to commits@spark.apache.org by we...@apache.org on 2020/02/10 15:53:24 UTC

[spark] branch branch-3.0 updated: [SPARK-30326][SQL] Raise exception if analyzer exceeds max iterations

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fd6d1b4  [SPARK-30326][SQL] Raise exception if analyzer exceeds max iterations
fd6d1b4 is described below

commit fd6d1b400630d7fee6d031e6de1fccfb4993778b
Author: Eric Wu <49...@qq.com>
AuthorDate: Mon Feb 10 23:41:39 2020 +0800

    [SPARK-30326][SQL] Raise exception if analyzer exceeds max iterations
    
    ### What changes were proposed in this pull request?
    Enhance the RuleExecutor strategy to take different actions when the maximum number of iterations is exceeded, and raise an exception if the analyzer exceeds max iterations.
    
    ### Why are the changes needed?
    Currently, both the analyzer and the optimizer just log a warning message if rule execution exceeds max iterations. They should behave differently: the analyzer should raise an exception to indicate that the plan is still not resolved after max iterations, while the optimizer should only log a warning and keep the current plan. This is more feasible now that SPARK-30138 has been introduced.
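
    A minimal sketch of the user-facing remedy, assuming the key behind SQLConf.ANALYZER_MAX_ITERATIONS is spark.sql.analyzer.maxIterations (the setting added by SPARK-30138):

        import org.apache.spark.sql.SparkSession

        // When analysis fails with "Max iterations (N) reached for batch Resolution,
        // please set 'spark.sql.analyzer.maxIterations' to a larger value.",
        // raise the limit on the session and re-run the query.
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        spark.conf.set("spark.sql.analyzer.maxIterations", "200")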
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Add test in AnalysisSuite
    
    Closes #26977 from Eric5553/EnhanceMaxIterations.
    
    Authored-by: Eric Wu <49...@qq.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit b2011a295bd78b3693a516e049e90250366b8f52)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 10 +++++++-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  5 +++-
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 27 ++++++++++++++++++----
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 25 +++++++++++++++++++-
 4 files changed, 60 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 75f1aa7..ce82b3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -176,7 +176,15 @@ class Analyzer(
 
   def resolver: Resolver = conf.resolver
 
-  protected val fixedPoint = FixedPoint(maxIterations)
+  /**
+   * If the plan cannot be resolved within maxIterations, analyzer will throw exception to inform
+   * user to increase the value of SQLConf.ANALYZER_MAX_ITERATIONS.
+   */
+  protected val fixedPoint =
+    FixedPoint(
+      maxIterations,
+      errorOnExceed = true,
+      maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
 
   /**
    * Override to provide additional rules for the "Resolution" batch.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0fdf6b0..c90117b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -53,7 +53,10 @@ abstract class Optimizer(catalogManager: CatalogManager)
       "PartitionPruning",
       "Extract Python UDFs")
 
-  protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)
+  protected def fixedPoint =
+    FixedPoint(
+      SQLConf.get.optimizerMaxIterations,
+      maxIterationsSetting = SQLConf.OPTIMIZER_MAX_ITERATIONS.key)
 
   /**
    * Defines the default rule batches in the Optimizer.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 287ae0e..da5242b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -45,7 +45,17 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    * An execution strategy for rules that indicates the maximum number of executions. If the
    * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
    */
-  abstract class Strategy { def maxIterations: Int }
+  abstract class Strategy {
+
+    /** The maximum number of executions. */
+    def maxIterations: Int
+
+    /** Whether to throw exception when exceeding the maximum number. */
+    def errorOnExceed: Boolean = false
+
+    /** The key of SQLConf setting to tune maxIterations */
+    def maxIterationsSetting: String = null
+  }
 
   /** A strategy that is run once and idempotent. */
   case object Once extends Strategy { val maxIterations = 1 }
@@ -54,7 +64,10 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    * A strategy that runs until fix point or maxIterations times, whichever comes first.
    * Especially, a FixedPoint(1) batch is supposed to run only once.
    */
-  case class FixedPoint(maxIterations: Int) extends Strategy
+  case class FixedPoint(
+    override val maxIterations: Int,
+    override val errorOnExceed: Boolean = false,
+    override val maxIterationsSetting: String = null) extends Strategy
 
   /** A batch of rules. */
   protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
@@ -155,8 +168,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
         if (iteration > batch.strategy.maxIterations) {
           // Only log if this is a rule that is supposed to run more than once.
           if (iteration != 2) {
-            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
-            if (Utils.isTesting) {
+            val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
+              "."
+            } else {
+              s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
+            }
+            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
+              s"$endingMsg"
+            if (Utils.isTesting || batch.strategy.errorOnExceed) {
               throw new TreeNodeException(curPlan, message, null)
             } else {
               logWarning(message)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index c747d39..d385133 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -25,9 +25,10 @@ import org.scalatest.Matchers
 
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
@@ -745,4 +746,26 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       CollectMetrics("evt1", sumWithFilter :: Nil, testRelation),
       "aggregates with filter predicate are not allowed" :: Nil)
   }
+
+  test("Analysis exceed max iterations") {
+    // RuleExecutor only throw exception or log warning when the rule is supposed to run
+    // more than once.
+    val maxIterations = 2
+    val conf = new SQLConf().copy(SQLConf.ANALYZER_MAX_ITERATIONS -> maxIterations)
+    val testAnalyzer = new Analyzer(
+      new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), conf)
+
+    val plan = testRelation2.select(
+      $"a" / Literal(2) as "div1",
+      $"a" / $"b" as "div2",
+      $"a" / $"c" as "div3",
+      $"a" / $"d" as "div4",
+      $"e" / $"e" as "div5")
+
+    val message = intercept[TreeNodeException[LogicalPlan]] {
+      testAnalyzer.execute(plan)
+    }.getMessage
+    assert(message.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " +
+      s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value."))
+  }
 }


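A hedged sketch of how a custom RuleExecutor could opt in to the fail-fast behavior this patch adds to Strategy and FixedPoint. The executor, the no-op rule, and the "my.app.maxIterations" key below are hypothetical, made up for illustration only:

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

    object MyExecutor extends RuleExecutor[LogicalPlan] {

      // A rule that does nothing, only here so the batch below has something to run.
      object NoOpRule extends Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = plan
      }

      // errorOnExceed = true makes execute() throw a TreeNodeException instead of
      // logging a warning once the batch has run more than maxIterations times;
      // maxIterationsSetting is the config key quoted in that error message.
      override protected def batches: Seq[Batch] = Seq(
        Batch("MyBatch",
          FixedPoint(
            50,
            errorOnExceed = true,
            maxIterationsSetting = "my.app.maxIterations"),
          NoOpRule))
    }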