You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2024/03/29 04:23:27 UTC

(spark) branch master updated: [SPARK-47568][SS] Fix race condition between maintenance thread and load/commit for snapshot files

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b844e52b35b [SPARK-47568][SS] Fix race condition between maintenance thread and load/commit for snapshot files
0b844e52b35b is described below

commit 0b844e52b35b0491717ba9f0ae8fe2e0cf45e88d
Author: Bhuwan Sahni <bh...@databricks.com>
AuthorDate: Fri Mar 29 13:23:15 2024 +0900

    [SPARK-47568][SS] Fix race condition between maintenance thread and load/commit for snapshot files
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a race condition between the maintenance thread and the task thread when changelog checkpointing is enabled, and ensures that all snapshots are valid.
    
    1. The maintenance thread currently relies on the class variable latestSnapshot to find the latest checkpoint and upload it to DFS. This checkpoint can be modified at commit time by the task thread if a new snapshot is created.
    2. The task thread was not resetting latestSnapshot at load time, which can result in newer snapshots (when an old version is loaded) being considered valid and uploaded to DFS. This results in VersionIdMismatch errors.
    
    ### Why are the changes needed?
    
    These are logical bugs that can cause `VersionIdMismatch` errors, forcing the user to discard the snapshot and restart the query.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45724 from sahnib/rocks-db-fix.
    
    Authored-by: Bhuwan Sahni <bh...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 66 ++++++++++++++--------
 .../streaming/state/RocksDBFileManager.scala       |  3 +-
 .../execution/streaming/state/RocksDBSuite.scala   | 37 ++++++++++++
 3 files changed, 82 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 1817104a5c22..fcefc1666f3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io.File
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.{mutable, Map}
@@ -49,6 +50,7 @@ case object RollbackStore extends RocksDBOpType("rollback_store")
 case object CloseStore extends RocksDBOpType("close_store")
 case object ReportStoreMetrics extends RocksDBOpType("report_store_metrics")
 case object StoreTaskCompletionListener extends RocksDBOpType("store_task_completion_listener")
