Posted to commits@spark.apache.org by yh...@apache.org on 2016/04/16 05:28:12 UTC

spark git commit: [SPARK-14677][SQL] Make the max number of iterations configurable for Catalyst

Repository: spark
Updated Branches:
  refs/heads/master b2dfa8495 -> f4be0946a


[SPARK-14677][SQL] Make the max number of iterations configurable for Catalyst

## What changes were proposed in this pull request?
We currently hard-code the max number of optimizer/analyzer iterations to 100. This patch makes that limit configurable. While I'm at it, I also added the SessionCatalog to the optimizer, so we can use information there in optimization.
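
For illustration, the new limit can then be tuned at runtime like any other SQL conf (sqlContext here stands for any live SQLContext):

    sqlContext.sql("SET spark.sql.optimizer.maxIterations=200")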

## How was this patch tested?
Updated unit tests to reflect the change.

Author: Reynold Xin <rx...@databricks.com>

Closes #12434 from rxin/SPARK-14677.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4be0946
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4be0946
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4be0946

Branch: refs/heads/master
Commit: f4be0946af219379fb2476e6f80b2e50463adeb2
Parents: b2dfa84
Author: Reynold Xin <rx...@databricks.com>
Authored: Fri Apr 15 20:28:09 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Apr 15 20:28:09 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/CatalystConf.scala       | 32 +++++------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 19 ++++++-----
 .../sql/catalyst/optimizer/Optimizer.scala      | 36 ++++++++++++--------
 .../expressions/ExpressionEvalHelper.scala      |  4 +--
 .../expressions/MathFunctionsSuite.scala        |  4 +--
 .../optimizer/OptimizerExtendableSuite.scala    | 10 ++----
 .../spark/sql/execution/SparkOptimizer.scala    | 12 ++++---
 .../org/apache/spark/sql/internal/SQLConf.scala |  7 ++++
 .../spark/sql/internal/SessionState.scala       |  2 +-
 9 files changed, 62 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f4be0946/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 2b98aac..abba866 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -19,46 +19,32 @@ package org.apache.spark.sql.catalyst
 
 import org.apache.spark.sql.catalyst.analysis._
 
-private[spark] trait CatalystConf {
+/**
+ * Interface for configuration options used in the catalyst module.
+ */
+trait CatalystConf {
   def caseSensitiveAnalysis: Boolean
 
   def orderByOrdinal: Boolean
   def groupByOrdinal: Boolean
 
+  def optimizerMaxIterations: Int
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
    */
   def resolver: Resolver = {
-    if (caseSensitiveAnalysis) {
-      caseSensitiveResolution
-    } else {
-      caseInsensitiveResolution
-    }
+    if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
   }
 }
 
-/**
- * A trivial conf that is empty.  Used for testing when all
- * relations are already filled in and the analyser needs only to resolve attribute references.
- */
-object EmptyConf extends CatalystConf {
-  override def caseSensitiveAnalysis: Boolean = {
-    throw new UnsupportedOperationException
-  }
-  override def orderByOrdinal: Boolean = {
-    throw new UnsupportedOperationException
-  }
-  override def groupByOrdinal: Boolean = {
-    throw new UnsupportedOperationException
-  }
-}
 
 /** A CatalystConf that can be used for local testing. */
 case class SimpleCatalystConf(
     caseSensitiveAnalysis: Boolean,
     orderByOrdinal: Boolean = true,
-    groupByOrdinal: Boolean = true)
-
+    groupByOrdinal: Boolean = true,
+    optimizerMaxIterations: Int = 100)
   extends CatalystConf {
 }

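A minimal sketch of the updated conf in use (values are arbitrary; this mirrors the case class above):

    import org.apache.spark.sql.catalyst.SimpleCatalystConf

    // The named argument overrides the new default of 100 iterations.
    val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, optimizerMaxIterations = 50)
    assert(conf.optimizerMaxIterations == 50)
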
http://git-wip-us.apache.org/repos/asf/spark/blob/f4be0946/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index de40ddd..37ff6ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -39,14 +39,13 @@ import org.apache.spark.sql.types._
  * Used for testing when all relations are already filled in and the analyzer needs only
  * to resolve attribute references.
  */
-object SimpleAnalyzer
-  extends SimpleAnalyzer(
-    EmptyFunctionRegistry,
+object SimpleAnalyzer extends Analyzer(
+    new SessionCatalog(
+      new InMemoryCatalog,
+      EmptyFunctionRegistry,
+      new SimpleCatalystConf(caseSensitiveAnalysis = true)),
     new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
-class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
-  extends Analyzer(new SessionCatalog(new InMemoryCatalog, functionRegistry, conf), conf)
-
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
  * [[UnresolvedRelation]]s into fully typed objects using information in a
@@ -55,9 +54,13 @@ class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
 class Analyzer(
     catalog: SessionCatalog,
     conf: CatalystConf,
-    maxIterations: Int = 100)
+    maxIterations: Int)
   extends RuleExecutor[LogicalPlan] with CheckAnalysis {
 
+  def this(catalog: SessionCatalog, conf: CatalystConf) = {
+    this(catalog, conf, conf.optimizerMaxIterations)
+  }
+
   def resolver: Resolver = {
     if (conf.caseSensitiveAnalysis) {
       caseSensitiveResolution
@@ -66,7 +69,7 @@ class Analyzer(
     }
   }
 
-  val fixedPoint = FixedPoint(maxIterations)
+  protected val fixedPoint = FixedPoint(maxIterations)
 
   /**
    * Override to provide additional rules for the "Resolution" batch.

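With the auxiliary constructor above, existing two-argument call sites keep compiling while the iteration cap now flows in from the conf. A sketch (all values illustrative):

    import org.apache.spark.sql.catalyst.SimpleCatalystConf
    import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
    import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}

    val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, optimizerMaxIterations = 25)
    val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
    // The two-argument constructor delegates to conf.optimizerMaxIterations (25 here).
    val analyzer = new Analyzer(catalog, conf)
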
http://git-wip-us.apache.org/repos/asf/spark/blob/f4be0946/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bb68ef8..6c8f8f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
 
-import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf}
+import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -36,9 +36,11 @@ import org.apache.spark.sql.types._
  * Abstract class all optimizers should inherit of, contains the standard batches (extending
  * Optimizers can override this.
  */
-abstract class Optimizer(
-    conf: CatalystConf,
-    sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] {
+abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
+  extends RuleExecutor[LogicalPlan] {
+
+  protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
+
   def batches: Seq[Batch] = {
     // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
     // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
@@ -59,12 +61,12 @@ abstract class Optimizer(
     //   since the other rules might make two separate Unions operators adjacent.
     Batch("Union", Once,
       CombineUnions) ::
-    Batch("Replace Operators", FixedPoint(100),
+    Batch("Replace Operators", fixedPoint,
       ReplaceIntersectWithSemiJoin,
       ReplaceDistinctWithAggregate) ::
-    Batch("Aggregate", FixedPoint(100),
+    Batch("Aggregate", fixedPoint,
       RemoveLiteralFromGroupExpressions) ::
-    Batch("Operator Optimizations", FixedPoint(100),
+    Batch("Operator Optimizations", fixedPoint,
       // Operator push down
       SetOperationPushDown,
       SamplePushDown,
@@ -95,11 +97,11 @@ abstract class Optimizer(
       SimplifyCasts,
       SimplifyCaseConversionExpressions,
       EliminateSerialization) ::
-    Batch("Decimal Optimizations", FixedPoint(100),
+    Batch("Decimal Optimizations", fixedPoint,
       DecimalAggregates) ::
-    Batch("Typed Filter Optimization", FixedPoint(100),
+    Batch("Typed Filter Optimization", fixedPoint,
       EmbedSerializerInFilter) ::
-    Batch("LocalRelation", FixedPoint(100),
+    Batch("LocalRelation", fixedPoint,
       ConvertToLocalRelation) ::
     Batch("Subquery", Once,
       OptimizeSubqueries) :: Nil
@@ -117,15 +119,19 @@ abstract class Optimizer(
 }
 
 /**
- * Non-abstract representation of the standard Spark optimizing strategies
+ * An optimizer used in test code.
  *
  * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while
  * specific rules go to the subclasses
  */
-object DefaultOptimizer
-  extends Optimizer(
-    EmptyConf,
-    new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf))
+object SimpleTestOptimizer extends SimpleTestOptimizer
+
+class SimpleTestOptimizer extends Optimizer(
+  new SessionCatalog(
+    new InMemoryCatalog,
+    EmptyFunctionRegistry,
+    new SimpleCatalystConf(caseSensitiveAnalysis = true)),
+  new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
 /**
  * Pushes operations down into a Sample.

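Because fixedPoint is now a protected val derived from the conf, extending optimizers inherit the configurable cap instead of hard-coding FixedPoint(100). A sketch (MyRule is a hypothetical no-op stand-in):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    object MyRule extends Rule[LogicalPlan] {
      // Placeholder; a real rule would rewrite the plan here.
      def apply(plan: LogicalPlan): LogicalPlan = plan
    }

    class MyOptimizer extends SimpleTestOptimizer {
      // The extra batch runs under the conf-driven cap, not a hard-coded 100.
      override def batches: Seq[Batch] = super.batches :+ Batch("My Batch", fixedPoint, MyRule)
    }
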
http://git-wip-us.apache.org/repos/asf/spark/blob/f4be0946/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index cf26d48..faa90fb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.Utils
@@ -153,7 +153,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
     val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
-    val optimizedPlan = DefaultOptimizer.execute(plan)
+    val optimizedPlan = SimpleTestOptimizer.execute(plan)
     checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow)
   }
 

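The rename is mechanical, but for reference here is the helper's optimize-then-check path as a standalone sketch (constant-folding 1 + 2; imports mirror the ones above):

    import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Literal}
    import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
    import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}

    val expr = Add(Literal(1), Literal(2))
    val plan = Project(Alias(expr, s"Optimized($expr)")() :: Nil, OneRowRelation)
    // Constant folding should leave an alias over the literal 3.
    val optimized = SimpleTestOptimizer.execute(plan)
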
http://git-wip-us.apache.org/repos/asf/spark/blob/f4be0946/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index 27195d3..452792d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 import org.apache.spark.sql.types._
 
@@ -151,7 +151,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     expression: Expression,
     inputRow: InternalRow = EmptyRow): Unit = {
     val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
-    val optimizedPlan = DefaultOptimizer.execute(plan)
+    val optimizedPlan = SimpleTestOptimizer.execute(plan)
     checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f4be0946/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
index 55af6c5..7112c03 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
@@ -15,12 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst
+package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
@@ -40,10 +37,7 @@ class OptimizerExtendableSuite extends SparkFunSuite {
    * This class represents a dummy extended optimizer that takes the batches of the
    * Optimizer and adds custom ones.
    */
-  class ExtendedOptimizer
-    extends Optimizer(
-      EmptyConf,
-      new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf)) {
+  class ExtendedOptimizer extends SimpleTestOptimizer {
 
     // rules set to DummyRule, would not be executed anyways
     val myBatches: Seq[Batch] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/f4be0946/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 8dfbba7..08b2d7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -18,14 +18,16 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.ExperimentalMethods
-import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.internal.SQLConf
 
 class SparkOptimizer(
-    conf: CatalystConf,
-    sessionCatalog: SessionCatalog,
-    experimentalMethods: ExperimentalMethods) extends Optimizer(conf, sessionCatalog) {
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends Optimizer(catalog, conf) {
+
   override def batches: Seq[Batch] = super.batches :+ Batch(
-    "User Provided Optimizers", FixedPoint(100), experimentalMethods.extraOptimizations: _*)
+    "User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 }

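User-provided rules registered through ExperimentalMethods land in the batch above and now also run under the configurable cap. Registration itself is unchanged (sketch, reusing the hypothetical MyRule from earlier):

    // Appended rules end up in the "User Provided Optimizers" batch.
    sqlContext.experimental.extraOptimizations = Seq(MyRule)
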
http://git-wip-us.apache.org/repos/asf/spark/blob/f4be0946/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 20d9a28..e58b717 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -51,6 +51,11 @@ object SQLConf {
 
   }
 
+  val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
+    .doc("The max number of iterations the optimizer and analyzer runs")
+    .intConf
+    .createWithDefault(100)
+
   val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
     .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
       "When set to false, only one SQLContext/HiveContext is allowed to be created " +
@@ -473,6 +478,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   /** ************************ Spark SQL Params/Hints ******************* */
 
+  def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
+
   def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)

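The entry defaults to 100, preserving the old behavior; both the analyzer and the optimizer read it via CatalystConf.optimizerMaxIterations. The programmatic form of the SET command shown earlier (sketch):

    // Equivalent to: SET spark.sql.optimizer.maxIterations=50
    sqlContext.setConf("spark.sql.optimizer.maxIterations", "50")
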
http://git-wip-us.apache.org/repos/asf/spark/blob/f4be0946/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 10497e4..c30f879 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -80,7 +80,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   /**
    * Logical query plan optimizer.
    */
-  lazy val optimizer: Optimizer = new SparkOptimizer(conf, catalog, experimentalMethods)
+  lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)
 
   /**
    * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.

