Posted to commits@spark.apache.org by we...@apache.org on 2018/07/04 12:04:37 UTC
spark git commit: [SPARK-24727][SQL] Add a static config to control cache size for generated classes
Repository: spark
Updated Branches:
refs/heads/master 772060d09 -> b2deef64f
[SPARK-24727][SQL] Add a static config to control cache size for generated classes
## What changes were proposed in this pull request?
Since SPARK-24250 has been resolved, executors now correctly reference user-defined configurations. This PR therefore adds a static config to control the cache size for generated classes in `CodeGenerator`.
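For readers trying the new knob: static SQL configs are read when the first `SparkSession` in the JVM is created and cannot be changed afterwards, so the entry limit has to be supplied up front. A minimal sketch (the `local[2]` master is illustrative, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

// Static SQL configs must be set before the first SparkSession is created;
// once the session exists, their values are fixed for the lifetime of the JVM.
val spark = SparkSession.builder()
  .master("local[2]") // illustrative only
  .config("spark.sql.codegen.cache.maxEntries", "300") // config added by this patch
  .getOrCreate()
```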
## How was this patch tested?
Added tests in `ExecutorSideSQLConfSuite`.
Author: Takeshi Yamamuro <ya...@apache.org>
Closes #21705 from maropu/SPARK-24727.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2deef64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2deef64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2deef64
Branch: refs/heads/master
Commit: b2deef64f604ddd9502a31105ed47cb63470ec85
Parents: 772060d
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Wed Jul 4 20:04:18 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jul 4 20:04:18 2018 +0800
----------------------------------------------------------------------
.../expressions/codegen/CodeGenerator.scala | 2 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 2 ++
.../spark/sql/internal/StaticSQLConf.scala | 8 +++++
.../sql/internal/ExecutorSideSQLConfSuite.scala | 31 ++++++++++++++++----
4 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b2deef64/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 4cc0968..838c045 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1415,7 +1415,7 @@ object CodeGenerator extends Logging {
* weak keys/values and thus does not respond to memory pressure.
*/
private val cache = CacheBuilder.newBuilder()
- .maximumSize(100)
+ .maximumSize(SQLConf.get.codegenCacheMaxEntries)
.build(
new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
override def load(code: CodeAndComment): (GeneratedClass, Int) = {
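For readers unfamiliar with the Guava cache used above, here is a standalone sketch of the same pattern: a size-bounded `LoadingCache` that computes an entry on first access and evicts (approximately LRU) once `maximumSize` is exceeded. The toy loader is a stand-in for the real compile step, not Spark code:

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Size-bounded loading cache: values are computed on first access by the
// loader and evicted once the entry count exceeds maximumSize.
val cache: LoadingCache[String, String] = CacheBuilder.newBuilder()
  .maximumSize(100) // analogous to spark.sql.codegen.cache.maxEntries
  .build(new CacheLoader[String, String]() {
    // Stand-in for CodeGenerator's expensive "compile source to class" step.
    override def load(source: String): String = source.toUpperCase
  })

cache.get("select 1") // computed once, then served from the cache
```

Note that `cache` is a `val` on the `CodeGenerator` object, so `SQLConf.get.codegenCacheMaxEntries` is evaluated only once, when the object is first initialized; this is why the limit must be a static config rather than a per-session one.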
http://git-wip-us.apache.org/repos/asf/spark/blob/b2deef64/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e2c48e2..50965c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1508,6 +1508,8 @@ class SQLConf extends Serializable with Logging {
def tableRelationCacheSize: Int =
getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
+ def codegenCacheMaxEntries: Int = getConf(StaticSQLConf.CODEGEN_CACHE_MAX_ENTRIES)
+
def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
http://git-wip-us.apache.org/repos/asf/spark/blob/b2deef64/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 382ef28..384b191 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -66,6 +66,14 @@ object StaticSQLConf {
.checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative")
.createWithDefault(1000)
+ val CODEGEN_CACHE_MAX_ENTRIES = buildStaticConf("spark.sql.codegen.cache.maxEntries")
+ .internal()
+ .doc("When nonzero, enable caching of generated classes for operators and expressions. " +
+ "All jobs share the cache that can use up to the specified number for generated classes.")
+ .intConf
+ .checkValue(maxEntries => maxEntries >= 0, "The maximum must not be negative")
+ .createWithDefault(100)
+
// When enabling the debug, Spark SQL internal table properties are not filtered out; however,
// some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
val DEBUG_MODE = buildStaticConf("spark.sql.debug")
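Because `spark.sql.codegen.cache.maxEntries` is static, it cannot be modified on a live session. A sketch of the expected behavior (the exact message may differ across Spark versions):

```scala
import org.apache.spark.sql.AnalysisException

// Setting a static config on a running session is rejected with an
// AnalysisException along the lines of
// "Cannot modify the value of a static config: spark.sql.codegen.cache.maxEntries".
try {
  spark.conf.set("spark.sql.codegen.cache.maxEntries", "500")
} catch {
  case e: AnalysisException => println(s"rejected as expected: ${e.getMessage}")
}
```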
http://git-wip-us.apache.org/repos/asf/spark/blob/b2deef64/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 3dd0712..855fe4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.internal
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.test.SQLTestUtils
class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
@@ -40,16 +40,24 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
spark = null
}
+ override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+ pairs.foreach { case (k, v) =>
+ SQLConf.get.setConfString(k, v)
+ }
+ try f finally {
+ pairs.foreach { case (k, _) =>
+ SQLConf.get.unsetConf(k)
+ }
+ }
+ }
+
test("ReadOnlySQLConf is correctly created at the executor side") {
- SQLConf.get.setConfString("spark.sql.x", "a")
- try {
- val checks = spark.range(10).mapPartitions { it =>
+ withSQLConf("spark.sql.x" -> "a") {
+ val checks = spark.range(10).mapPartitions { _ =>
val conf = SQLConf.get
Iterator(conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString("spark.sql.x") == "a")
}.collect()
assert(checks.forall(_ == true))
- } finally {
- SQLConf.get.unsetConf("spark.sql.x")
}
}
@@ -63,4 +71,15 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
}
}
}
+
+ test("SPARK-24727 CODEGEN_CACHE_MAX_ENTRIES is correctly referenced at the executor side") {
+ withSQLConf(StaticSQLConf.CODEGEN_CACHE_MAX_ENTRIES.key -> "300") {
+ val checks = spark.range(10).mapPartitions { _ =>
+ val conf = SQLConf.get
+ Iterator(conf.isInstanceOf[ReadOnlySQLConf] &&
+ conf.getConfString(StaticSQLConf.CODEGEN_CACHE_MAX_ENTRIES.key) == "300")
+ }.collect()
+ assert(checks.forall(_ == true))
+ }
+ }
}
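A note on the `withSQLConf` override above: presumably the default test helper routes through the session's runtime config, which rejects static configs, so this suite writes the pairs directly into `SQLConf.get` with `setConfString` and unsets them afterwards, letting the same test pattern cover both regular and static configs.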