You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "anishshri-db (via GitHub)" <gi...@apache.org> on 2023/12/30 09:25:16 UTC

[PR] [SPARK-46547] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

anishshri-db opened a new pull request, #44542:
URL: https://github.com/apache/spark/pull/44542

   ### What changes were proposed in this pull request?
   Fix deadlock between maintenance thread and streaming aggregation operator
   
   ### Why are the changes needed?
   This change fixes a race condition that causes a deadlock between the task thread and the maintenance thread. This is primarily only possible with the streaming aggregation operator. In this case, we use 2 physical operators - `StateStoreRestoreExec` and `StateStoreSaveExec`. The first one opens the store in read-only mode and the 2nd one does the actual commit.
   
   However, the following sequence of events creates an issue
   1. Task thread runs the `StateStoreRestoreExec` and gets the store instance and thereby the DB instance lock
   2. Maintenance thread fails with an error for some reason
   3. Maintenance thread takes the `loadedProviders` lock and tries to call `close` on all the loaded providers
   4. Task thread tries to execute the StateStoreRDD for the `StateStoreSaveExec` operator and tries to acquire the `loadedProviders` lock which is held by the thread above
   
   So basically if the maintenance thread is interleaved between the `restore/save` operations, there is a deadlock condition based on the `loadedProviders` lock and the DB instance lock.
   
   The fix proposes to simply release the resources at the end of the `StateStoreRestoreExec` operator (note that `abort` for `ReadStateStore` is likely a misnomer - but we choose to follow the already provided API in this case)
   
   Relevant Logs:
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Existing unit tests
   ```
   [info] Run completed in 6 minutes, 20 seconds.
   [info] Total number of tests run: 80
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 80, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   Yes
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44542:
URL: https://github.com/apache/spark/pull/44542#issuecomment-1882621196

   I believe the real thing here is that the failure of maintenance task is hammering all active state store providers, effectively impacting to all stateful tasks on the executor.
   
   Let's look back what we do in maintenance task. Mostly we do snapshotting and cleaning up orphaned files. If we suppose the task fails, would the state store (provider) be impacted? From what I understand, no, it is not impacted.
   
   This is reflected in the HDFS backed state store provider. If we look at maintenance task in HDFS backed state store provider, it swallows non-fatal exception. If we agree that the failure of maintenance task in RocksDB state store provider does not impact the actual state store (provider), we can do the same to RocksDB state store provider.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1446857416


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########


Review Comment:
   Actually looked at other places - it does seem that they are fine. Could you please check the pattern once again ?
   
   ```
       child.execute().mapPartitionsWithReadStateStore(
         getStateInfo,
         keyExpressions.toStructType,
         stateManager.getStateValueSchema,
         numColsPrefixKey = 0,
         session.sessionState,
         Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
   ```
   
   2 spaces are already added in the beginning. Do we need 2 more for the next line statement ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1446987559


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -233,7 +235,12 @@ private[sql] class RocksDBStateStoreProvider
   }
 
   override def doMaintenance(): Unit = {
-    rocksDB.doMaintenance()
+    try {
+      rocksDB.doMaintenance()
+    } catch {
+      case NonFatal(ex) =>
+        logWarning(s"Error performing maintenance operations with exception=$ex")

Review Comment:
   Done - updated the PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44542:
URL: https://github.com/apache/spark/pull/44542#issuecomment-1884934131

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1438619998


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -434,22 +434,26 @@ case class StateStoreRestoreExec(
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
-        val hasInput = iter.hasNext
-        if (!hasInput && keyExpressions.isEmpty) {
-          // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
-          // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
-          // restore the value, so that we don't overwrite our state with a 0 value, but rather
-          // merge the 0 with existing state.
-          store.iterator().map(_.value)
-        } else {
-          iter.flatMap { row =>
-            val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
-            val restoredRow = stateManager.get(store, key)
-            val outputRows = Option(restoredRow).toSeq :+ row
-            numOutputRows += outputRows.size
-            outputRows
-          }
+      val hasInput = iter.hasNext
+      val result = if (!hasInput && keyExpressions.isEmpty) {
+        // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
+        // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
+        // restore the value, so that we don't overwrite our state with a 0 value, but rather
+        // merge the 0 with existing state.
+        store.iterator().map(_.value)
+      } else {
+        iter.flatMap { row =>
+          val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
+          val restoredRow = stateManager.get(store, key)
+          val outputRows = Option(restoredRow).toSeq :+ row
+          numOutputRows += outputRows.size
+          outputRows
         }
+      }
+      // SPARK-46547 - Release any locks/resources if required, to prevent
+      // deadlocks with the maintenance thread.
+      store.abort()

Review Comment:
   @HeartSaVioR - we could probably make this more light-weight - in order to just release the instance lock and keep the loaded version intact. but then we probably need to add another API on the store or pass an argument to `abort` ? Thoughts ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1438964350


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -434,22 +434,26 @@ case class StateStoreRestoreExec(
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
-        val hasInput = iter.hasNext
-        if (!hasInput && keyExpressions.isEmpty) {
-          // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
-          // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
-          // restore the value, so that we don't overwrite our state with a 0 value, but rather
-          // merge the 0 with existing state.
-          store.iterator().map(_.value)
-        } else {
-          iter.flatMap { row =>
-            val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
-            val restoredRow = stateManager.get(store, key)
-            val outputRows = Option(restoredRow).toSeq :+ row
-            numOutputRows += outputRows.size
-            outputRows
-          }
+      val hasInput = iter.hasNext
+      val result = if (!hasInput && keyExpressions.isEmpty) {
+        // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
+        // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
+        // restore the value, so that we don't overwrite our state with a 0 value, but rather
+        // merge the 0 with existing state.
+        store.iterator().map(_.value)
+      } else {
+        iter.flatMap { row =>
+          val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
+          val restoredRow = stateManager.get(store, key)
+          val outputRows = Option(restoredRow).toSeq :+ row
+          numOutputRows += outputRows.size
+          outputRows
         }
+      }
+      // SPARK-46547 - Release any locks/resources if required, to prevent
+      // deadlocks with the maintenance thread.
+      store.abort()

Review Comment:
   I believe this issue always existed. In the common case, we won't see this that often though. Basically the restore is followed by the save operator which opens the db instance in read-write mode, in the context of the same task thread. At the end of the save operator, we would always release the instance lock. For the `ReadStateStore` invocations though, there is no such functionality (we only abort on task failure) - so the interleaving of the maintenance thread error case (also likely rare) and the execution of StateStoreRDD for the `save` operator causes this deadlock



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1446857416


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########


Review Comment:
   Actually looked at other places - it does seem that they are fine. Could you please check the pattern once again ?
   
   ```
       child.execute().mapPartitionsWithReadStateStore(
         getStateInfo,
         keyExpressions.toStructType,
         stateManager.getStateValueSchema,
         numColsPrefixKey = 0,
         session.sessionState,
         Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
   ``
   
   2 spaces are already added in the beginning. Do we need 2 more for the next line statement ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1446944782


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -233,7 +235,12 @@ private[sql] class RocksDBStateStoreProvider
   }
 
   override def doMaintenance(): Unit = {
-    rocksDB.doMaintenance()
+    try {
+      rocksDB.doMaintenance()
+    } catch {
+      case NonFatal(ex) =>
+        logWarning(s"Error performing maintenance operations with exception=$ex")

Review Comment:
   Please change this to include full stacktrace `logWarning("...", ex)` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #44542: [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator
URL: https://github.com/apache/spark/pull/44542


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1446918772


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -452,6 +452,7 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
+    acquire()

Review Comment:
   Discussed with @HeartSaVioR - this is broken. but will fix this in a separate PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1446918879


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########


Review Comment:
   Same here - will fix in a separate PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1445805415


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -84,10 +84,11 @@ trait ReadStateStore {
 
   /**
    * Clean up the resource.
-   *
+   * @param releaseOnly - if true, only release the instance lock and do not run the full abort
+   *                    sequence.
    * The method name is to respect backward compatibility on [[StateStore]].
    */
-  def abort(): Unit
+  def abort(releaseOnly: Boolean = false): Unit

Review Comment:
   This is technically a breaking API change. Also, releasing instance lock is very specific to RocksDB state store provider implementation, which breaks abstraction. It should be probably the last resort.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1445834995


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -84,10 +84,11 @@ trait ReadStateStore {
 
   /**
    * Clean up the resource.
-   *
+   * @param releaseOnly - if true, only release the instance lock and do not run the full abort
+   *                    sequence.
    * The method name is to respect backward compatibility on [[StateStore]].
    */
-  def abort(): Unit
+  def abort(releaseOnly: Boolean = false): Unit

Review Comment:
   Reverted and moved to the exception handling approach. PTAL



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #44542:
URL: https://github.com/apache/spark/pull/44542#issuecomment-1872492772

   cc - @HeartSaVioR - PTAL whenever you get a chance, thx !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1438942186


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -434,22 +434,26 @@ case class StateStoreRestoreExec(
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
-        val hasInput = iter.hasNext
-        if (!hasInput && keyExpressions.isEmpty) {
-          // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
-          // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
-          // restore the value, so that we don't overwrite our state with a 0 value, but rather
-          // merge the 0 with existing state.
-          store.iterator().map(_.value)
-        } else {
-          iter.flatMap { row =>
-            val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
-            val restoredRow = stateManager.get(store, key)
-            val outputRows = Option(restoredRow).toSeq :+ row
-            numOutputRows += outputRows.size
-            outputRows
-          }
+      val hasInput = iter.hasNext
+      val result = if (!hasInput && keyExpressions.isEmpty) {
+        // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
+        // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
+        // restore the value, so that we don't overwrite our state with a 0 value, but rather
+        // merge the 0 with existing state.
+        store.iterator().map(_.value)
+      } else {
+        iter.flatMap { row =>
+          val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
+          val restoredRow = stateManager.get(store, key)
+          val outputRows = Option(restoredRow).toSeq :+ row
+          numOutputRows += outputRows.size
+          outputRows
         }
+      }
+      // SPARK-46547 - Release any locks/resources if required, to prevent
+      // deadlocks with the maintenance thread.
+      store.abort()

Review Comment:
   Btw, why was this not an issue before? When are these locks/resources released before this PR?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -434,22 +434,26 @@ case class StateStoreRestoreExec(
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
-        val hasInput = iter.hasNext
-        if (!hasInput && keyExpressions.isEmpty) {
-          // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
-          // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
-          // restore the value, so that we don't overwrite our state with a 0 value, but rather
-          // merge the 0 with existing state.
-          store.iterator().map(_.value)
-        } else {
-          iter.flatMap { row =>
-            val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
-            val restoredRow = stateManager.get(store, key)
-            val outputRows = Option(restoredRow).toSeq :+ row
-            numOutputRows += outputRows.size
-            outputRows
-          }
+      val hasInput = iter.hasNext

Review Comment:
   Is this spurious change? Seems like indentation changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1446858015


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -452,6 +452,7 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
+    acquire()

Review Comment:
   This is actually not exactly related to this change. But we had missed this call for this function. Do you prefer to create this in a separate change ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1446945440


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -233,7 +235,12 @@ private[sql] class RocksDBStateStoreProvider
   }
 
   override def doMaintenance(): Unit = {
-    rocksDB.doMaintenance()
+    try {
+      rocksDB.doMaintenance()
+    } catch {
+      case NonFatal(ex) =>
+        logWarning(s"Error performing maintenance operations with exception=$ex")

Review Comment:
   Also make it explicit that this error is ignored in the log.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44542:
URL: https://github.com/apache/spark/pull/44542#issuecomment-1884371290

   @rangadi 
   
   Please read through my comments in above. Here are links for you:
   
   * https://github.com/apache/spark/pull/44542#issuecomment-1882621196
   * https://github.com/apache/spark/pull/44542#issuecomment-1882649328


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1438964350


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -434,22 +434,26 @@ case class StateStoreRestoreExec(
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
-        val hasInput = iter.hasNext
-        if (!hasInput && keyExpressions.isEmpty) {
-          // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
-          // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
-          // restore the value, so that we don't overwrite our state with a 0 value, but rather
-          // merge the 0 with existing state.
-          store.iterator().map(_.value)
-        } else {
-          iter.flatMap { row =>
-            val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
-            val restoredRow = stateManager.get(store, key)
-            val outputRows = Option(restoredRow).toSeq :+ row
-            numOutputRows += outputRows.size
-            outputRows
-          }
+      val hasInput = iter.hasNext
+      val result = if (!hasInput && keyExpressions.isEmpty) {
+        // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
+        // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
+        // restore the value, so that we don't overwrite our state with a 0 value, but rather
+        // merge the 0 with existing state.
+        store.iterator().map(_.value)
+      } else {
+        iter.flatMap { row =>
+          val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
+          val restoredRow = stateManager.get(store, key)
+          val outputRows = Option(restoredRow).toSeq :+ row
+          numOutputRows += outputRows.size
+          outputRows
         }
+      }
+      // SPARK-46547 - Release any locks/resources if required, to prevent
+      // deadlocks with the maintenance thread.
+      store.abort()

Review Comment:
   I believe this issue always existed. In the common case, we won't see this that often though. Basically the restore is followed by the save operator which opens the db instance in read-write mode. At the end of the save operator, we would always release the instance lock. For the `ReadStateStore` invocations though, there is no such functionality (we only abort on task failure) - so the interleaving of the maintenance thread error case (also likely rare) and the execution of StateStoreRDD for the `save` operator causes this deadlock



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1438963938


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -434,22 +434,26 @@ case class StateStoreRestoreExec(
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
-        val hasInput = iter.hasNext
-        if (!hasInput && keyExpressions.isEmpty) {
-          // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
-          // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
-          // restore the value, so that we don't overwrite our state with a 0 value, but rather
-          // merge the 0 with existing state.
-          store.iterator().map(_.value)
-        } else {
-          iter.flatMap { row =>
-            val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
-            val restoredRow = stateManager.get(store, key)
-            val outputRows = Option(restoredRow).toSeq :+ row
-            numOutputRows += outputRows.size
-            outputRows
-          }
+      val hasInput = iter.hasNext

Review Comment:
   Yea just fixed the indentation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1446853378


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########


Review Comment:
   If you look at the indent pattern - the braces seem off. It seems this pattern is used through this file though. Ok - will just update to original



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44542:
URL: https://github.com/apache/spark/pull/44542#discussion_r1446779347


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -452,6 +452,7 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
+    acquire()

Review Comment:
   nit: This is now a side-effect change. Do we want to keep this or just remove this? I feel like it's safer to add acquire() here, but once you update the PR title and description to reflect the new direction, this change is beyond the scope.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########


Review Comment:
   nit: revert the change, previous indentation seems to be right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44542:
URL: https://github.com/apache/spark/pull/44542#issuecomment-1884933933

   https://github.com/anishshri-db/spark/actions/runs/7471692378/job/20332452975
   Failing module is unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46547][SS] Fix deadlock between maintenance thread and streaming aggregation operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44542:
URL: https://github.com/apache/spark/pull/44542#issuecomment-1882649328

   That said, what we really need to fix is the behavior when maintenance task fails. A single streaming query using faulty state store provider implementation can lead every other stateful queries to fail in the same executor. While this is really something we have to fix (maybe only close the state store provider for the faulty one), but as I commented earlier, if we do not think the failure of maintenance task in RocksDB state store provider impacts the actual state store (provider), we should just swallow the exception.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org