Posted to commits@spark.apache.org by li...@apache.org on 2017/02/09 05:49:01 UTC

spark git commit: [SPARK-19265][SQL][FOLLOW-UP] Configurable `tableRelationCache` maximum size

Repository: spark
Updated Branches:
  refs/heads/master 50a991264 -> 9d9d67c79


[SPARK-19265][SQL][FOLLOW-UP] Configurable `tableRelationCache` maximum size

## What changes were proposed in this pull request?

SPARK-19265 made the table relation cache general; this follow-up makes the maximum size of `tableRelationCache` configurable.
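
For illustration, a minimal sketch of how the new entry could be set (not part of this patch; the master URL and app name below are placeholders). Since the entry is registered as a static conf, it has to be supplied before the SparkSession starts:

    import org.apache.spark.sql.SparkSession

    // Sketch only: a static conf cannot be changed once the session is up.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("relation-cache-demo")
      .config("spark.sql.filesourceTableRelationCacheSize", "2000")
      .getOrCreate()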

To allow sanity-checking the new entry's value (it must be non-negative), this patch also adds a `checkValue()` method to `TypedConfigBuilder`.
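
For example, a sketch of an entry defined with the new method (the key, default, and message here are hypothetical; the real definition added by this patch is in the SQLConf diff below):

    // Sketch only: checkValue rejects values failing the validator.
    val PARALLELISM = ConfigBuilder("spark.test.parallelism")
      .intConf
      .checkValue(v => v > 0, "parallelism must be positive")
      .createWithDefault(8)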

## How was this patch tested?

New test cases: `test("conf entry: checkValue()")` in `ConfigEntrySuite` and `test("StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE")` in `SQLConfEntrySuite`.

Author: Liwei Lin <lw...@gmail.com>

Closes #16736 from lw-lin/conf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d9d67c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d9d67c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d9d67c7

Branch: refs/heads/master
Commit: 9d9d67c7957f7cbbdbe889bdbc073568b2bfbb16
Parents: 50a9912
Author: Liwei Lin <lw...@gmail.com>
Authored: Thu Feb 9 00:48:47 2017 -0500
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Feb 9 00:48:47 2017 -0500

----------------------------------------------------------------------
 .../spark/internal/config/ConfigBuilder.scala   |  8 +++++++
 .../internal/config/ConfigEntrySuite.scala      | 22 ++++++++++++++++++++
 .../spark/sql/catalyst/CatalystConf.scala       |  3 +++
 .../sql/catalyst/catalog/SessionCatalog.scala   |  6 +++---
 .../org/apache/spark/sql/internal/SQLConf.scala | 11 ++++++++++
 .../spark/sql/internal/SQLConfEntrySuite.scala  | 16 ++++++++++++++
 6 files changed, 63 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 0f5c8a9..a177e66 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -90,6 +90,14 @@ private[spark] class TypedConfigBuilder[T](
     new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
   }
 
+  /** Checks if the user-provided value for the config matches the validator. */
+  def checkValue(validator: T => Boolean, errorMsg: String): TypedConfigBuilder[T] = {
+    transform { v =>
+      if (!validator(v)) throw new IllegalArgumentException(errorMsg)
+      v
+    }
+  }
+
   /** Check that user-provided values for the config match a pre-defined set. */
   def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
     transform { v =>

http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 91a96bd..71eed46 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -128,6 +128,28 @@ class ConfigEntrySuite extends SparkFunSuite {
     assert(conf.get(transformationConf) === "bar")
   }
 
+  test("conf entry: checkValue()") {
+    def createEntry(default: Int): ConfigEntry[Int] =
+      ConfigBuilder(testKey("checkValue"))
+        .intConf
+        .checkValue(value => value >= 0, "value must be non-negative")
+        .createWithDefault(default)
+
+    val conf = new SparkConf()
+
+    val entry = createEntry(10)
+    conf.set(entry, -1)
+    val e1 = intercept[IllegalArgumentException] {
+      conf.get(entry)
+    }
+    assert(e1.getMessage == "value must be non-negative")
+
+    val e2 = intercept[IllegalArgumentException] {
+      createEntry(-1)
+    }
+    assert(e2.getMessage == "value must be non-negative")
+  }
+
   test("conf entry: valid values check") {
     val conf = new SparkConf()
     val enum = ConfigBuilder(testKey("enum"))

http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 0b6fa56..5f50ce1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -34,6 +34,8 @@ trait CatalystConf {
   def optimizerInSetConversionThreshold: Int
   def maxCaseBranchesForCodegen: Int
 
+  def tableRelationCacheSize: Int
+
   def runSQLonFile: Boolean
 
   def warehousePath: String
@@ -69,6 +71,7 @@ case class SimpleCatalystConf(
     optimizerMaxIterations: Int = 100,
     optimizerInSetConversionThreshold: Int = 10,
     maxCaseBranchesForCodegen: Int = 20,
+    tableRelationCacheSize: Int = 1000,
     runSQLonFile: Boolean = true,
     crossJoinEnabled: Boolean = false,
     cboEnabled: Boolean = false,

http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e9543f7..dd0c5cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -118,11 +118,11 @@ class SessionCatalog(
   }
 
   /**
-   * A cache of qualified table name to table relation plan.
+   * A cache of qualified table names to table relation plans.
    */
   val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
-    // TODO: create a config instead of hardcode 1000 here.
-    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]()
+    val cacheSize = conf.tableRelationCacheSize
+    CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
   }
 
   /**
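
For reference, a minimal standalone sketch of the size-bounded Guava cache this field wraps (keys and values below are illustrative only; Guava may evict any entry once the bound is exceeded, and `size()` is approximate under concurrency):

    import com.google.common.cache.{Cache, CacheBuilder}

    val demo: Cache[String, String] =
      CacheBuilder.newBuilder().maximumSize(2).build[String, String]()
    demo.put("db1.t1", "plan1")
    demo.put("db1.t2", "plan2")
    demo.put("db1.t3", "plan3")  // exceeds maximumSize(2), so an entry is evicted
    assert(demo.size() <= 2)
    // getIfPresent returns null for evicted or never-inserted keys
    val maybePlan = Option(demo.getIfPresent("db1.t1"))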

http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8c77da1..dc0f130 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -786,6 +786,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
 
+  def tableRelationCacheSize: Int =
+    getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
+
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
@@ -1034,6 +1037,14 @@ object StaticSQLConf {
       .intConf
       .createWithDefault(4000)
 
+  val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
+    buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
+      .internal()
+      .doc("The maximum size of the cache that maps qualified table names to table relation plans.")
+      .intConf
+      .checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative")
+      .createWithDefault(1000)
+
   // When enabling the debug, Spark SQL internal table properties are not filtered out; however,
   // some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
   val DEBUG_MODE = buildStaticConf("spark.sql.debug")

http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
index 6c12f0f..0e3a5ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
@@ -171,4 +171,20 @@ class SQLConfEntrySuite extends SparkFunSuite {
       buildConf(key).stringConf.createOptional
     }
   }
+
+  test("StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE") {
+    val confEntry = StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE
+    assert(conf.getConf(confEntry) === 1000)
+
+    conf.setConf(confEntry, -1)
+    val e1 = intercept[IllegalArgumentException] {
+      conf.getConf(confEntry)
+    }
+    assert(e1.getMessage === "The maximum size of the cache must not be negative")
+
+    val e2 = intercept[IllegalArgumentException] {
+      conf.setConfString(confEntry.key, "-1")
+    }
+    assert(e2.getMessage === "The maximum size of the cache must not be negative")
+  }
 }

