Posted to reviews@spark.apache.org by "HeartSaVioR (via GitHub)" <gi...@apache.org> on 2023/08/01 03:52:47 UTC

[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42066: [SPARK-44480][SS] Use thread pool to perform maintenance activity for hdfs/rocksdb state store providers

HeartSaVioR commented on code in PR #42066:
URL: https://github.com/apache/spark/pull/42066#discussion_r1280065414


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1852,6 +1852,17 @@ object SQLConf {
       .createWithDefault(
         "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
 
+  val NUM_STATE_STORE_MAINTENANCE_THREADS =
+    buildConf("spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads")
+      .internal()
+      .doc("Number of threads in the thread pool that perform clean up and snapshotting tasks " +
+        "for stateful streaming queries. The default value is 2 so that this thread pool " +

Review Comment:
   nit: the explanation of the default value is no longer accurate. It needs to be updated.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -587,6 +622,8 @@ object StateStore extends Logging {
   /** Stop maintenance thread and reset the maintenance task */
   def stopMaintenanceTask(): Unit = loadedProviders.synchronized {
     if (maintenanceTask != null) {
+      maintenanceThreadPool.stop()

Review Comment:
   1. Let's clean up the related variables as well: `threadPoolException` and `maintenancePartitions`. This should be necessary in order to re-initialize `maintenanceTask`.
   2. To be fully safe, I'd use two separate `if` statements so that the cleanup of `maintenanceThreadPool` and the cleanup of `maintenanceTask` are handled independently.
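
   A minimal sketch of what the two points above could look like. Everything here is an assumption for illustration: `MaintenanceState` is a hypothetical stand-in for the `StateStore` object, the fields use plain `java.util.concurrent` types rather than the PR's own classes, and the partition set holds strings instead of provider ids.

```scala
import java.util.concurrent.{Executors, ExecutorService, ScheduledExecutorService}
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable

// Hypothetical stand-in for the StateStore companion object; not Spark's actual code.
object MaintenanceState {
  private val lock = new Object

  // Assumed shapes: the real types in the PR may differ.
  var maintenanceTask: ScheduledExecutorService = null
  var maintenanceThreadPool: ExecutorService = null
  val threadPoolException = new AtomicReference[Throwable](null)
  val maintenancePartitions = mutable.HashSet.empty[String]

  def start(numThreads: Int): Unit = lock.synchronized {
    maintenanceTask = Executors.newSingleThreadScheduledExecutor()
    maintenanceThreadPool = Executors.newFixedThreadPool(numThreads)
  }

  def stop(): Unit = lock.synchronized {
    // Each resource gets its own check, so a half-initialized state is still cleaned up.
    if (maintenanceThreadPool != null) {
      maintenanceThreadPool.shutdownNow()
      maintenanceThreadPool = null
    }
    if (maintenanceTask != null) {
      maintenanceTask.shutdownNow()
      maintenanceTask = null
    }
    // Reset the bookkeeping so a later start() begins from a clean slate.
    threadPoolException.set(null)
    maintenancePartitions.clear()
  }
}
```

   With this shape, calling `stop()` twice, or before `start()`, is harmless, because neither branch depends on the other.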



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -628,17 +687,44 @@ object StateStore extends Logging {
     if (SparkEnv.get == null) {
       throw new IllegalStateException("SparkEnv not active, cannot do maintenance on StateStores")
     }
-    loadedProviders.synchronized { loadedProviders.toSeq }.foreach { case (id, provider) =>
-      try {
-        provider.doMaintenance()
-        if (!verifyIfStoreInstanceActive(id)) {
-          unload(id)
-          logInfo(s"Unloaded $provider")
-        }
-      } catch {
-        case NonFatal(e) =>
-          logWarning(s"Error managing $provider, stopping management thread")
-          throw e
+    loadedProviders.synchronized {
+      loadedProviders.toSeq
+    }.foreach { case (id, provider) =>
+      // check exception
+      if (threadPoolException.get() != null) {

Review Comment:
   First, this is not thread-safe even though we use AtomicReference, because we are dealing with two different operations.
   Second, if we reset the exception here, the other partition(s) will likely still schedule the maintenance task, which may not be what we want. We may want to skip scheduling the others once an error has been found.
   Third, the exception could be null (a race condition can happen: imagine two threads executing L695 concurrently), which could lead to another issue when we throw null.
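
   One possible way to address the three points, shown as a sketch under assumptions rather than as the change the PR ended up with: read and clear the reference in a single `getAndSet` call, null-check the local value, and let the throw abort the loop so that no further partitions are scheduled. `MaintenanceErrorCheck`, `scheduleAll`, and the string ids are hypothetical names used only for this example.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper; mirrors only the threadPoolException field from the diff.
object MaintenanceErrorCheck {
  val threadPoolException = new AtomicReference[Throwable](null)

  /** Schedules each id in order, unless a failure was recorded by a worker thread. */
  def scheduleAll(ids: Seq[String])(schedule: String => Unit): Unit = {
    ids.foreach { id =>
      // getAndSet reads and clears in one atomic step, so at most one caller
      // observes a given failure and the local value cannot change underneath us.
      val failure = threadPoolException.getAndSet(null)
      // Throwing here leaves the loop, so the remaining ids are not scheduled.
      if (failure != null) throw failure
      schedule(id)
    }
  }
}
```

   Because the null check is made on the local `failure` value, a concurrent reset by another thread cannot turn the `throw` into a throw of null.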



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -602,22 +639,44 @@ object StateStore extends Logging {
   }
 
   /** Start the periodic maintenance task if not already started and if Spark active */
-  private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit =
+  private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = {
+    val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads
     loadedProviders.synchronized {
       if (SparkEnv.get != null && !isMaintenanceRunning) {
         maintenanceTask = new MaintenanceTask(
           storeConf.maintenanceInterval,
-          task = { doMaintenance() },
-          onError = { loadedProviders.synchronized {
+          task = {
+            doMaintenance()
+          },
+          onError = {
+            loadedProviders.synchronized {
               logInfo("Stopping maintenance task since an error was encountered.")
               stopMaintenanceTask()
+              // SPARK-44504 - Unload explicitly to force closing underlying DB instance

Review Comment:
   nit: this looks odd; this change should already have been made. Could you please rebase onto the latest master?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