+case object StoreMaintenance extends RocksDBOpType("store_maintenance")
 
 /**
  * Class representing a RocksDB instance that checkpoints version of data to DFS.
@@ -184,19 +186,23 @@ class RocksDB(
         loadedVersion = latestSnapshotVersion
 
         // reset last snapshot version
-        lastSnapshotVersion = 0L
+        if (lastSnapshotVersion > latestSnapshotVersion) {
+          // discard any newer snapshots
+          lastSnapshotVersion = 0L
+          latestSnapshot = None
+        }
         openDB()
 
         numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
-          // we don't track the total number of rows - discard the number being track
-          -1L
-        } else if (metadata.numKeys < 0) {
-          // we track the total number of rows, but the snapshot doesn't have tracking number
-          // need to count keys now
-          countKeys()
-        } else {
-          metadata.numKeys
-        }
+            // we don't track the total number of rows - discard the number being track
+            -1L
+          } else if (metadata.numKeys < 0) {
+            // we track the total number of rows, but the snapshot doesn't have tracking number
+            // need to count keys now
+            countKeys()
+          } else {
+            metadata.numKeys
+          }
         if (loadedVersion != version) replayChangelog(version)
         // After changelog replay the numKeysOnWritingVersion will be updated to
         // the correct number of keys in the loaded version.
@@ -571,16 +577,14 @@ class RocksDB(
           // background operations.
           val cp = Checkpoint.create(db)
           cp.createCheckpoint(checkpointDir.toString)
-          synchronized {
-            // if changelog checkpointing is disabled, the snapshot is uploaded synchronously
-            // inside the uploadSnapshot() called below.
-            // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
-            // during state store maintenance.
-            latestSnapshot.foreach(_.close())
-            latestSnapshot = Some(
-              RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
-            lastSnapshotVersion = newVersion
-          }
+          // if changelog checkpointing is disabled, the snapshot is uploaded synchronously
+          // inside the uploadSnapshot() called below.
+          // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
+          // during state store maintenance.
+          latestSnapshot.foreach(_.close())
+          latestSnapshot = Some(
+            RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
+          lastSnapshotVersion = newVersion
         }
       }
 
@@ -668,7 +672,20 @@ class RocksDB(
 
   def doMaintenance(): Unit = {
     if (enableChangelogCheckpointing) {
-      uploadSnapshot()
+      // There is race to update latestSnapshot between load(), commit()
+      // and uploadSnapshot().
+      // The load method will reset latestSnapshot to discard any snapshots taken
+      // from newer versions (when a old version is reloaded).
+      // commit() method deletes the existing snapshot while creating a new snapshot.
+      // In order to ensure that the snapshot being uploaded would not be modified
+      // concurrently, we need to synchronize the snapshot access between task thread
+      // and maintenance thread.
+      acquire(StoreMaintenance)
+      try {
+        uploadSnapshot()
+      } finally {
+        release(StoreMaintenance)
+      }
     }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
@@ -788,8 +805,11 @@ class RocksDB(
    */
   private def acquire(opType: RocksDBOpType): Unit = acquireLock.synchronized {
     val newAcquiredThreadInfo = AcquiredThreadInfo()
-    val waitStartTime = System.currentTimeMillis
-    def timeWaitedMs = System.currentTimeMillis - waitStartTime
+    val waitStartTime = System.nanoTime()
+    def timeWaitedMs = {
+      val elapsedNanos = System.nanoTime() - waitStartTime
+      TimeUnit.MILLISECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS)
+    }
     def isAcquiredByDifferentThread = acquiredThreadInfo != null &&
       acquiredThreadInfo.threadRef.get.isDefined &&
       newAcquiredThreadInfo.threadRef.get.get.getId != acquiredThreadInfo.threadRef.get.get.getId
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 18c769bed350..bd1daa48f809 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -554,6 +554,7 @@ class RocksDBFileManager(
     // Delete unnecessary local immutable files
     localImmutableFiles
       .foreach { existingFile =>
+        val existingFileSize = existingFile.length()
         val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
         val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
         val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
@@ -566,7 +567,7 @@ class RocksDBFileManager(
         if (!isSameFile) {
           existingFile.delete()
           localFilesToDfsFiles.remove(existingFile.getName)
-          logInfo(s"Deleted local file $existingFile with size ${existingFile.length()} mapped" +
+          logInfo(s"Deleted local file $existingFile with size $existingFileSize mapped" +
             s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
         } else {
           logInfo(s"reusing $prevDfsFile present at $existingFile for $requiredFile")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index f3393ac10b29..d98b02c2a897 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -383,6 +383,14 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         db.load(version, readOnly = true)
         assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
       }
+
+      // recommit 60 to ensure that acquireLock is released for maintenance
+      for (version <- 60 to 60) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
       // Check that snapshots and changelogs get purged correctly.
       db.doMaintenance()
       assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
@@ -1919,6 +1927,35 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("time travel 4 -" +
+    " validate successful RocksDB load") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets created
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 1) {
+        db.load(version)
+        db.put(version.toString, version.toString)
+        db.commit()
+      }
+
+      // load previous version, and recreate the snapshot
+      db.load(1)
+      db.put("3", "3")
+
+      // do maintenance - upload any latest snapshots so far
+      // would fail to acquire lock and no snapshots would be uploaded
+      db.doMaintenance()
+      db.commit()
+      // upload newly created snapshot 2.zip
+      db.doMaintenance()
+    }
+
+    // reload version 2 - should succeed
+    withDB(remoteDir, version = 2, conf = conf) { db =>
+    }
+  }
+
   test("validate Rocks DB SST files do not have a VersionIdMismatch" +
     " when metadata file is not overwritten - scenario 1") {
     val fmClass = "org.apache.spark.sql.execution.streaming.state." +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org