Posted to commits@spark.apache.org by ya...@apache.org on 2020/04/24 23:54:55 UTC

[spark] branch branch-3.0 updated: [SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0dbd69c  [SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession
0dbd69c is described below

commit 0dbd69c61492f537bc0326d6ad86b616577f46df
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Sat Apr 25 08:53:00 2020 +0900

    [SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession
    
    ### What changes were proposed in this pull request?
    
    SparkSessionBuilder should not propagate static SQL configurations to the existing active/default SparkSession.
    This appears to be a long-standing bug:
    
    ```scala
    scala> spark.sql("set spark.sql.warehouse.dir").show
    +--------------------+--------------------+
    |                 key|               value|
    +--------------------+--------------------+
    |spark.sql.warehou...|file:/Users/kenty...|
    +--------------------+--------------------+
    
    scala> spark.sql("set spark.sql.warehouse.dir=2");
    org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.sql.warehouse.dir;
      at org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:154)
      at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42)
      at org.apache.spark.sql.execution.command.SetCommand.$anonfun$x$7$6(SetCommand.scala:100)
      at org.apache.spark.sql.execution.command.SetCommand.run(SetCommand.scala:156)
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
      at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
      at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
      at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
      at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
      at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
      at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
      at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
      at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
      at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
      at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
      ... 47 elided
    
    scala> import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession
    
    scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").get
    getClass   getOrCreate
    
    scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate
    20/04/23 23:49:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
    res7: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6403d574
    
    scala> spark.sql("set spark.sql.warehouse.dir").show
    +--------------------+-----+
    |                 key|value|
    +--------------------+-----+
    |spark.sql.warehou...|  xyz|
    +--------------------+-----+
    
    scala>
    ```
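    
    The core of the fix is to partition the builder's options into static and modifiable keys before applying anything to an existing session. Here is a minimal public-API sketch of that idea; it uses `session.conf.isModifiable` as a stand-in for the internal `SQLConf.staticConfKeys` check (unlike the actual patch, `isModifiable` is also `false` for unregistered Spark core configs, which the patch still applies, with a separate warning):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Sketch only: apply the modifiable configs to an existing session and
    // warn about the static ones instead of silently "setting" them.
    def applyModifiableSettings(session: SparkSession, options: Map[String, String]): Unit = {
      val (staticConfs, otherConfs) =
        options.partition { case (k, _) => !session.conf.isModifiable(k) }
      otherConfs.foreach { case (k, v) => session.conf.set(k, v) }
      if (staticConfs.nonEmpty) {
        Console.err.println(
          "Using an existing SparkSession; the static sql configurations will not take effect.")
      }
    }
    ```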
    
    ### Why are the changes needed?
    Bugfix, as shown in the previous section.
    
    ### Does this PR introduce any user-facing change?
    
    Yes. Static SQL configurations passed via `SparkSession.builder.config` no longer propagate to an existing active or default SparkSession; they still take effect when the builder actually creates a new SparkSession.
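    
    A minimal sketch of the resulting behavior, mirroring the new unit tests (the warehouse path "xyz" is just an illustrative value):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    val existing = SparkSession.builder().master("local").getOrCreate()
    
    // A static SQL config passed to the builder no longer leaks into an
    // existing session; a warning is logged instead.
    val same = SparkSession.builder()
      .config("spark.sql.warehouse.dir", "xyz")
      .getOrCreate()
    assert(same eq existing)
    assert(existing.conf.get("spark.sql.warehouse.dir") != "xyz")
    
    // With no usable active/default session, static configs still take
    // effect on the newly created session.
    existing.stop()
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()
    val fresh = SparkSession.builder()
      .master("local")
      .config("spark.sql.warehouse.dir", "xyz")
      .getOrCreate()
    assert(fresh.conf.get("spark.sql.warehouse.dir").contains("xyz"))
    ```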
    
    ### How was this patch tested?
    
    New unit tests in `SparkSessionBuilderSuite`.
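    
    Locally, they can be run with something like `build/sbt "sql/testOnly *SparkSessionBuilderSuite"` (assuming a standard Spark source checkout).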
    
    Closes #28316 from yaooqinn/SPARK-31532.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
    (cherry picked from commit 8424f552293677717da7411ed43e68e73aa7f0d6)
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  | 28 +++++++++----
 .../spark/sql/SparkSessionBuilderSuite.scala       | 49 +++++++++++++++++++++-
 2 files changed, 67 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 731aae8..be597ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -895,7 +895,7 @@ object SparkSession extends Logging {
      * SparkSession exists, the method creates a new SparkSession and assigns the
      * newly created SparkSession as the global default.
      *
-     * In case an existing SparkSession is returned, the config options specified in
+     * In case an existing SparkSession is returned, the non-static config options specified in
      * this builder will be applied to the existing SparkSession.
      *
      * @since 2.0.0
@@ -905,10 +905,7 @@ object SparkSession extends Logging {
       // Get the session from current thread's active session.
       var session = activeThreadSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
-        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
-        if (options.nonEmpty) {
-          logWarning("Using an existing SparkSession; some configuration may not take effect.")
-        }
+        applyModifiableSettings(session)
         return session
       }
 
@@ -917,10 +914,7 @@ object SparkSession extends Logging {
         // If the current thread does not have an active session, get it from the global session.
         session = defaultSession.get()
         if ((session ne null) && !session.sparkContext.isStopped) {
-          options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
-          if (options.nonEmpty) {
-            logWarning("Using an existing SparkSession; some configuration may not take effect.")
-          }
+          applyModifiableSettings(session)
           return session
         }
 
@@ -959,6 +953,22 @@ object SparkSession extends Logging {
 
       return session
     }
+
+    private def applyModifiableSettings(session: SparkSession): Unit = {
+      val (staticConfs, otherConfs) =
+        options.partition(kv => SQLConf.staticConfKeys.contains(kv._1))
+
+      otherConfs.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
+
+      if (staticConfs.nonEmpty) {
+        logWarning("Using an existing SparkSession; the static sql configurations will not take" +
+          " effect.")
+      }
+      if (otherConfs.nonEmpty) {
+        logWarning("Using an existing SparkSession; some spark core configurations may not take" +
+          " effect.")
+      }
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index f238641..7b76d07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -22,7 +22,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
+import org.apache.spark.sql.internal.StaticSQLConf._
 
 /**
  * Test cases for the builder pattern of [[SparkSession]].
@@ -168,4 +168,51 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234")
     assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
   }
+
+  test("SPARK-31532: should not propagate static sql configs to the existing" +
+    " active/default SparkSession") {
+    val session = SparkSession.builder()
+      .master("local")
+      .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532")
+      .config("spark.app.name", "test-app-SPARK-31532")
+      .getOrCreate()
+    // do not propagate static sql configs to the existing active session
+    val session1 = SparkSession
+      .builder()
+      .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-1")
+      .getOrCreate()
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
+    assert(session1.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
+
+    // do not propagate static sql configs to the existing default session
+    SparkSession.clearActiveSession()
+    val session2 = SparkSession
+      .builder()
+      .config(WAREHOUSE_PATH.key, "SPARK-31532-db")
+      .config(GLOBAL_TEMP_DATABASE.key, value = "globalTempDB-SPARK-31532-2")
+      .getOrCreate()
+
+    assert(!session.conf.get(WAREHOUSE_PATH).contains("SPARK-31532-db"))
+    assert(session.conf.get(WAREHOUSE_PATH) === session2.conf.get(WAREHOUSE_PATH))
+    assert(session2.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
+  }
+
+  test("SPARK-31532: propagate static sql configs if no existing SparkSession") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test-app-SPARK-31532-2")
+      .set(GLOBAL_TEMP_DATABASE.key, "globaltempdb-spark-31532")
+      .set(WAREHOUSE_PATH.key, "SPARK-31532-db")
+    SparkContext.getOrCreate(conf)
+
+    // propagate static sql configs if no existing session
+    val session = SparkSession
+      .builder()
+      .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-2")
+      .config(WAREHOUSE_PATH.key, "SPARK-31532-db-2")
+      .getOrCreate()
+    assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2")
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532-2")
+    assert(session.conf.get(WAREHOUSE_PATH) === "SPARK-31532-db-2")
+  }
 }

