Posted to commits@spark.apache.org by do...@apache.org on 2020/12/13 23:04:55 UTC
[spark] branch branch-3.1 updated: [SPARK-33764][SS] Make state store maintenance interval as SQL config
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new b4ee45a [SPARK-33764][SS] Make state store maintenance interval as SQL config
b4ee45a is described below
commit b4ee45adaed77662a51a127cda7005c81f78861d
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Sun Dec 13 14:57:09 2020 -0800
[SPARK-33764][SS] Make state store maintenance interval as SQL config
### What changes were proposed in this pull request?
Currently the maintenance interval is hard-coded in `StateStore`. This patch makes it a SQL config.
### Why are the changes needed?
Currently the maintenance interval is hard-coded in `StateStore`. For consistency, it should live alongside the other SS configs. SQLConf also provides a structured way to attach documentation and a default value to the config, as the `buildConf` block in the diff below shows.
### Does this PR introduce _any_ user-facing change?
Yes. Previously, users set the maintenance interval through a Spark config. Now they can set it through a SQL config.
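For illustration, a minimal sketch of setting the interval after this change; the config key itself is unchanged, and `spark` is assumed to be an active `SparkSession`:

```scala
// Before this patch the interval was read from SparkConf, e.g. at submit time:
//   spark-submit --conf spark.sql.streaming.stateStore.maintenanceInterval=30s ...
// After this patch it is a SQL config and can be set on the session:
spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "30s")
```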
### How was this patch tested?
Unit test.
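As a reference for the shape of that test, a minimal sketch mirroring the `StateStoreSuite` change in the diff below (all names are taken from the patch):

```scala
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.execution.streaming.state.StateStoreConf

val sqlConf = new SQLConf()
// Make the maintenance thread do snapshots and cleanups every 10 ms
sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 10L)
assert(StateStoreConf(sqlConf).maintenanceInterval == 10L)
```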
Closes #30741 from viirya/maintenance-interval-sqlconfig.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 45af3c96889eba1958055206f10524299d0be61c)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 13 +++++++++++
.../sql/execution/streaming/state/StateStore.scala | 26 +++++++++-------------
.../execution/streaming/state/StateStoreConf.scala | 3 +++
.../streaming/state/StateStoreSuite.scala | 4 ++--
4 files changed, 29 insertions(+), 17 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 82df22b..e33169b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1360,6 +1360,17 @@ object SQLConf {
.intConf
.createWithDefault(2)
+ val STREAMING_MAINTENANCE_INTERVAL =
+ buildConf("spark.sql.streaming.stateStore.maintenanceInterval")
+ .internal()
+ .doc("The interval in milliseconds between triggering maintenance tasks in StateStore. " +
+ "The maintenance task executes background maintenance task in all the loaded store " +
+ "providers if they are still the active instances according to the coordinator. If not, " +
+ "inactive instances of store providers will be closed.")
+ .version("2.0.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(TimeUnit.MINUTES.toMillis(1)) // 1 minute
+
val STATE_STORE_COMPRESSION_CODEC =
buildConf("spark.sql.streaming.stateStore.compression.codec")
.internal()
@@ -3198,6 +3209,8 @@ class SQLConf extends Serializable with Logging {
def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
+ def streamingMaintenanceInterval: Long = getConf(STREAMING_MAINTENANCE_INTERVAL)
+
def stateStoreCompressionCodec: String = getConf(STATE_STORE_COMPRESSION_CODEC)
def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index ab67c19..f87a2fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -385,8 +385,6 @@ class UnsafeRowPair(var key: UnsafeRow = null, var value: UnsafeRow = null) {
*/
object StateStore extends Logging {
- val MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval"
- val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60
val PARTITION_ID_TO_CHECK_SCHEMA = 0
@GuardedBy("loadedProviders")
@@ -471,7 +469,7 @@ object StateStore extends Logging {
storeConf: StateStoreConf,
hadoopConf: Configuration): StateStoreProvider = {
loadedProviders.synchronized {
- startMaintenanceIfNeeded()
+ startMaintenanceIfNeeded(storeConf)
if (storeProviderId.storeId.partitionId == PARTITION_ID_TO_CHECK_SCHEMA) {
val result = schemaValidated.getOrElseUpdate(storeProviderId, {
@@ -534,19 +532,17 @@ object StateStore extends Logging {
}
/** Start the periodic maintenance task if not already started and if Spark active */
- private def startMaintenanceIfNeeded(): Unit = loadedProviders.synchronized {
- val env = SparkEnv.get
- if (env != null && !isMaintenanceRunning) {
- val periodMs = env.conf.getTimeAsMs(
- MAINTENANCE_INTERVAL_CONFIG, s"${MAINTENANCE_INTERVAL_DEFAULT_SECS}s")
- maintenanceTask = new MaintenanceTask(
- periodMs,
- task = { doMaintenance() },
- onError = { loadedProviders.synchronized { loadedProviders.clear() } }
- )
- logInfo("State Store maintenance task started")
+ private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit =
+ loadedProviders.synchronized {
+ if (SparkEnv.get != null && !isMaintenanceRunning) {
+ maintenanceTask = new MaintenanceTask(
+ storeConf.maintenanceInterval,
+ task = { doMaintenance() },
+ onError = { loadedProviders.synchronized { loadedProviders.clear() } }
+ )
+ logInfo("State Store maintenance task started")
+ }
}
- }
/**
* Execute background maintenance task in all the loaded store providers if they are still
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 23cb3be..58af827 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -58,6 +58,9 @@ class StateStoreConf(
/** whether to validate state schema during query run. */
val stateSchemaCheckEnabled = sqlConf.isStateSchemaCheckEnabled
+ /** The interval of maintenance tasks. */
+ val maintenanceInterval = sqlConf.streamingMaintenanceInterval
+
/**
* Additional configurations related to state store. This will capture all configs in
* SQLConf that start with `spark.sql.streaming.stateStore.` and extraOptions for a specific
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 0c2083a..d4cd3cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -390,8 +390,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
- // Make maintenance thread do snapshots and cleanups very fast
- .set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
// Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
// fails to talk to the StateStoreCoordinator and unloads all the StateStores
.set(RPC_NUM_RETRIES, 1)
@@ -400,6 +398,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
val storeProviderId = StateStoreProviderId(StateStoreId(dir, opId, 0), UUID.randomUUID)
val sqlConf = new SQLConf()
sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
+ // Make maintenance thread do snapshots and cleanups very fast
+ sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 10L)
val storeConf = StateStoreConf(sqlConf)
val hadoopConf = new Configuration()
val provider = newStoreProvider(storeProviderId.storeId)