Posted to commits@spark.apache.org by do...@apache.org on 2020/12/13 23:04:55 UTC

[spark] branch branch-3.1 updated: [SPARK-33764][SS] Make state store maintenance interval as SQL config

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b4ee45a  [SPARK-33764][SS] Make state store maintenance interval as SQL config
b4ee45a is described below

commit b4ee45adaed77662a51a127cda7005c81f78861d
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Sun Dec 13 14:57:09 2020 -0800

    [SPARK-33764][SS] Make state store maintenance interval as SQL config
    
    ### What changes were proposed in this pull request?
    
    Currently the maintenance interval is hard-coded in `StateStore`. This patch proposes to make it a SQL config.
    
    ### Why are the changes needed?
    
    Currently the maintenance interval is hard-coded in `StateStore`. For consistency, it should live alongside the other Structured Streaming configs. SQLConf also provides a better mechanism for documenting a config and setting its default value.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Previously, users had to set the maintenance interval through a Spark config. Now they can set it through a SQL config, as sketched below.
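    
    For example, a minimal sketch (assuming an active `SparkSession` named
    `spark`; the config key itself is unchanged, only where it is read from):
    
        // Before this patch: only effective when set in the application's
        // SparkConf, e.g.
        //   spark-submit --conf spark.sql.streaming.stateStore.maintenanceInterval=30s
    
        // After this patch: can be set as a runtime SQL config on the session,
        // and it takes effect for queries started afterwards
        spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "30s")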
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #30741 from viirya/maintenance-interval-sqlconfig.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 45af3c96889eba1958055206f10524299d0be61c)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 13 +++++++++++
 .../sql/execution/streaming/state/StateStore.scala | 26 +++++++++-------------
 .../execution/streaming/state/StateStoreConf.scala |  3 +++
 .../streaming/state/StateStoreSuite.scala          |  4 ++--
 4 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 82df22b..e33169b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1360,6 +1360,17 @@ object SQLConf {
     .intConf
     .createWithDefault(2)
 
+  val STREAMING_MAINTENANCE_INTERVAL =
+    buildConf("spark.sql.streaming.stateStore.maintenanceInterval")
+      .internal()
+      .doc("The interval in milliseconds between triggering maintenance tasks in StateStore. " +
+        "The maintenance task executes background maintenance task in all the loaded store " +
+        "providers if they are still the active instances according to the coordinator. If not, " +
+        "inactive instances of store providers will be closed.")
+      .version("2.0.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(TimeUnit.MINUTES.toMillis(1)) // 1 minute
+
   val STATE_STORE_COMPRESSION_CODEC =
     buildConf("spark.sql.streaming.stateStore.compression.codec")
       .internal()
@@ -3198,6 +3209,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
 
+  def streamingMaintenanceInterval: Long = getConf(STREAMING_MAINTENANCE_INTERVAL)
+
   def stateStoreCompressionCodec: String = getConf(STATE_STORE_COMPRESSION_CODEC)
 
   def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
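
The new entry is a `timeConf` in milliseconds, so the default above is one
minute and the `streamingMaintenanceInterval` accessor returns milliseconds. A
small sketch of the resulting behavior, in the style of Spark's own tests
(SQLConf and its setters are Spark-internal API):

    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.internal.SQLConf

    val conf = new SQLConf()
    // The declared default: one minute, returned in milliseconds
    assert(conf.streamingMaintenanceInterval == TimeUnit.MINUTES.toMillis(1))

    // A Long passed to setConf is interpreted as milliseconds
    conf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 30 * 1000L)
    assert(conf.streamingMaintenanceInterval == 30000L)
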
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index ab67c19..f87a2fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -385,8 +385,6 @@ class UnsafeRowPair(var key: UnsafeRow = null, var value: UnsafeRow = null) {
  */
 object StateStore extends Logging {
 
-  val MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval"
-  val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60
   val PARTITION_ID_TO_CHECK_SCHEMA = 0
 
   @GuardedBy("loadedProviders")
@@ -471,7 +469,7 @@ object StateStore extends Logging {
       storeConf: StateStoreConf,
       hadoopConf: Configuration): StateStoreProvider = {
     loadedProviders.synchronized {
-      startMaintenanceIfNeeded()
+      startMaintenanceIfNeeded(storeConf)
 
       if (storeProviderId.storeId.partitionId == PARTITION_ID_TO_CHECK_SCHEMA) {
         val result = schemaValidated.getOrElseUpdate(storeProviderId, {
@@ -534,19 +532,17 @@ object StateStore extends Logging {
   }
 
   /** Start the periodic maintenance task if not already started and if Spark active */
-  private def startMaintenanceIfNeeded(): Unit = loadedProviders.synchronized {
-    val env = SparkEnv.get
-    if (env != null && !isMaintenanceRunning) {
-      val periodMs = env.conf.getTimeAsMs(
-        MAINTENANCE_INTERVAL_CONFIG, s"${MAINTENANCE_INTERVAL_DEFAULT_SECS}s")
-      maintenanceTask = new MaintenanceTask(
-        periodMs,
-        task = { doMaintenance() },
-        onError = { loadedProviders.synchronized { loadedProviders.clear() } }
-      )
-      logInfo("State Store maintenance task started")
+  private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit =
+    loadedProviders.synchronized {
+      if (SparkEnv.get != null && !isMaintenanceRunning) {
+        maintenanceTask = new MaintenanceTask(
+          storeConf.maintenanceInterval,
+          task = { doMaintenance() },
+          onError = { loadedProviders.synchronized { loadedProviders.clear() } }
+        )
+        logInfo("State Store maintenance task started")
+      }
     }
-  }
 
   /**
    * Execute background maintenance task in all the loaded store providers if they are still
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 23cb3be..58af827 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -58,6 +58,9 @@ class StateStoreConf(
   /** whether to validate state schema during query run. */
   val stateSchemaCheckEnabled = sqlConf.isStateSchemaCheckEnabled
 
+  /** The interval of maintenance tasks. */
+  val maintenanceInterval = sqlConf.streamingMaintenanceInterval
+
   /**
    * Additional configurations related to state store. This will capture all configs in
    * SQLConf that start with `spark.sql.streaming.stateStore.` and extraOptions for a specific
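
`StateStoreConf` is the serializable snapshot of these settings that travels to
the executors, where the maintenance task actually runs, so the interval is
captured from the session's SQLConf at construction time. A brief sketch
(illustrative; it mirrors how the suite below builds the conf):

    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.execution.streaming.state.StateStoreConf

    val sqlConf = new SQLConf()
    sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 30000L)
    // maintenanceInterval is read from sqlConf once, here
    val storeConf = StateStoreConf(sqlConf)
    assert(storeConf.maintenanceInterval == 30000L)
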
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 0c2083a..d4cd3cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -390,8 +390,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
-      // Make maintenance thread do snapshots and cleanups very fast
-      .set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
       // Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
       // fails to talk to the StateStoreCoordinator and unloads all the StateStores
       .set(RPC_NUM_RETRIES, 1)
@@ -400,6 +398,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val storeProviderId = StateStoreProviderId(StateStoreId(dir, opId, 0), UUID.randomUUID)
     val sqlConf = new SQLConf()
     sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
+    // Make maintenance thread do snapshots and cleanups very fast
+    sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 10L)
     val storeConf = StateStoreConf(sqlConf)
     val hadoopConf = new Configuration()
     val provider = newStoreProvider(storeProviderId.storeId)
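
Note the unit in the test change above: the entry is declared with
`TimeUnit.MILLISECONDS`, so `10L` means 10 ms, matching the old string value
"10ms". An equivalent string-based form would be (illustrative):

    // Same effect as setConf(STREAMING_MAINTENANCE_INTERVAL, 10L);
    // the timeConf converter parses the "10ms" suffix
    sqlConf.setConfString("spark.sql.streaming.stateStore.maintenanceInterval", "10ms")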

