You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/31 10:52:02 UTC
[spark] branch master updated: [SPARK-38419][SQL] Change spark.sessionstate.conf.getConf/setConf operation to spark.conf.get/set
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 454fe9c [SPARK-38419][SQL] Change spark.sessionstate.conf.getConf/setConf operation to spark.conf.get/set
454fe9c is described below
commit 454fe9c8eb1903e1f199e4d5e13b171e519f793e
Author: jackylee-ch <li...@baidu.com>
AuthorDate: Thu Mar 31 18:50:32 2022 +0800
[SPARK-38419][SQL] Change spark.sessionstate.conf.getConf/setConf operation to spark.conf.get/set
### Why are the changes needed?
In the sql module, we provide `SparkSession.conf` as a unified entry for `SQLConf.set/get`, which can prevent users or logic from modifying StaticSQLConf and Spark configs. However, I found `SparkSession.sessionstate.conf` is used in some code to getConf or setConf, which can skip the check of `RuntimeConfig`.
In this PR, we want to unify the behavior of `SQLConf.getConf/setConf` to `SparkSession.conf`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Origin GA
Closes #35950 from jackylee-ch/change_spark_sessionstate_conf_operation_to_spark_conf.
Lead-authored-by: jackylee-ch <li...@baidu.com>
Co-authored-by: Jacky Lee <qc...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../src/main/scala/org/apache/spark/sql/Dataset.scala | 4 ++--
.../main/scala/org/apache/spark/sql/RuntimeConfig.scala | 8 ++++++++
.../main/scala/org/apache/spark/sql/SparkSession.scala | 4 ++--
.../org/apache/spark/sql/execution/CacheManager.scala | 2 +-
.../org/apache/spark/sql/execution/command/views.scala | 2 +-
.../spark/sql/execution/datasources/DataSource.scala | 3 +--
.../org/apache/spark/sql/execution/datasources/ddl.scala | 2 +-
.../spark/sql/execution/streaming/StreamExecution.scala | 2 +-
.../spark/sql/streaming/StreamingQueryManager.scala | 2 +-
.../org/apache/spark/sql/FileBasedDataSourceSuite.scala | 2 +-
.../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 +++---
.../org/apache/spark/sql/SparkSessionBuilderSuite.scala | 8 ++++----
.../apache/spark/sql/SparkSessionExtensionSuite.scala | 4 ++--
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 8 ++++----
.../state/RocksDBStateStoreIntegrationSuite.scala | 2 +-
.../spark/sql/expressions/ExpressionInfoSuite.scala | 2 +-
.../org/apache/spark/sql/internal/SQLConfSuite.scala | 16 ++++++++--------
.../org/apache/spark/sql/sources/BucketedReadSuite.scala | 2 +-
.../apache/spark/sql/streaming/FileStreamSinkSuite.scala | 2 +-
.../spark/sql/streaming/FileStreamSourceSuite.scala | 4 ++--
20 files changed, 46 insertions(+), 39 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 62dea96..5e5de1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -218,7 +218,7 @@ class Dataset[T] private[sql](
@transient private[sql] val logicalPlan: LogicalPlan = {
val plan = queryExecution.commandExecuted
- if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
+ if (sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new HashSet[Long])
dsIds.add(id)
plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
@@ -1426,7 +1426,7 @@ class Dataset[T] private[sql](
private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = {
val newExpr = expr transform {
case a: AttributeReference
- if sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) =>
+ if sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) =>
val metadata = new MetadataBuilder()
.withMetadata(a.metadata)
.putLong(Dataset.DATASET_ID_KEY, id)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index 6c9b150..a3237cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -61,6 +61,14 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
}
/**
+ * Sets the given Spark runtime configuration property.
+ */
+ protected[sql] def set[T](entry: ConfigEntry[T], value: T): Unit = {
+ requireNonStaticConf(entry.key)
+ sqlConf.setConf(entry, value)
+ }
+
+ /**
* Returns the value of Spark runtime configuration property for the given key.
*
* @throws java.util.NoSuchElementException if the key is not set and does not have a default
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 734b8e5..5b212c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1104,13 +1104,13 @@ object SparkSession extends Logging {
private[sql] def getOrCloneSessionWithConfigsOff(
session: SparkSession,
configurations: Seq[ConfigEntry[Boolean]]): SparkSession = {
- val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_))
+ val configsEnabled = configurations.filter(session.conf.get[Boolean])
if (configsEnabled.isEmpty) {
session
} else {
val newSession = session.cloneSession()
configsEnabled.foreach(conf => {
- newSession.sessionState.conf.setConf(conf, false)
+ newSession.conf.set(conf, false)
})
newSession
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 27d6bed..527f78e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -332,7 +332,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
* If CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is enabled, just return original session.
*/
private def getOrCloneSessionWithConfigsOff(session: SparkSession): SparkSession = {
- if (session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
+ if (session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
session
} else {
SparkSession.getOrCloneSessionWithConfigsOff(session, forceDisableConfigs)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index eca48a6..3a2e32c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -123,7 +123,7 @@ case class CreateViewCommand(
referredTempFunctions)
catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
} else if (viewType == GlobalTempView) {
- val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val viewIdent = TableIdentifier(name.table, Option(db))
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
val tableDefinition = createTemporaryViewRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 2bb3d48..6b3dfac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -267,8 +267,7 @@ case class DataSource(
checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
createInMemoryFileIndex(globbedPaths)
})
- val forceNullable =
- sparkSession.sessionState.conf.getConf(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
+ val forceNullable = sparkSession.conf.get(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
val sourceDataSchema = if (forceNullable) dataSchema.asNullable else dataSchema
SourceInfo(
s"FileSource[$path]",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 9a81f2a..dc5894e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -106,7 +106,7 @@ case class CreateTempViewUsing(
}.logicalPlan
if (global) {
- val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val viewIdent = TableIdentifier(tableIdent.table, Option(db))
val viewDefinition = createTemporaryViewRelation(
viewIdent,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f9ae65c..324a833 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -417,7 +417,7 @@ abstract class StreamExecution(
@throws[TimeoutException]
protected def interruptAndAwaitExecutionThreadTermination(): Unit = {
val timeout = math.max(
- sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_TIMEOUT), 0)
+ sparkSession.conf.get(SQLConf.STREAMING_STOP_TIMEOUT), 0)
queryExecutionThread.interrupt()
queryExecutionThread.join(timeout)
if (queryExecutionThread.isAlive) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 4e1c7cc..6548d5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -343,7 +343,7 @@ class StreamingQueryManager private[sql] (
.orElse(activeQueries.get(query.id)) // shouldn't be needed but paranoia ...
val shouldStopActiveRun =
- sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
+ sparkSession.conf.get(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
if (activeOption.isDefined) {
if (shouldStopActiveRun) {
val oldQuery = activeOption.get
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index bc7a7b2..e4d6eb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -53,7 +53,7 @@ class FileBasedDataSourceSuite extends QueryTest
override def beforeAll(): Unit = {
super.beforeAll()
- spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+ spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
}
override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c28dde9..f7c35ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2556,17 +2556,17 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
val originalValue = newSession.sessionState.conf.runSQLonFile
try {
- newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, false)
+ newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, false)
intercept[AnalysisException] {
newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`")
}
- newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, true)
+ newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, true)
checkAnswer(
newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`"),
Row(1, "a"))
} finally {
- newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, originalValue)
+ newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, originalValue)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 0a7c684..ee8b6d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -199,11 +199,11 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach wit
.config("spark.app.name", "test-app-SPARK-31234")
.getOrCreate()
- assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234")
- assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
+ assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234")
+ assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
session.sql("RESET")
- assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234")
- assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
+ assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234")
+ assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
}
test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 3577812..17124cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -170,7 +170,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule()))
}
withSession(extensions) { session =>
- session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
+ session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
assert(session.sessionState.queryStagePrepRules.contains(MyQueryStagePrepRule()))
assert(session.sessionState.columnarRules.contains(
MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule())))
@@ -209,7 +209,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
}
withSession(extensions) { session =>
- session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
+ session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
assert(session.sessionState.columnarRules.contains(
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
import session.sqlContext.implicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 6f14b1f..98dec46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1927,7 +1927,7 @@ class DataSourceV2SQLSuite
}
test("global temp view should not be masked by v2 catalog") {
- val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
try {
@@ -1941,7 +1941,7 @@ class DataSourceV2SQLSuite
}
test("SPARK-30104: global temp db is used as a table name under v2 catalog") {
- val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val t = s"testcat.$globalTempDB"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
@@ -1952,7 +1952,7 @@ class DataSourceV2SQLSuite
}
test("SPARK-30104: v2 catalog named global_temp will be masked") {
- val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
val e = intercept[AnalysisException] {
@@ -2132,7 +2132,7 @@ class DataSourceV2SQLSuite
}
intercept[AnalysisException](sql("COMMENT ON TABLE testcat.abc IS NULL"))
- val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+ val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
withTempView("v") {
sql("create global temp view v as select 1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index 25d0a80..b50ac71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -107,7 +107,7 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest {
testQuietly("SPARK-36519: store RocksDB format version in the checkpoint") {
def getFormatVersion(query: StreamingQuery): Int = {
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.sparkSession
- .sessionState.conf.getConf(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
+ .conf.get(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
}
withSQLConf(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index 7f25831..d568b7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -199,7 +199,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
// Examples can change settings. We clone the session to prevent tests clashing.
val clonedSpark = spark.cloneSession()
// Coalescing partitions can change result order, so disable it.
- clonedSpark.sessionState.conf.setConf(SQLConf.COALESCE_PARTITIONS_ENABLED, false)
+ clonedSpark.conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED, false)
val info = clonedSpark.sessionState.catalog.lookupFunctionInfo(funcId)
val className = info.getClassName
if (!ignoreSet.contains(className)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index a589d4e..55d91ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -343,10 +343,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.INT96)
- spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
+ spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
- spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
+ spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.INT96)
@@ -362,9 +362,9 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
val fallback = SQLConf.buildConf("spark.sql.__test__.spark_22779")
.fallbackConf(SQLConf.PARQUET_COMPRESSION)
- assert(spark.sessionState.conf.getConfString(fallback.key) ===
+ assert(spark.conf.get(fallback.key) ===
SQLConf.PARQUET_COMPRESSION.defaultValue.get)
- assert(spark.sessionState.conf.getConfString(fallback.key, "lzo") === "lzo")
+ assert(spark.conf.get(fallback.key, "lzo") === "lzo")
val displayValue = spark.sessionState.conf.getAllDefinedConfs
.find { case (key, _, _, _) => key == fallback.key }
@@ -372,11 +372,11 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
.get
assert(displayValue === fallback.defaultValueString)
- spark.sessionState.conf.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
- assert(spark.sessionState.conf.getConfString(fallback.key) === "gzip")
+ spark.conf.set(SQLConf.PARQUET_COMPRESSION, "gzip")
+ assert(spark.conf.get(fallback.key) === "gzip")
- spark.sessionState.conf.setConf(fallback, "lzo")
- assert(spark.sessionState.conf.getConfString(fallback.key) === "lzo")
+ spark.conf.set(fallback, "lzo")
+ assert(spark.conf.get(fallback.key) === "lzo")
val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
.find { case (key, _, _, _) => key == fallback.key }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 18039db..921cff2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -53,7 +53,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
protected override def beforeAll(): Unit = {
super.beforeAll()
- spark.sessionState.conf.setConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
+ spark.conf.set(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
}
protected override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 407d783..49cbbe1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -47,7 +47,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
override def beforeAll(): Unit = {
super.beforeAll()
- spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+ spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
}
override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b907323..8a45895 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -257,7 +257,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
override def beforeAll(): Unit = {
super.beforeAll()
- spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+ spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
}
override def afterAll(): Unit = {
@@ -1433,7 +1433,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
// This is to avoid running a spark job to list of files in parallel
// by the InMemoryFileIndex.
- spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
+ spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
withTempDirs { case (root, tmp) =>
val src = new File(root, "a=1")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org