You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/31 10:52:02 UTC

[spark] branch master updated: [SPARK-38419][SQL] Change spark.sessionstate.conf.getConf/setConf operation to spark.conf.get/set

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 454fe9c  [SPARK-38419][SQL] Change spark.sessionstate.conf.getConf/setConf operation to spark.conf.get/set
454fe9c is described below

commit 454fe9c8eb1903e1f199e4d5e13b171e519f793e
Author: jackylee-ch <li...@baidu.com>
AuthorDate: Thu Mar 31 18:50:32 2022 +0800

    [SPARK-38419][SQL] Change spark.sessionstate.conf.getConf/setConf operation to spark.conf.get/set
    
    ### Why are the changes needed?
    In the sql module, `SparkSession.conf` is the unified entry point for getting and setting `SQLConf` values; it prevents users and internal logic from modifying `StaticSQLConf` and Spark configs. However, some code calls `getConf`/`setConf` directly on `SparkSession.sessionState.conf`, which bypasses the checks in `RuntimeConfig`.
    This PR routes those `SQLConf.getConf/setConf` calls through `SparkSession.conf` so that the behavior is unified.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing GA tests.
    
    Closes #35950 from jackylee-ch/change_spark_sessionstate_conf_operation_to_spark_conf.
    
    Lead-authored-by: jackylee-ch <li...@baidu.com>
    Co-authored-by: Jacky Lee <qc...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../src/main/scala/org/apache/spark/sql/Dataset.scala    |  4 ++--
 .../main/scala/org/apache/spark/sql/RuntimeConfig.scala  |  8 ++++++++
 .../main/scala/org/apache/spark/sql/SparkSession.scala   |  4 ++--
 .../org/apache/spark/sql/execution/CacheManager.scala    |  2 +-
 .../org/apache/spark/sql/execution/command/views.scala   |  2 +-
 .../spark/sql/execution/datasources/DataSource.scala     |  3 +--
 .../org/apache/spark/sql/execution/datasources/ddl.scala |  2 +-
 .../spark/sql/execution/streaming/StreamExecution.scala  |  2 +-
 .../spark/sql/streaming/StreamingQueryManager.scala      |  2 +-
 .../org/apache/spark/sql/FileBasedDataSourceSuite.scala  |  2 +-
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala  |  6 +++---
 .../org/apache/spark/sql/SparkSessionBuilderSuite.scala  |  8 ++++----
 .../apache/spark/sql/SparkSessionExtensionSuite.scala    |  4 ++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala       |  8 ++++----
 .../state/RocksDBStateStoreIntegrationSuite.scala        |  2 +-
 .../spark/sql/expressions/ExpressionInfoSuite.scala      |  2 +-
 .../org/apache/spark/sql/internal/SQLConfSuite.scala     | 16 ++++++++--------
 .../org/apache/spark/sql/sources/BucketedReadSuite.scala |  2 +-
 .../apache/spark/sql/streaming/FileStreamSinkSuite.scala |  2 +-
 .../spark/sql/streaming/FileStreamSourceSuite.scala      |  4 ++--
 20 files changed, 46 insertions(+), 39 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 62dea96..5e5de1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -218,7 +218,7 @@ class Dataset[T] private[sql](
 
   @transient private[sql] val logicalPlan: LogicalPlan = {
     val plan = queryExecution.commandExecuted
-    if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
+    if (sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
       val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new HashSet[Long])
       dsIds.add(id)
       plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
@@ -1426,7 +1426,7 @@ class Dataset[T] private[sql](
   private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = {
     val newExpr = expr transform {
       case a: AttributeReference
-        if sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) =>
+        if sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) =>
         val metadata = new MetadataBuilder()
           .withMetadata(a.metadata)
           .putLong(Dataset.DATASET_ID_KEY, id)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index 6c9b150..a3237cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -61,6 +61,14 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
   }
 
   /**
+   * Sets the given Spark runtime configuration property.
+   */
+  protected[sql] def set[T](entry: ConfigEntry[T], value: T): Unit = {
+    requireNonStaticConf(entry.key)
+    sqlConf.setConf(entry, value)
+  }
+
+  /**
    * Returns the value of Spark runtime configuration property for the given key.
    *
    * @throws java.util.NoSuchElementException if the key is not set and does not have a default
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 734b8e5..5b212c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1104,13 +1104,13 @@ object SparkSession extends Logging {
   private[sql] def getOrCloneSessionWithConfigsOff(
       session: SparkSession,
       configurations: Seq[ConfigEntry[Boolean]]): SparkSession = {
-    val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_))
+    val configsEnabled = configurations.filter(session.conf.get[Boolean])
     if (configsEnabled.isEmpty) {
       session
     } else {
       val newSession = session.cloneSession()
       configsEnabled.foreach(conf => {
-        newSession.sessionState.conf.setConf(conf, false)
+        newSession.conf.set(conf, false)
       })
       newSession
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 27d6bed..527f78e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -332,7 +332,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
    * If CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is enabled, just return original session.
    */
   private def getOrCloneSessionWithConfigsOff(session: SparkSession): SparkSession = {
-    if (session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
+    if (session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
       session
     } else {
       SparkSession.getOrCloneSessionWithConfigsOff(session, forceDisableConfigs)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index eca48a6..3a2e32c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -123,7 +123,7 @@ case class CreateViewCommand(
         referredTempFunctions)
       catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
-      val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+      val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(name.table, Option(db))
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       val tableDefinition = createTemporaryViewRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 2bb3d48..6b3dfac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -267,8 +267,7 @@ case class DataSource(
             checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
           createInMemoryFileIndex(globbedPaths)
         })
-        val forceNullable =
-          sparkSession.sessionState.conf.getConf(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
+        val forceNullable = sparkSession.conf.get(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
         val sourceDataSchema = if (forceNullable) dataSchema.asNullable else dataSchema
         SourceInfo(
           s"FileSource[$path]",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 9a81f2a..dc5894e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -106,7 +106,7 @@ case class CreateTempViewUsing(
       }.logicalPlan
 
     if (global) {
-      val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+      val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(tableIdent.table, Option(db))
       val viewDefinition = createTemporaryViewRelation(
         viewIdent,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f9ae65c..324a833 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -417,7 +417,7 @@ abstract class StreamExecution(
   @throws[TimeoutException]
   protected def interruptAndAwaitExecutionThreadTermination(): Unit = {
     val timeout = math.max(
-      sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_TIMEOUT), 0)
+      sparkSession.conf.get(SQLConf.STREAMING_STOP_TIMEOUT), 0)
     queryExecutionThread.interrupt()
     queryExecutionThread.join(timeout)
     if (queryExecutionThread.isAlive) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 4e1c7cc..6548d5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -343,7 +343,7 @@ class StreamingQueryManager private[sql] (
         .orElse(activeQueries.get(query.id)) // shouldn't be needed but paranoia ...
 
       val shouldStopActiveRun =
-        sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
+        sparkSession.conf.get(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
       if (activeOption.isDefined) {
         if (shouldStopActiveRun) {
           val oldQuery = activeOption.get
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index bc7a7b2..e4d6eb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -53,7 +53,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+    spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
   }
 
   override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c28dde9..f7c35ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2556,17 +2556,17 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       val originalValue = newSession.sessionState.conf.runSQLonFile
 
       try {
-        newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, false)
+        newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, false)
         intercept[AnalysisException] {
           newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`")
         }
 
-        newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, true)
+        newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, true)
         checkAnswer(
           newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`"),
           Row(1, "a"))
       } finally {
-        newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, originalValue)
+        newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, originalValue)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 0a7c684..ee8b6d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -199,11 +199,11 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach wit
       .config("spark.app.name", "test-app-SPARK-31234")
       .getOrCreate()
 
-    assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234")
-    assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
+    assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234")
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
     session.sql("RESET")
-    assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234")
-    assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
+    assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234")
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
   }
 
   test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 3577812..17124cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -170,7 +170,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
         MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule()))
     }
     withSession(extensions) { session =>
-      session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
+      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
       assert(session.sessionState.queryStagePrepRules.contains(MyQueryStagePrepRule()))
       assert(session.sessionState.columnarRules.contains(
         MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule())))
@@ -209,7 +209,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
         MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
     }
     withSession(extensions) { session =>
-      session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
+      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
       assert(session.sessionState.columnarRules.contains(
         MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
       import session.sqlContext.implicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 6f14b1f..98dec46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1927,7 +1927,7 @@ class DataSourceV2SQLSuite
   }
 
   test("global temp view should not be masked by v2 catalog") {
-    val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
 
     try {
@@ -1941,7 +1941,7 @@ class DataSourceV2SQLSuite
   }
 
   test("SPARK-30104: global temp db is used as a table name under v2 catalog") {
-    val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     val t = s"testcat.$globalTempDB"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
@@ -1952,7 +1952,7 @@ class DataSourceV2SQLSuite
   }
 
   test("SPARK-30104: v2 catalog named global_temp will be masked") {
-    val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
 
     val e = intercept[AnalysisException] {
@@ -2132,7 +2132,7 @@ class DataSourceV2SQLSuite
     }
     intercept[AnalysisException](sql("COMMENT ON TABLE testcat.abc IS NULL"))
 
-    val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
     withTempView("v") {
       sql("create global temp view v as select 1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index 25d0a80..b50ac71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -107,7 +107,7 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest {
   testQuietly("SPARK-36519: store RocksDB format version in the checkpoint") {
     def getFormatVersion(query: StreamingQuery): Int = {
       query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.sparkSession
-        .sessionState.conf.getConf(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
+        .conf.get(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
     }
 
     withSQLConf(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index 7f25831..d568b7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -199,7 +199,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
       // Examples can change settings. We clone the session to prevent tests clashing.
       val clonedSpark = spark.cloneSession()
       // Coalescing partitions can change result order, so disable it.
-      clonedSpark.sessionState.conf.setConf(SQLConf.COALESCE_PARTITIONS_ENABLED, false)
+      clonedSpark.conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED, false)
       val info = clonedSpark.sessionState.catalog.lookupFunctionInfo(funcId)
       val className = info.getClassName
       if (!ignoreSet.contains(className)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index a589d4e..55d91ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -343,10 +343,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.INT96)
 
-    spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
+    spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
-    spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
+    spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.INT96)
 
@@ -362,9 +362,9 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     val fallback = SQLConf.buildConf("spark.sql.__test__.spark_22779")
       .fallbackConf(SQLConf.PARQUET_COMPRESSION)
 
-    assert(spark.sessionState.conf.getConfString(fallback.key) ===
+    assert(spark.conf.get(fallback.key) ===
       SQLConf.PARQUET_COMPRESSION.defaultValue.get)
-    assert(spark.sessionState.conf.getConfString(fallback.key, "lzo") === "lzo")
+    assert(spark.conf.get(fallback.key, "lzo") === "lzo")
 
     val displayValue = spark.sessionState.conf.getAllDefinedConfs
       .find { case (key, _, _, _) => key == fallback.key }
@@ -372,11 +372,11 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
       .get
     assert(displayValue === fallback.defaultValueString)
 
-    spark.sessionState.conf.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
-    assert(spark.sessionState.conf.getConfString(fallback.key) === "gzip")
+    spark.conf.set(SQLConf.PARQUET_COMPRESSION, "gzip")
+    assert(spark.conf.get(fallback.key) === "gzip")
 
-    spark.sessionState.conf.setConf(fallback, "lzo")
-    assert(spark.sessionState.conf.getConfString(fallback.key) === "lzo")
+    spark.conf.set(fallback, "lzo")
+    assert(spark.conf.get(fallback.key) === "lzo")
 
     val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
       .find { case (key, _, _, _) => key == fallback.key }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 18039db..921cff2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -53,7 +53,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.sessionState.conf.setConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
+    spark.conf.set(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
   }
 
   protected override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 407d783..49cbbe1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -47,7 +47,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+    spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
   }
 
   override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b907323..8a45895 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -257,7 +257,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+    spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
   }
 
   override def afterAll(): Unit = {
@@ -1433,7 +1433,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
     // This is to avoid running a spark job to list of files in parallel
     // by the InMemoryFileIndex.
-    spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
+    spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
 
     withTempDirs { case (root, tmp) =>
       val src = new File(root, "a=1")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org