You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2020/04/24 23:54:55 UTC
[spark] branch branch-3.0 updated: [SPARK-31532][SQL] Builder
should not propagate static sql configs to the existing active or default
SparkSession
This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0dbd69c [SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession
0dbd69c is described below
commit 0dbd69c61492f537bc0326d6ad86b616577f46df
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Sat Apr 25 08:53:00 2020 +0900
[SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession
### What changes were proposed in this pull request?
SparkSessionBuilder should not propagate static sql configurations to the existing active/default SparkSession
This seems a long-standing bug.
```scala
scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+--------------------+
| key| value|
+--------------------+--------------------+
|spark.sql.warehou...|file:/Users/kenty...|
+--------------------+--------------------+
scala> spark.sql("set spark.sql.warehouse.dir=2");
org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.sql.warehouse.dir;
at org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:154)
at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42)
at org.apache.spark.sql.execution.command.SetCommand.$anonfun$x$7$6(SetCommand.scala:100)
at org.apache.spark.sql.execution.command.SetCommand.run(SetCommand.scala:156)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
... 47 elided
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").get
getClass getOrCreate
scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate
20/04/23 23:49:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
res7: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession6403d574
scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+-----+
| key|value|
+--------------------+-----+
|spark.sql.warehou...| xyz|
+--------------------+-----+
scala>
```
### Why are the changes needed?
bugfix as shown in the previous section
### Does this PR introduce any user-facing change?
Yes, static SQL configurations with SparkSession.builder.config do not propagate to any existing or new SparkSession instances.
### How was this patch tested?
new ut.
Closes #28316 from yaooqinn/SPARK-31532.
Authored-by: Kent Yao <ya...@hotmail.com>
Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
(cherry picked from commit 8424f552293677717da7411ed43e68e73aa7f0d6)
Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 28 +++++++++----
.../spark/sql/SparkSessionBuilderSuite.scala | 49 +++++++++++++++++++++-
2 files changed, 67 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 731aae8..be597ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -895,7 +895,7 @@ object SparkSession extends Logging {
* SparkSession exists, the method creates a new SparkSession and assigns the
* newly created SparkSession as the global default.
*
- * In case an existing SparkSession is returned, the config options specified in
+ * In case an existing SparkSession is returned, the non-static config options specified in
* this builder will be applied to the existing SparkSession.
*
* @since 2.0.0
@@ -905,10 +905,7 @@ object SparkSession extends Logging {
// Get the session from current thread's active session.
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
- options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
- if (options.nonEmpty) {
- logWarning("Using an existing SparkSession; some configuration may not take effect.")
- }
+ applyModifiableSettings(session)
return session
}
@@ -917,10 +914,7 @@ object SparkSession extends Logging {
// If the current thread does not have an active session, get it from the global session.
session = defaultSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
- options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
- if (options.nonEmpty) {
- logWarning("Using an existing SparkSession; some configuration may not take effect.")
- }
+ applyModifiableSettings(session)
return session
}
@@ -959,6 +953,22 @@ object SparkSession extends Logging {
return session
}
+
+ private def applyModifiableSettings(session: SparkSession): Unit = {
+ val (staticConfs, otherConfs) =
+ options.partition(kv => SQLConf.staticConfKeys.contains(kv._1))
+
+ otherConfs.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
+
+ if (staticConfs.nonEmpty) {
+ logWarning("Using an existing SparkSession; the static sql configurations will not take" +
+ " effect.")
+ }
+ if (otherConfs.nonEmpty) {
+ logWarning("Using an existing SparkSession; some spark core configurations may not take" +
+ " effect.")
+ }
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index f238641..7b76d07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -22,7 +22,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
+import org.apache.spark.sql.internal.StaticSQLConf._
/**
* Test cases for the builder pattern of [[SparkSession]].
@@ -168,4 +168,51 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234")
assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
}
+
+ test("SPARK-31532: should not propagate static sql configs to the existing" +
+ " active/default SparkSession") {
+ val session = SparkSession.builder()
+ .master("local")
+ .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532")
+ .config("spark.app.name", "test-app-SPARK-31532")
+ .getOrCreate()
+ // do not propagate static sql configs to the existing active session
+ val session1 = SparkSession
+ .builder()
+ .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-1")
+ .getOrCreate()
+ assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
+ assert(session1.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
+
+ // do not propagate static sql configs to the existing default session
+ SparkSession.clearActiveSession()
+ val session2 = SparkSession
+ .builder()
+ .config(WAREHOUSE_PATH.key, "SPARK-31532-db")
+ .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532-2")
+ .getOrCreate()
+
+ assert(!session.conf.get(WAREHOUSE_PATH).contains("SPARK-31532-db"))
+ assert(session.conf.get(WAREHOUSE_PATH) === session2.conf.get(WAREHOUSE_PATH))
+ assert(session2.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
+ }
+
+ test("SPARK-31532: propagate static sql configs if no existing SparkSession") {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test-app-SPARK-31532-2")
+ .set(GLOBAL_TEMP_DATABASE.key, "globaltempdb-spark-31532")
+ .set(WAREHOUSE_PATH.key, "SPARK-31532-db")
+ SparkContext.getOrCreate(conf)
+
+ // propagate static sql configs if no existing session
+ val session = SparkSession
+ .builder()
+ .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-2")
+ .config(WAREHOUSE_PATH.key, "SPARK-31532-db-2")
+ .getOrCreate()
+ assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2")
+ assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532-2")
+ assert(session.conf.get(WAREHOUSE_PATH) === "SPARK-31532-db-2")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org