You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/02/09 05:49:01 UTC
spark git commit: [SPARK-19265][SQL][FOLLOW-UP] Configurable
`tableRelationCache` maximum size
Repository: spark
Updated Branches:
refs/heads/master 50a991264 -> 9d9d67c79
[SPARK-19265][SQL][FOLLOW-UP] Configurable `tableRelationCache` maximum size
## What changes were proposed in this pull request?
SPARK-19265 made the table relation cache general; this follow-up makes the maximum size of `tableRelationCache` configurable.
To allow config values to be sanity-checked, this patch also adds a `checkValue()` method to `TypedConfigBuilder`.
## How was this patch tested?
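New test cases: `test("conf entry: checkValue()")` in `ConfigEntrySuite` and `test("StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE")` in `SQLConfEntrySuite`.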
Author: Liwei Lin <lw...@gmail.com>
Closes #16736 from lw-lin/conf.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d9d67c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d9d67c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d9d67c7
Branch: refs/heads/master
Commit: 9d9d67c7957f7cbbdbe889bdbc073568b2bfbb16
Parents: 50a9912
Author: Liwei Lin <lw...@gmail.com>
Authored: Thu Feb 9 00:48:47 2017 -0500
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Feb 9 00:48:47 2017 -0500
----------------------------------------------------------------------
.../spark/internal/config/ConfigBuilder.scala | 8 +++++++
.../internal/config/ConfigEntrySuite.scala | 22 ++++++++++++++++++++
.../spark/sql/catalyst/CatalystConf.scala | 3 +++
.../sql/catalyst/catalog/SessionCatalog.scala | 6 +++---
.../org/apache/spark/sql/internal/SQLConf.scala | 11 ++++++++++
.../spark/sql/internal/SQLConfEntrySuite.scala | 16 ++++++++++++++
6 files changed, 63 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index 0f5c8a9..a177e66 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -90,6 +90,14 @@ private[spark] class TypedConfigBuilder[T](
new TypedConfigBuilder(parent, s => fn(converter(s)), stringConverter)
}
+ /** Checks if the user-provided value for the config matches the validator. */
+ def checkValue(validator: T => Boolean, errorMsg: String): TypedConfigBuilder[T] = {
+ transform { v =>
+ if (!validator(v)) throw new IllegalArgumentException(errorMsg)
+ v
+ }
+ }
+
/** Check that user-provided values for the config match a pre-defined set. */
def checkValues(validValues: Set[T]): TypedConfigBuilder[T] = {
transform { v =>
http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 91a96bd..71eed46 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -128,6 +128,28 @@ class ConfigEntrySuite extends SparkFunSuite {
assert(conf.get(transformationConf) === "bar")
}
+ test("conf entry: checkValue()") {
+ def createEntry(default: Int): ConfigEntry[Int] =
+ ConfigBuilder(testKey("checkValue"))
+ .intConf
+ .checkValue(value => value >= 0, "value must be non-negative")
+ .createWithDefault(default)
+
+ val conf = new SparkConf()
+
+ val entry = createEntry(10)
+ conf.set(entry, -1)
+ val e1 = intercept[IllegalArgumentException] {
+ conf.get(entry)
+ }
+ assert(e1.getMessage == "value must be non-negative")
+
+ val e2 = intercept[IllegalArgumentException] {
+ createEntry(-1)
+ }
+ assert(e2.getMessage == "value must be non-negative")
+ }
+
test("conf entry: valid values check") {
val conf = new SparkConf()
val enum = ConfigBuilder(testKey("enum"))
http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 0b6fa56..5f50ce1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -34,6 +34,8 @@ trait CatalystConf {
def optimizerInSetConversionThreshold: Int
def maxCaseBranchesForCodegen: Int
+ def tableRelationCacheSize: Int
+
def runSQLonFile: Boolean
def warehousePath: String
@@ -69,6 +71,7 @@ case class SimpleCatalystConf(
optimizerMaxIterations: Int = 100,
optimizerInSetConversionThreshold: Int = 10,
maxCaseBranchesForCodegen: Int = 20,
+ tableRelationCacheSize: Int = 1000,
runSQLonFile: Boolean = true,
crossJoinEnabled: Boolean = false,
cboEnabled: Boolean = false,
http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e9543f7..dd0c5cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -118,11 +118,11 @@ class SessionCatalog(
}
/**
- * A cache of qualified table name to table relation plan.
+ * A cache of qualified table names to table relation plans.
*/
val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
- // TODO: create a config instead of hardcode 1000 here.
- CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]()
+ val cacheSize = conf.tableRelationCacheSize
+ CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8c77da1..dc0f130 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -786,6 +786,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
+ def tableRelationCacheSize: Int =
+ getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
+
def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
@@ -1034,6 +1037,14 @@ object StaticSQLConf {
.intConf
.createWithDefault(4000)
+ val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
+ buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
+ .internal()
+ .doc("The maximum size of the cache that maps qualified table names to table relation plans.")
+ .intConf
+ .checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative")
+ .createWithDefault(1000)
+
// When enabling the debug, Spark SQL internal table properties are not filtered out; however,
// some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
val DEBUG_MODE = buildStaticConf("spark.sql.debug")
http://git-wip-us.apache.org/repos/asf/spark/blob/9d9d67c7/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
index 6c12f0f..0e3a5ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
@@ -171,4 +171,20 @@ class SQLConfEntrySuite extends SparkFunSuite {
buildConf(key).stringConf.createOptional
}
}
+
+ test("StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE") {
+ val confEntry = StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE
+ assert(conf.getConf(confEntry) === 1000)
+
+ conf.setConf(confEntry, -1)
+ val e1 = intercept[IllegalArgumentException] {
+ conf.getConf(confEntry)
+ }
+ assert(e1.getMessage === "The maximum size of the cache must not be negative")
+
+ val e2 = intercept[IllegalArgumentException] {
+ conf.setConfString(confEntry.key, "-1")
+ }
+ assert(e2.getMessage === "The maximum size of the cache must not be negative")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org