Posted to reviews@spark.apache.org by "anishshri-db (via GitHub)" <gi...@apache.org> on 2023/12/04 21:15:35 UTC

[PR] [SPARK-46249] Require instance lock for acquiring RocksDB metrics to prevent race with background operations [spark]

anishshri-db opened a new pull request, #44165:
URL: https://github.com/apache/spark/pull/44165

   ### What changes were proposed in this pull request?
   Require instance lock for acquiring RocksDB metrics to prevent race with background operations
   
   
   ### Why are the changes needed?
   The changes are needed to avoid a race in which the stateful operator tries to set store metrics (`setStoreMetrics`) after the commit, by which point a background operation may already have closed, aborted, or reloaded the DB instance.
   We have seen a few query failures with the following stack trace caused by this race:
   ```
           org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 3 in stage 531.0 failed 1 times, most recent failure: Lost task 3.0 in stage 531.0 (TID 1544) (ip-10-110-29-251.us-west-2.compute.internal executor driver): java.lang.NullPointerException
   	at org.apache.spark.sql.execution.streaming.state.RocksDB.getDBProperty(RocksDB.scala:838)
   	at org.apache.spark.sql.execution.streaming.state.RocksDB.metrics(RocksDB.scala:678)
   	at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.metrics(RocksDBStateStoreProvider.scala:137)
   	at org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics(statefulOperators.scala:198)
   	at org.apache.spark.sql.execution.streaming.StateStoreWriter.setStoreMetrics$(statefulOperators.scala:197)
   	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.setStoreMetrics(statefulOperators.scala:495)
   	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anon$2.close(statefulOperators.scala:626)
   	at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
   	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.hashAgg_doAggregateWithKeys_0$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
   	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:498)
   	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1743)
   	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:552)
   	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:482)
   	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:557)
   	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:445)
   	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
   	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
   	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
   	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
   	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
   	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
   	at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)
   	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
   ```
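   The race and the shape of the fix can be sketched as follows. This is a minimal, hypothetical model for illustration only. `StateDb`, `NativeHandle`, `instanceLock` and `Metrics` are invented names, not Spark's API. It assumes, in line with the PR title and the `recordedMetrics` field discussed in review, that metrics are recorded at commit time under the instance lock and that `metrics` then returns that recorded value under the same lock. Under this model, `metrics` no longer queries the native handle after it may have been released.

```scala
// Hypothetical, simplified model of the race; not Spark's actual RocksDB class.
final case class Metrics(numKeys: Long)

// Stand-in for the native RocksDB handle.
final class NativeHandle(var numKeys: Long) {
  def estimateNumKeys: Long = numKeys
}

class StateDb {
  private val instanceLock = new Object
  // Set to null by close(), as a background close/abort/reload might do.
  private var db: NativeHandle = new NativeHandle(0L)
  // Snapshot taken at commit; all reads and writes happen under instanceLock.
  private var recordedMetrics: Option[Metrics] = None

  def put(): Unit = instanceLock.synchronized { db.numKeys += 1 }

  def commit(): Unit = instanceLock.synchronized {
    // Record metrics while the handle is known to be open.
    recordedMetrics = Some(Metrics(db.estimateNumKeys))
  }

  def close(): Unit = instanceLock.synchronized { db = null }

  // Before the fix: reads the live handle without the lock, so it can throw an NPE.
  def unsafeMetrics: Metrics = Metrics(db.estimateNumKeys)

  // After the fix: takes the lock and serves the snapshot recorded at commit.
  def metrics: Option[Metrics] = instanceLock.synchronized { recordedMetrics }
}
```

   In the sketch, calling `unsafeMetrics` after `close()` fails with a `NullPointerException`, mirroring the trace above. `metrics` still returns the values recorded for the last committed version.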
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Modified existing unit tests
   
   ```
   [info] Run completed in 1 minute, 31 seconds.
   [info] Total number of tests run: 150
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 150, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   ```
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #44165: [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations
URL: https://github.com/apache/spark/pull/44165




Re: [PR] [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44165:
URL: https://github.com/apache/spark/pull/44165#discussion_r1414814387


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -131,19 +131,21 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
     withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
       tryWithProviderResource(newStoreProvider()) { provider =>
-          val store = provider.getStore(0)

Review Comment:
   Nice fix on indentation :)



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -131,19 +131,21 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
     withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
       tryWithProviderResource(newStoreProvider()) { provider =>
-          val store = provider.getStore(0)
-          // Verify state after updating
-          put(store, "a", 0, 1)
-          assert(get(store, "a", 0) === Some(1))
-          assert(store.commit() === 1)
-          provider.doMaintenance()
-          assert(store.hasCommitted)
-          val storeMetrics = store.metrics
-          assert(storeMetrics.numKeys === 1)
+        val store = provider.getStore(0)
+        // Verify state after updating
+        put(store, "a", 0, 1)
+        assert(get(store, "a", 0) === Some(1))
+        assert(store.commit() === 1)
+        provider.doMaintenance()
+        assert(store.hasCommitted)
+        val storeMetrics = store.metrics
+        assert(storeMetrics.numKeys === 1)
+        if (!isChangelogCheckpointingEnabled) {

Review Comment:
   Probably better to leave a code comment for future readers.





Re: [PR] [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #44165:
URL: https://github.com/apache/spark/pull/44165#issuecomment-1839490380

   cc - @HeartSaVioR - PTAL, thx !




Re: [PR] [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44165:
URL: https://github.com/apache/spark/pull/44165#issuecomment-1840065025

   Thanks! Merging to master.




Re: [PR] [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44165:
URL: https://github.com/apache/spark/pull/44165#discussion_r1414783355


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -137,6 +137,10 @@ class RocksDB(
   @volatile private var numKeysOnWritingVersion = 0L
   @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
 
+  // SPARK-46249 - Keep track of recorded metrics per version which can be used for querying later
+  // Updates and access to recordedMetrics are protected by the DB instance lock
+  @volatile private var recordedMetrics: Option[RocksDBMetrics] = None

Review Comment:
   Let's explicitly add an annotation stating which lock guards the field, like the field below.
   
   `@GuardedBy("acquireLock")`
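   `@GuardedBy` documents which lock must be held whenever a field is read or written. Static-analysis tools check it; it is not enforced at runtime. Spark appears to use `javax.annotation.concurrent.GuardedBy` from JSR-305. To keep this sketch dependency-free, it declares a hypothetical stand-in annotation with the same shape. `MetricsHolder`, `RecordedMetrics` and their methods are also invented for illustration. The sketch assumes `acquireLock` is a plain monitor object.

```scala
import scala.annotation.StaticAnnotation

// Hypothetical stand-in for javax.annotation.concurrent.GuardedBy (JSR-305),
// declared locally so the sketch compiles without extra dependencies.
final class GuardedBy(lockName: String) extends StaticAnnotation

final case class RecordedMetrics(version: Long, numKeys: Long)

class MetricsHolder {
  // Monitor object that the annotation below refers to by name.
  private val acquireLock = new Object

  // Documentation only: every access must happen inside acquireLock.synchronized.
  @GuardedBy("acquireLock")
  @volatile private var recordedMetrics: Option[RecordedMetrics] = None

  def record(version: Long, numKeys: Long): Unit = acquireLock.synchronized {
    recordedMetrics = Some(RecordedMetrics(version, numKeys))
  }

  def clear(): Unit = acquireLock.synchronized { recordedMetrics = None }

  def current: Option[RecordedMetrics] = acquireLock.synchronized { recordedMetrics }
}
```

   The sketch keeps `@volatile` next to the lock to mirror the diff above. Strictly speaking, holding the lock on every access already guarantees visibility.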





Re: [PR] [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44165:
URL: https://github.com/apache/spark/pull/44165#discussion_r1414787796


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -137,6 +137,10 @@ class RocksDB(
   @volatile private var numKeysOnWritingVersion = 0L
   @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
 
+  // SPARK-46249 - Keep track of recorded metrics per version which can be used for querying later
+  // Updates and access to recordedMetrics are protected by the DB instance lock
+  @volatile private var recordedMetrics: Option[RocksDBMetrics] = None

Review Comment:
   Done





Re: [PR] [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44165:
URL: https://github.com/apache/spark/pull/44165#discussion_r1414826922


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -131,19 +131,21 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
     withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
       tryWithProviderResource(newStoreProvider()) { provider =>
-          val store = provider.getStore(0)
-          // Verify state after updating
-          put(store, "a", 0, 1)
-          assert(get(store, "a", 0) === Some(1))
-          assert(store.commit() === 1)
-          provider.doMaintenance()
-          assert(store.hasCommitted)
-          val storeMetrics = store.metrics
-          assert(storeMetrics.numKeys === 1)
+        val store = provider.getStore(0)
+        // Verify state after updating
+        put(store, "a", 0, 1)
+        assert(get(store, "a", 0) === Some(1))
+        assert(store.commit() === 1)
+        provider.doMaintenance()
+        assert(store.hasCommitted)
+        val storeMetrics = store.metrics
+        assert(storeMetrics.numKeys === 1)
+        if (!isChangelogCheckpointingEnabled) {

Review Comment:
   Done





Re: [PR] [SPARK-46249][SS] Require instance lock for acquiring RocksDB metrics to prevent race with background operations [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44165:
URL: https://github.com/apache/spark/pull/44165#issuecomment-1840064037

   This only failed in the Docker integration test suite, which is unrelated.

