Posted to commits@spark.apache.org by ka...@apache.org on 2023/06/09 11:49:45 UTC

[spark] branch branch-3.4 updated: [SPARK-43404][SS][3.4] Skip reusing sst file for same version of RocksDB state store to avoid id mismatch error

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 238da78766f [SPARK-43404][SS][3.4] Skip reusing sst file for same version of RocksDB state store to avoid id mismatch error
238da78766f is described below

commit 238da78766f146955d953014dc9cbf85ded1d075
Author: Anish Shrigondekar <an...@databricks.com>
AuthorDate: Fri Jun 9 20:49:25 2023 +0900

    [SPARK-43404][SS][3.4] Skip reusing sst file for same version of RocksDB state store to avoid id mismatch error
    
    NOTE: This ports back the commit https://github.com/apache/spark/commit/d3b9f4e7b1be7d89466cf80c3e38890c7add7625 (PR https://github.com/apache/spark/pull/41089) to branch-3.4. This is a clean cherry-pick.
    
    ### What changes were proposed in this pull request?
    Skip reusing sst file for same version of RocksDB state store to avoid id mismatch error
    
    ### Why are the changes needed?
    In case of a task retry on the same executor, it's possible that the original task already completed the phase of creating the SST files and uploading them to the object store. In that case, we might also have added an entry to the in-memory map `versionToRocksDBFiles` for the given version. When the retry task creates the local checkpoint, it's possible that the file name and size are the same but the metadata ID embedded within the file is different. So, when we try to load this version of the state store, we run into an ID mismatch error such as the following:
    
    ```
    Mismatch in unique ID on table file 24220. Expected: {9692563551998415634,4655083329411385714} Actual: {9692563551998415639,10299185534092933087} in file /local_disk0/spark-f58a741d-576f-400c-9b56-53497745ac01/executor-18e08e59-20e8-4a00-bd7e-94ad4599150b/spark-5d980399-3425-4951-894a-808b943054ea/StateStoreId(opId=2147483648,partId=53,name=default)-d89e082e-4e33-4371-8efd-78d927ad3ba3/workingDir-9928750e-f648-4013-a300-ac96cb6ec139/MANIFEST-024212
    ```
    
    This change avoids reusing files tracked in the map for the same version on the same host, reducing the chance of running into the error above; a sketch of the resulting reuse decision follows.
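    
    For illustration only, here is a minimal, self-contained Scala sketch of that reuse decision. `SameVersionReuseSketch`, `ImmutableFile`, `versionToFiles`, and `filesToReuseFor` are hypothetical stand-ins for the internals of `RocksDBFileManager`; only the `v < version` filter mirrors the actual change.
    
    ```
    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._
    
    object SameVersionReuseSketch {
      // Simplified stand-in for RocksDB's immutable file metadata (hypothetical).
      case class ImmutableFile(localFileName: String, dfsFileName: String, sizeBytes: Long)
    
      // version -> files already uploaded to DFS for that version
      private val versionToFiles = new ConcurrentHashMap[Long, Seq[ImmutableFile]]()
    
      // Only files uploaded for *earlier* versions are reuse candidates.
      // Entries recorded for `version` itself may come from a failed task
      // attempt whose SST files embed different RocksDB metadata IDs, so
      // they are skipped even when the name and size match.
      def filesToReuseFor(version: Long): Map[String, ImmutableFile] =
        versionToFiles.asScala
          .filter { case (v, _) => v < version } // skip same-version entries
          .values.flatten
          .map(f => f.localFileName -> f)
          .toMap
    
      def main(args: Array[String]): Unit = {
        versionToFiles.put(1L, Seq(ImmutableFile("sst-file0.sst", "dfs-0.sst", 10L)))
        // A failed attempt of version 2 already uploaded sst-file1.sst ...
        versionToFiles.put(2L, Seq(ImmutableFile("sst-file1.sst", "dfs-1.sst", 10L)))
        // ... so the retry of version 2 only considers version 1's files for reuse.
        assert(filesToReuseFor(2L).keySet == Set("sst-file0.sst"))
      }
    }
    ```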
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test
    
    RocksDBSuite
    ```
    [info] Run completed in 35 seconds, 995 milliseconds.
    [info] Total number of tests run: 33
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 33, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
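    
    For reference, the suite can be run locally with `build/sbt "sql/testOnly *RocksDBSuite"` (assumed invocation; adjust to your build setup).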
    
    Closes #41530 from HeartSaVioR/SPARK-43404-3.4.
    
    Authored-by: Anish Shrigondekar <an...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDBFileManager.scala   |  3 ++-
 .../spark/sql/execution/streaming/state/RocksDBSuite.scala   | 12 ++++++------
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 26084747c32..6a736beffbf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -318,7 +318,8 @@ class RocksDBFileManager(
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
-    val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f =>
+    val prevFilesToSizes = versionToRocksDBFiles.asScala.filterKeys(_ < version)
+      .values.flatten.map { f =>
       f.localFileName -> f
     }.toMap
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index dd426b8e92b..ae278bc7307 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -239,19 +239,19 @@ class RocksDBSuite extends SparkFunSuite {
       // Save SAME version again with different checkpoint files and load back again to verify
       // whether files were overwritten.
       val cpFiles1_ = Seq(
-        "sst-file1.sst" -> 10, // same SST file as before, should not get copied
+        "sst-file1.sst" -> 10, // same SST file as before, but same version, so should get copied
         "sst-file2.sst" -> 25, // new SST file with same name as before, but different length
         "sst-file3.sst" -> 30, // new SST file
         "other-file1" -> 100, // same non-SST file as before, should not get copied
         "other-file2" -> 210, // new non-SST file with same name as before, but different length
         "other-file3" -> 300, // new non-SST file
-        "archive/00001.log" -> 1000, // same log file as before, should not get copied
+        "archive/00001.log" -> 1000, // same log file as before and version, so should get copied
         "archive/00002.log" -> 2500, // new log file with same name as before, but different length
         "archive/00003.log" -> 3000 // new log file
       )
       saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
-      assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
-      assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
+      assert(numRemoteSSTFiles === 5, "shouldn't copy same files again") // 2 old + 3 new SST files
+      assert(numRemoteLogFiles === 5, "shouldn't copy same files again") // 2 old + 3 new log files
       loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)
 
       // Save another version and verify
@@ -261,8 +261,8 @@ class RocksDBSuite extends SparkFunSuite {
         "archive/00004.log" -> 4000
       )
       saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
-      assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
-      assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
+      assert(numRemoteSSTFiles === 6) // 1 new file over earlier 5 files
+      assert(numRemoteLogFiles === 6) // 1 new file over earlier 5 files
       loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501)
 
       // Loading an older version should work

