Posted to commits@spark.apache.org by ka...@apache.org on 2023/06/01 05:34:47 UTC

[spark] branch master updated: [SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c02c8be43c6 [SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider
c02c8be43c6 is described below

commit c02c8be43c64dbd6bffceb53f8b18cd19a0d2f2e
Author: Chaoqin Li <ch...@databricks.com>
AuthorDate: Thu Jun 1 14:34:24 2023 +0900

    [SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider
    
    ### What changes were proposed in this pull request?
    In order to reduce the checkpoint duration and end-to-end latency, we propose Changelog Based Checkpointing for the RocksDB State Store Provider. The mechanism is as follows.
    1. Changelog checkpoint: Upon each put() or delete() call to the local RocksDB instance, log the operation to a changelog file. During the state change commit, sync the compressed changelog of the current batch to DFS as checkpointDir/{version}.changelog.
    2. Version reconstruction: For version j, find the latest snapshot i.zip such that i <= j, load snapshot i, and replay i+1.changelog through j.changelog (see the sketch after this list). This is used both when loading the initial state and when creating the latest version snapshot. Note: if a query shuts down without an exception, there won't be any changelog replay during query restart, because a maintenance task is executed before the state store instance is unloaded.
    3. Background snapshot: A maintenance thread in the executors launches maintenance tasks periodically. Inside the maintenance task, sync the latest local RocksDB snapshot to DFS as checkpointDir/{version}.zip. Snapshots enable faster failure recovery and allow old versions to be purged.
    4. Garbage collection: Inside the maintenance task, delete snapshot and changelog files from DFS for versions that are outside the retention range (the default number of retained versions is 100).
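    
    As an illustration of step 2 above (not code from this patch), here is a minimal Scala sketch of the replay loop, assuming three hypothetical helper functions passed in as parameters:
    
        // Minimal sketch: reconstruct state version j from the newest snapshot i <= j
        // plus changelogs. The three function parameters are hypothetical helpers:
        //   latestSnapshotVersionFor - newest i such that i.zip exists and i <= j
        //   loadSnapshot             - load i.zip into the local RocksDB instance
        //   replayChangelog          - apply the put/delete operations in v.changelog
        def reconstruct(
            j: Long,
            latestSnapshotVersionFor: Long => Long,
            loadSnapshot: Long => Unit,
            replayChangelog: Long => Unit): Unit = {
          val i = latestSnapshotVersionFor(j)
          loadSnapshot(i)
          (i + 1 to j).foreach(replayChangelog)
        }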
    
    ### Why are the changes needed?
    We have identified state checkpointing latency as one of the major performance bottlenecks for stateful streaming queries. Currently, the RocksDB state store pauses the RocksDB instance to upload a snapshot to the cloud when committing a batch, which is heavyweight and has unpredictable performance.
    With changelog based checkpointing, we allow the RocksDB instance to run uninterrupted, which improves RocksDB operation performance. It also dramatically reduces the commit time and batch duration because we upload a much smaller amount of data during state commit. With this change, stateful queries with the RocksDB state store will have lower and more predictable latency.
    
    ### How was this patch tested?
    Add unit tests for the changelog checkpointing utility.
    Add unit and integration tests that check backward compatibility with existing checkpoints.
    Enable the RocksDB state store unit tests and stateful streaming query integration tests to run with changelog checkpointing enabled.
    
    Closes #41099 from chaoqin-li1123/changelog.
    
    Authored-by: Chaoqin Li <ch...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 docs/structured-streaming-programming-guide.md     |  18 +
 .../sql/execution/streaming/state/RocksDB.scala    | 187 ++++++++--
 .../streaming/state/RocksDBFileManager.scala       | 119 ++++--
 .../state/RocksDBStateStoreProvider.scala          |   8 +-
 .../streaming/state/StateStoreChangelog.scala      | 167 +++++++++
 .../state/RocksDBStateStoreIntegrationSuite.scala  |  60 ++-
 .../streaming/state/RocksDBStateStoreSuite.scala   |  80 +++-
 .../execution/streaming/state/RocksDBSuite.scala   | 402 +++++++++++++++++----
 .../state/StateStoreCompatibilitySuite.scala       |   2 +-
 .../streaming/state/StateStoreSuite.scala          | 109 +++---
 .../streaming/FlatMapGroupsWithStateSuite.scala    |   3 +
 .../sql/streaming/RocksDBStateStoreTest.scala      |  52 +++
 .../sql/streaming/StreamingAggregationSuite.scala  |   3 +
 .../streaming/StreamingDeduplicationSuite.scala    |   3 +
 14 files changed, 1012 insertions(+), 201 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 267596a3899..53d5919d4dc 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2320,6 +2320,11 @@ Here are the configs regarding to RocksDB instance of the state store provider:
     <td>Whether we perform a range compaction of RocksDB instance for commit operation</td>
     <td>False</td>
   </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled</td>
+    <td>Whether to upload changelog instead of snapshot during RocksDB StateStore commit</td>
+    <td>False</td>
+  </tr>
   <tr>
     <td>spark.sql.streaming.stateStore.rocksdb.blockSizeKB</td>
     <td>Approximate size in KB of user data packed per block for a RocksDB BlockBasedTable, which is a RocksDB's default SST file format.</td>
@@ -2389,6 +2394,19 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo
 You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node.
 Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings.
 
+##### RocksDB State Store Changelog Checkpointing
+In newer versions of Spark, changelog checkpointing is introduced for the RocksDB state store. The traditional checkpointing mechanism for the RocksDB state store is incremental snapshot checkpointing, where the manifest files and newly generated RocksDB SST files of RocksDB instances are uploaded to durable storage.
+Instead of uploading data files of RocksDB instances, changelog checkpointing uploads the changes made to the state since the last checkpoint for durability.
+Snapshots are persisted periodically in the background for predictable failure recovery and changelog trimming.
+Changelog checkpointing avoids the cost of capturing and uploading snapshots of RocksDB instances and significantly reduces streaming query latency.
+
+Changelog checkpointing is disabled by default. You can enable RocksDB state store changelog checkpointing by setting the `spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled` config to `true`.
+Changelog checkpointing is designed to be backward compatible with the traditional checkpointing mechanism.
+The RocksDB state store provider offers seamless support for transitioning between the two checkpointing mechanisms in both directions. This allows you to leverage the performance benefits of changelog checkpointing without discarding the old state checkpoint.
+In a version of Spark that supports changelog checkpointing, you can migrate streaming queries from older versions of Spark to changelog checkpointing by enabling changelog checkpointing in the Spark session.
+Conversely, you can safely disable changelog checkpointing in a newer version of Spark; any query that previously ran with changelog checkpointing will then switch back to traditional checkpointing.
+You need to restart your streaming queries for the change in checkpointing mechanism to be applied, but you won't observe any performance degradation in the process.
+
 ##### Performance-aspect considerations
 
 1. You may want to disable the track of total number of rows to aim the better performance on RocksDB state store.
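
To try the behavior described in the documentation above, both settings are plain session confs. A minimal usage sketch, assuming an existing SparkSession named spark (this snippet is illustrative and not part of the patch):

    // Use the RocksDB state store provider and turn on changelog checkpointing.
    spark.conf.set("spark.sql.streaming.stateStore.providerClass",
      "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
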
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 84c6963ab0d..a9c15cf7f7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -56,6 +56,15 @@ class RocksDB(
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "") extends Logging {
 
+  case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
+    def close(): Unit = {
+      silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
+    }
+  }
+
+  @volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
+  @volatile private var lastSnapshotVersion = 0L
+
   RocksDBLoader.loadLibrary()
 
   // Java wrapper objects linking to native RocksDB objects
@@ -109,13 +118,15 @@ class RocksDB(
   private val nativeStats = dbOptions.statistics()
 
   private val workingDir = createTempDir("workingDir")
-  private val fileManager = new RocksDBFileManager(
-    dfsRootDir, createTempDir("fileManager"), hadoopConf, loggingId = loggingId)
+  private val fileManager = new RocksDBFileManager(dfsRootDir, createTempDir("fileManager"),
+    hadoopConf, conf.compressionCodec, loggingId = loggingId)
   private val byteArrayPair = new ByteArrayPair()
   private val commitLatencyMs = new mutable.HashMap[String, Long]()
   private val acquireLock = new Object
 
   @volatile private var db: NativeRocksDB = _
+  @volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
+  private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
   @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
   @volatile private var numKeysOnLoadedVersion = 0L
   @volatile private var numKeysOnWritingVersion = 0L
@@ -129,17 +140,20 @@ class RocksDB(
    * Note that this will copy all the necessary file from DFS to local disk as needed,
    * and possibly restart the native RocksDB instance.
    */
-  def load(version: Long): RocksDB = {
+  def load(version: Long, readOnly: Boolean = false): RocksDB = {
     assert(version >= 0)
     acquire()
     logInfo(s"Loading $version")
     try {
       if (loadedVersion != version) {
         closeDB()
-        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
+        val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
+        loadedVersion = latestSnapshotVersion
+
         openDB()
 
-        val numKeys = if (!conf.trackTotalNumberOfRows) {
+        numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
           // we don't track the total number of rows - discard the number being track
           -1L
         } else if (metadata.numKeys < 0) {
@@ -149,10 +163,10 @@ class RocksDB(
         } else {
           metadata.numKeys
         }
-        numKeysOnWritingVersion = numKeys
-        numKeysOnLoadedVersion = numKeys
-
-        loadedVersion = version
+        if (loadedVersion != version) replayChangelog(version)
+        // After changelog replay the numKeysOnWritingVersion will be updated to
+        // the correct number of keys in the loaded version.
+        numKeysOnLoadedVersion = numKeysOnWritingVersion
         fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
       }
       if (conf.resetStatsOnLoad) {
@@ -164,9 +178,36 @@ class RocksDB(
         loadedVersion = -1  // invalidate loaded data
         throw t
     }
+    if (enableChangelogCheckpointing && !readOnly) {
+      // Make sure we don't leak resource.
+      changelogWriter.foreach(_.abort())
+      changelogWriter = Some(fileManager.getChangeLogWriter(version + 1))
+    }
     this
   }
 
+  /**
+   * Replay change log from the loaded version to the target version.
+   */
+  private def replayChangelog(endVersion: Long): Unit = {
+    for (v <- loadedVersion + 1 to endVersion) {
+      var changelogReader: StateStoreChangelogReader = null
+      try {
+        changelogReader = fileManager.getChangelogReader(v)
+        changelogReader.foreach { case (key, value) =>
+          if (value != null) {
+            put(key, value)
+          } else {
+            remove(key)
+          }
+        }
+      } finally {
+        if (changelogReader != null) changelogReader.close()
+      }
+    }
+    loadedVersion = endVersion
+  }
+
   /**
    * Get the value for the given key if present, or null.
    * @note This will return the last written value even if it was uncommitted.
@@ -187,6 +228,7 @@ class RocksDB(
       }
     }
     db.put(writeOptions, key, value)
+    changelogWriter.foreach(_.put(key, value))
   }
 
   /**
@@ -201,6 +243,7 @@ class RocksDB(
       }
     }
     db.delete(writeOptions, key)
+    changelogWriter.foreach(_.delete(key))
   }
 
   /**
@@ -286,44 +329,66 @@ class RocksDB(
    */
   def commit(): Long = {
     val newVersion = loadedVersion + 1
-    val checkpointDir = createTempDir("checkpoint")
-    var rocksDBBackgroundThreadPaused = false
     try {
-      // Make sure the directory does not exist. Native RocksDB fails if the directory to
-      // checkpoint exists.
-      Utils.deleteRecursively(checkpointDir)
 
       logInfo(s"Flushing updates for $newVersion")
-      val flushTimeMs = timeTakenMs { db.flush(flushOptions) }
 
-      val compactTimeMs = if (conf.compactOnCommit) {
-        logInfo("Compacting")
-        timeTakenMs { db.compactRange() }
-      } else 0
-
-      logInfo("Pausing background work")
-      val pauseTimeMs = timeTakenMs {
-        db.pauseBackgroundWork() // To avoid files being changed while committing
-        rocksDBBackgroundThreadPaused = true
-      }
-
-      logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
-      val checkpointTimeMs = timeTakenMs {
-        val cp = Checkpoint.create(db)
-        cp.createCheckpoint(checkpointDir.toString)
+      var compactTimeMs = 0L
+      var flushTimeMs = 0L
+      var checkpointTimeMs = 0L
+      if (shouldCreateSnapshot()) {
+        // Need to flush the change to disk before creating a checkpoint
+        // because rocksdb wal is disabled.
+        logInfo(s"Flushing updates for $newVersion")
+        flushTimeMs = timeTakenMs { db.flush(flushOptions) }
+        if (conf.compactOnCommit) {
+          logInfo("Compacting")
+          compactTimeMs = timeTakenMs { db.compactRange() }
+        }
+        checkpointTimeMs = timeTakenMs {
+          val checkpointDir = createTempDir("checkpoint")
+          logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
+          // Make sure the directory does not exist. Native RocksDB fails if the directory to
+          // checkpoint exists.
+          Utils.deleteRecursively(checkpointDir)
+          // We no longer pause background operation before creating a RocksDB checkpoint because
+          // it is unnecessary. The captured snapshot will still be consistent with ongoing
+          // background operations.
+          val cp = Checkpoint.create(db)
+          cp.createCheckpoint(checkpointDir.toString)
+          synchronized {
+            // if changelog checkpointing is disabled, the snapshot is uploaded synchronously
+            // inside the uploadSnapshot() called below.
+            // If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
+            // during state store maintenance.
+            latestSnapshot.foreach(_.close())
+            latestSnapshot = Some(
+              RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
+            lastSnapshotVersion = newVersion
+          }
+        }
       }
 
       logInfo(s"Syncing checkpoint for $newVersion to DFS")
       val fileSyncTimeMs = timeTakenMs {
-        fileManager.saveCheckpointToDfs(checkpointDir, newVersion, numKeysOnWritingVersion)
+        if (enableChangelogCheckpointing) {
+          try {
+            assert(changelogWriter.isDefined)
+            changelogWriter.foreach(_.commit())
+          } finally {
+            changelogWriter = None
+          }
+        } else {
+          assert(changelogWriter.isEmpty)
+          uploadSnapshot()
+        }
       }
+
       numKeysOnLoadedVersion = numKeysOnWritingVersion
       loadedVersion = newVersion
-      fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
       commitLatencyMs ++= Map(
         "flush" -> flushTimeMs,
         "compact" -> compactTimeMs,
-        "pause" -> pauseTimeMs,
         "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
@@ -334,25 +399,60 @@ class RocksDB(
         loadedVersion = -1  // invalidate loaded version
         throw t
     } finally {
-      if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork()
-      silentDeleteRecursively(checkpointDir, s"committing $newVersion")
       // reset resources as either 1) we already pushed the changes and it has been committed or
       // 2) commit has failed and the current version is "invalidated".
       release()
     }
   }
 
+  private def shouldCreateSnapshot(): Boolean = {
+    if (enableChangelogCheckpointing) {
+      assert(changelogWriter.isDefined)
+      val newVersion = loadedVersion + 1
+      newVersion - lastSnapshotVersion >= conf.minDeltasForSnapshot ||
+        changelogWriter.get.size > 10000
+    } else true
+  }
+
+  private def uploadSnapshot(): Unit = {
+    val localCheckpoint = synchronized {
+      val checkpoint = latestSnapshot
+      latestSnapshot = None
+      checkpoint
+    }
+    localCheckpoint match {
+      case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
+        try {
+          val uploadTime = timeTakenMs {
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+          }
+          logInfo(s"$loggingId: Upload snapshot of version $version," +
+            s" time taken: $uploadTime ms")
+        } finally {
+          localCheckpoint.foreach(_.close())
+        }
+      case _ =>
+    }
+  }
+
   /**
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     loadedVersion = -1L
+    changelogWriter.foreach(_.abort())
+    // Make sure changelogWriter gets recreated next time.
+    changelogWriter = None
     release()
     logInfo(s"Rolled back to $loadedVersion")
   }
 
-  def cleanup(): Unit = {
+  def doMaintenance(): Unit = {
+    if (enableChangelogCheckpointing) {
+      uploadSnapshot()
+    }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
     }
@@ -369,6 +469,9 @@ class RocksDB(
       flushOptions.close()
       dbOptions.close()
       dbLogger.close()
+      synchronized {
+        latestSnapshot.foreach(_.close())
+      }
       silentDeleteRecursively(localRootDir, "closing RocksDB")
     } catch {
       case e: Exception =>
@@ -550,7 +653,9 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
  */
 case class RocksDBConf(
     minVersionsToRetain: Int,
+    minDeltasForSnapshot: Int,
     compactOnCommit: Boolean,
+    enableChangelogCheckpointing: Boolean,
     blockSizeKB: Long,
     blockCacheSizeMB: Long,
     lockAcquireTimeoutMs: Long,
@@ -563,7 +668,8 @@ case class RocksDBConf(
     boundedMemoryUsage: Boolean,
     totalMemoryUsageMB: Long,
     writeBufferCacheRatio: Double,
-    highPriorityPoolRatio: Double)
+    highPriorityPoolRatio: Double,
+    compressionCodec: String)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -585,6 +691,8 @@ object RocksDBConf {
 
   // Configuration that specifies whether to compact the RocksDB data every time data is committed
   private val COMPACT_ON_COMMIT_CONF = SQLConfEntry("compactOnCommit", "false")
+  private val ENABLE_CHANGELOG_CHECKPOINTING_CONF = SQLConfEntry(
+    "changelogCheckpointing.enabled", "false")
   private val BLOCK_SIZE_KB_CONF = SQLConfEntry("blockSizeKB", "4")
   private val BLOCK_CACHE_SIZE_MB_CONF = SQLConfEntry("blockCacheSizeMB", "8")
   // See SPARK-42794 for details.
@@ -705,7 +813,9 @@ object RocksDBConf {
 
     RocksDBConf(
       storeConf.minVersionsToRetain,
+      storeConf.minDeltasForSnapshot,
       getBooleanConf(COMPACT_ON_COMMIT_CONF),
+      getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),
       getPositiveLongConf(BLOCK_SIZE_KB_CONF),
       getPositiveLongConf(BLOCK_CACHE_SIZE_MB_CONF),
       getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF),
@@ -718,7 +828,8 @@ object RocksDBConf {
       getBooleanConf(BOUNDED_MEMORY_USAGE_CONF),
       getLongConf(MAX_MEMORY_USAGE_MB_CONF),
       getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
-      getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF))
+      getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
+      storeConf.compressionCodec)
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())
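
The commit path above only captures a local snapshot when shouldCreateSnapshot() says so; otherwise it just commits the changelog and leaves the snapshot upload to the maintenance task. A standalone sketch of that rule (illustrative only; the 10000-record threshold mirrors the code in this patch):

    // Decide whether a commit should also capture a local RocksDB snapshot.
    def shouldSnapshot(
        changelogCheckpointingEnabled: Boolean,
        newVersion: Long,
        lastSnapshotVersion: Long,
        minDeltasForSnapshot: Int,
        changelogRecords: Int): Boolean = {
      if (!changelogCheckpointingEnabled) {
        true // legacy mode: every commit uploads a full snapshot
      } else {
        newVersion - lastSnapshotVersion >= minDeltasForSnapshot || changelogRecords > 10000
      }
    }
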
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index d8f6c1b2abb..0891d773713 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -37,7 +37,9 @@ import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.util.Utils
 
@@ -123,6 +125,7 @@ class RocksDBFileManager(
     dfsRootDir: String,
     localTempDir: File,
     hadoopConf: Configuration,
+    codecName: String = "zstd",
     loggingId: String = "")
   extends Logging {
 
@@ -134,6 +137,27 @@ class RocksDBFileManager(
   private val onlyZipFiles = new PathFilter {
     override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
   }
+  private val onlyChangelogFiles = new PathFilter {
+    override def accept(path: Path): Boolean = path.toString.endsWith(".changelog")
+  }
+
+  private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
+
+  private def codec = CompressionCodec.createCodec(sparkConf, codecName)
+
+  def getChangeLogWriter(version: Long): StateStoreChangelogWriter = {
+    val rootDir = new Path(dfsRootDir)
+    val changelogFile = dfsChangelogFile(version)
+    if (!fm.exists(rootDir)) fm.mkdirs(rootDir)
+    val changelogWriter = new StateStoreChangelogWriter(fm, changelogFile, codec)
+    changelogWriter
+  }
+
+  // Get the changelog file at version
+  def getChangelogReader(version: Long): StateStoreChangelogReader = {
+    val changelogFile = dfsChangelogFile(version)
+    new StateStoreChangelogReader(fm, changelogFile, codec)
+  }
 
   /**
    * Metrics for loading checkpoint from DFS. Every loadCheckpointFromDFS call will update this
@@ -205,19 +229,45 @@ class RocksDBFileManager(
     metadata
   }
 
-  /** Get the latest version available in the DFS directory. If no data present, it returns 0. */
-  def getLatestVersion(): Long = {
+  // Get latest snapshot version <= version
+  def getLatestSnapshotVersion(version: Long): Long = {
     val path = new Path(dfsRootDir)
     if (fm.exists(path)) {
+      // If the latest version snapshot exists, we avoid listing.
+      if (fm.exists(dfsBatchZipFile(version))) {
+        return version
+      }
       fm.list(path, onlyZipFiles)
         .map(_.getPath.getName.stripSuffix(".zip"))
         .map(_.toLong)
+        .filter(_ <= version)
         .foldLeft(0L)(math.max)
     } else {
       0
     }
   }
 
+
+  /** Get the latest version available in the DFS directory. If no data present, it returns 0. */
+  def getLatestVersion(): Long = {
+    val path = new Path(dfsRootDir)
+    if (fm.exists(path)) {
+      val files = fm.list(path).map(_.getPath)
+      val changelogFileVersions = files
+        .filter(onlyChangelogFiles.accept(_))
+        .map(_.getName.stripSuffix(".changelog"))
+        .map(_.toLong)
+      val snapshotFileVersions = files
+        .filter(onlyZipFiles.accept(_))
+        .map(_.getName.stripSuffix(".zip"))
+        .map(_.toLong)
+      val versions = changelogFileVersions ++ snapshotFileVersions
+      versions.foldLeft(0L)(math.max)
+    } else {
+      0
+    }
+  }
+
   /**
    * Find orphan files which are not tracked by zip files.
    * Both sst files and log files can be orphan files.
@@ -250,6 +300,18 @@ class RocksDBFileManager(
     }
   }
 
+  private def deleteChangelogFiles(versionsToDelete: Array[Long]): Unit = {
+    versionsToDelete.foreach { version =>
+      try {
+        fm.delete(dfsChangelogFile(version))
+        logInfo(s"Deleted changelog file $version")
+      } catch {
+        case e: Exception =>
+          logWarning(s"Error deleting changelog file for version $version", e)
+      }
+    }
+  }
+
   /**
    * Delete old versions by deleting the associated version and SST files.
    * At a high-level, this method finds which versions to delete, and which SST files that were
@@ -268,7 +330,9 @@ class RocksDBFileManager(
    * - Find the orphan sst and log files whose zip files are not uploaded successfully
    *   or have been overwritten. To avoid deleting files of ongoing tasks, only delete orphan files
    *   that are older than all tracked files when there are at least 2 versions.
-   * - Delete files in both to-be-deleted versions and orphan files.
+   * - Delete sst and log files in to-be-deleted versions.
+   * - Delete orphan files.
+   * - Delete changelog files of to-be-deleted versions.
    *
    * Note that it only deletes files that it knows are safe to delete.
    * It may not delete the following files.
@@ -278,36 +342,39 @@ class RocksDBFileManager(
    */
   def deleteOldVersions(numVersionsToRetain: Int): Unit = {
     val path = new Path(dfsRootDir)
-
+    val allFiles = fm.list(path).map(_.getPath)
+    val snapshotFiles = allFiles.filter(file => onlyZipFiles.accept(file))
+    val changelogFiles = allFiles.filter(file => onlyChangelogFiles.accept(file))
     // All versions present in DFS, sorted
-    val sortedVersions = fm.list(path, onlyZipFiles)
-      .map(_.getPath.getName.stripSuffix(".zip"))
+    val sortedSnapshotVersions = snapshotFiles
+      .map(_.getName.stripSuffix(".zip"))
       .map(_.toLong)
       .sorted
 
     // Return if no versions generated yet
-    if (sortedVersions.isEmpty) return
+    if (sortedSnapshotVersions.isEmpty) return
 
     // Find the versions to delete
-    val maxVersionPresent = sortedVersions.last
-    val minVersionPresent = sortedVersions.head
-    val minVersionToRetain =
-      math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
-    val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long]
+    val maxSnapshotVersionPresent = sortedSnapshotVersions.last
 
-    // When versionToDelete is non-empty, there are at least 2 versions.
+    // In order to reconstruct numVersionsToRetain version, retain the latest snapshot
+    // that satisfies (version <= maxSnapshotVersionPresent - numVersionsToRetain + 1).
+    // If none of the snapshots satisfy the condition, minVersionToRetain will be 0 and
+    // no version gets deleted.
+    val minVersionToRetain = sortedSnapshotVersions
+      .filter(_ <= maxSnapshotVersionPresent - numVersionsToRetain + 1)
+      .foldLeft(0L)(math.max)
+
+    // When snapshotVersionToDelete is non-empty, there are at least 2 snapshot versions.
     // We only delete orphan files when there are at least 2 versions,
     // which avoid deleting files for running tasks.
-    if (versionsToDelete.isEmpty) return
+    val snapshotVersionsToDelete = sortedSnapshotVersions.filter(_ < minVersionToRetain)
+    if (snapshotVersionsToDelete.isEmpty) return
 
-    logInfo(
-      s"Versions present: (min $minVersionPresent, max $maxVersionPresent), " +
-        s"cleaning up all versions older than $minVersionToRetain to retain last " +
-        s"$numVersionsToRetain versions")
 
     // Resolve RocksDB files for all the versions and find the max version each file is used
     val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
-    sortedVersions.foreach { version =>
+    sortedSnapshotVersions.foreach { version =>
       val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
         val newResolvedFiles = getImmutableFilesFromVersionZip(version)
         versionToRocksDBFiles.put(version, newResolvedFiles)
@@ -318,7 +385,9 @@ class RocksDBFileManager(
     }
 
     // Best effort attempt to delete SST files that were last used in to-be-deleted versions
-    val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v) }
+    val filesToDelete = fileToMaxUsedVersion.filter {
+      case (_, v) => snapshotVersionsToDelete.contains(v)
+    }
 
     val sstDir = new Path(dfsRootDir, RocksDBImmutableFile.SST_FILES_DFS_SUBDIR)
     val logDir = new Path(dfsRootDir, RocksDBImmutableFile.LOG_FILES_DFS_SUBDIR)
@@ -349,7 +418,7 @@ class RocksDBFileManager(
     }
 
     // Delete the version files and forget about them
-    versionsToDelete.foreach { version =>
+    snapshotVersionsToDelete.foreach { version =>
       val versionFile = dfsBatchZipFile(version)
       try {
         fm.delete(versionFile)
@@ -362,6 +431,11 @@ class RocksDBFileManager(
     }
     logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to delete" +
       s"$failedToDelete files) not used in versions >= $minVersionToRetain")
+
+    val changelogVersionsToDelete = changelogFiles
+      .map(_.getName.stripSuffix(".changelog")).map(_.toLong)
+      .filter(_ < minVersionToRetain)
+    deleteChangelogFiles(changelogVersionsToDelete)
   }
 
   /** Save immutable files to DFS directory */
@@ -534,6 +608,9 @@ class RocksDBFileManager(
   }
 
   private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip")
+  // We use the "changelog" suffix intentionally so that these files can be distinguished from
+  // the changelog files of HDFSBackedStateStore, which are named {version}.delta.
+  private def dfsChangelogFile(version: Long): Path = new Path(s"$dfsRootDir/$version.changelog")
 
   private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")
 
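
With the {version}.zip / {version}.changelog naming used by the file manager above, picking the snapshot to start recovery from reduces to a max over the snapshot versions at or below the target. A standalone sketch over plain file names (not the RocksDBFileManager code itself):

    // Newest snapshot version i such that i <= targetVersion; 0 if no snapshot exists yet.
    def latestSnapshotVersion(fileNames: Seq[String], targetVersion: Long): Long = {
      fileNames
        .filter(_.endsWith(".zip"))
        .map(_.stripSuffix(".zip").toLong)
        .filter(_ <= targetVersion)
        .foldLeft(0L)(math.max)
    }

    // For example, latestSnapshotVersion(Seq("5.zip", "10.zip", "11.changelog"), 11) returns 10,
    // so recovery of version 11 loads 10.zip and then replays 11.changelog.
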
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index b4b648c3693..10f207c7ec1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -195,8 +195,14 @@ private[sql] class RocksDBStateStoreProvider
     new RocksDBStateStore(version)
   }
 
+  override def getReadStore(version: Long): StateStore = {
+    require(version >= 0, "Version cannot be less than 0")
+    rocksDB.load(version, true)
+    new RocksDBStateStore(version)
+  }
+
   override def doMaintenance(): Unit = {
-    rocksDB.cleanup()
+    rocksDB.doMaintenance()
   }
 
   override def close(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
new file mode 100644
index 00000000000..372cbb6d986
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
+
+import scala.util.control.NonFatal
+
+import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FSError, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+import org.apache.spark.util.NextIterator
+
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put record | -1 |
+ */
+class StateStoreChangelogWriter(
+    fm: CheckpointFileManager,
+    file: Path,
+    compressionCodec: CompressionCodec) extends Logging {
+
+  private def compressStream(outputStream: DataOutputStream): DataOutputStream = {
+    val compressed = compressionCodec.compressedOutputStream(outputStream)
+    new DataOutputStream(compressed)
+  }
+
+  private var backingFileStream: CancellableFSDataOutputStream =
+    fm.createAtomic(file, overwriteIfPossible = true)
+  private var compressedStream: DataOutputStream = compressStream(backingFileStream)
+  var size = 0
+
+  def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+
+  def abort(): Unit = {
+    try {
+      if (backingFileStream != null) backingFileStream.cancel()
+      if (compressedStream != null) IOUtils.closeQuietly(compressedStream)
+    } catch {
+      // Closing the compressedStream causes the stream to write/flush data into the
+      // rawStream. Since the rawStream is already closed, there may be errors.
+      // Usually it's an IOException. However, Hadoop's RawLocalFileSystem wraps
+      // IOException into FSError.
+      case e: FSError if e.getCause.isInstanceOf[IOException] =>
+      case NonFatal(ex) =>
+        logInfo(s"Failed to cancel changelog file $file for state store provider " +
+          s"with exception=$ex")
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+
+  def commit(): Unit = {
+    try {
+      // -1 in the key length field means EOF.
+      compressedStream.writeInt(-1)
+      compressedStream.close()
+    } catch {
+      case e: Throwable =>
+        abort()
+        logError(s"Fail to commit changelog file $file because of exception $e")
+        throw e
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+}
+
+
+/**
+ * Read an iterator of change record from the changelog file.
+ * A record is represented by ByteArrayPair(key: Array[Byte], value: Array[Byte])
+ * A put record is returned as a ByteArrayPair(key, value)
+ * A delete record is return as a ByteArrayPair(key, null)
+ */
+class StateStoreChangelogReader(
+    fm: CheckpointFileManager,
+    fileToRead: Path,
+    compressionCodec: CompressionCodec)
+  extends NextIterator[(Array[Byte], Array[Byte])] with Logging {
+
+  private def decompressStream(inputStream: DataInputStream): DataInputStream = {
+    val compressed = compressionCodec.compressedInputStream(inputStream)
+    new DataInputStream(compressed)
+  }
+
+  private val sourceStream = try {
+    fm.open(fileToRead)
+  } catch {
+    case f: FileNotFoundException =>
+      throw new IllegalStateException(
+        s"Error reading streaming state file of $fileToRead does not exist. " +
+          "If the stream job is restarted with a new or updated state operation, please" +
+          " create a new checkpoint location or clear the existing checkpoint location.", f)
+  }
+  private val input: DataInputStream = decompressStream(sourceStream)
+
+  def close(): Unit = { if (input != null) input.close() }
+
+  override def getNext(): (Array[Byte], Array[Byte]) = {
+    val keySize = input.readInt()
+    // A -1 key size means end of file.
+    if (keySize == -1) {
+      finished = true
+      null
+    } else if (keySize < 0) {
+      throw new IOException(
+        s"Error reading streaming state file $fileToRead: key size cannot be $keySize")
+    } else {
+      // TODO: reuse the key buffer and value buffer across records.
+      val keyBuffer = new Array[Byte](keySize)
+      ByteStreams.readFully(input, keyBuffer, 0, keySize)
+      val valueSize = input.readInt()
+      if (valueSize < 0) {
+        // A deletion record
+        (keyBuffer, null)
+      } else {
+        val valueBuffer = new Array[Byte](valueSize)
+        ByteStreams.readFully(input, valueBuffer, 0, valueSize)
+        // A put record.
+        (keyBuffer, valueBuffer)
+      }
+    }
+  }
+}
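
The writer and reader above boil down to length-prefixed key/value records with -1 sentinels, wrapped in a compression codec and a CheckpointFileManager stream. A self-contained sketch of just the record framing, without compression or atomic file handling (illustrative, not the class above):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    // Format: | key length | key | value length | value | for a put, a value length of -1 for a
    // delete, and a trailing key length of -1 as the end-of-file marker.
    def writeRecords(records: Seq[(Array[Byte], Array[Byte])]): Array[Byte] = {
      val bytes = new ByteArrayOutputStream()
      val out = new DataOutputStream(bytes)
      records.foreach { case (key, value) =>
        out.writeInt(key.length)
        out.write(key)
        if (value == null) out.writeInt(-1)
        else { out.writeInt(value.length); out.write(value) }
      }
      out.writeInt(-1)
      out.close()
      bytes.toByteArray
    }

    def readRecords(data: Array[Byte]): List[(Array[Byte], Array[Byte])] = {
      val in = new DataInputStream(new ByteArrayInputStream(data))
      Iterator.continually(in.readInt()).takeWhile(_ != -1).map { keySize =>
        val key = new Array[Byte](keySize)
        in.readFully(key)
        val valueSize = in.readInt()
        if (valueSize < 0) (key, null: Array[Byte]) // delete record
        else { val value = new Array[Byte](valueSize); in.readFully(value); (key, value) }
      }.toList
    }
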
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index 345526bb986..339d00058fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -27,8 +27,11 @@ import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWra
 import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.OutputMode.Update
+import org.apache.spark.util.Utils
 
-class RocksDBStateStoreIntegrationSuite extends StreamTest {
+class RocksDBStateStoreIntegrationSuite extends StreamTest
+  with AlsoTestWithChangelogCheckpointingEnabled {
   import testImplicits._
 
   test("RocksDBStateStore") {
@@ -45,7 +48,11 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest {
           // Verify that RocksDBStateStore by verify the state checkpoints are [version].zip
           val storeCheckpointDir = StateStoreId(
             dir.getAbsolutePath + "/state", 0, 0).storeCheckpointLocation()
-          val storeCheckpointFile = storeCheckpointDir + "/1.zip"
+          val storeCheckpointFile = if (isChangelogCheckpointingEnabled) {
+            storeCheckpointDir + "/1.changelog"
+          } else {
+            storeCheckpointDir + "/1.zip"
+          }
           new File(storeCheckpointFile).exists()
         }
       )
@@ -205,4 +212,53 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest {
       }
     }
   }
+
+  testWithChangelogCheckpointingEnabled(
+    "Streaming aggregation RocksDB State Store backward compatibility.") {
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    checkpointDir.delete()
+
+    val dirForPartition0 = new File(checkpointDir.getAbsolutePath, "/state/0/0")
+    val inputData = MemoryStream[Int]
+    val aggregated =
+      inputData.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .as[(Int, Long)]
+
+    // Run the stream with changelog checkpointing disabled.
+    testStream(aggregated, Update)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+        additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "false")),
+      AddData(inputData, 3),
+      CheckLastBatch((3, 1)),
+      AddData(inputData, 3, 2),
+      CheckLastBatch((3, 2), (2, 1)),
+      StopStream
+    )
+    assert(changelogVersionsPresent(dirForPartition0).isEmpty)
+    assert(snapshotVersionsPresent(dirForPartition0) == List(1L, 2L))
+
+    // Run the stream with changelog checkpointing enabled.
+    testStream(aggregated, Update)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+        additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "true")),
+      AddData(inputData, 3, 2, 1),
+      CheckLastBatch((3, 3), (2, 2), (1, 1)),
+      // By default we run in new tuple mode.
+      AddData(inputData, 4, 4, 4, 4),
+      CheckLastBatch((4, 4))
+    )
+    assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
+
+    // Run the stream with changelog checkpointing disabled.
+    testStream(aggregated, Update)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+        additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "false")),
+      AddData(inputData, 4),
+      CheckLastBatch((4, 5))
+    )
+    assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
+    assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 82167db6d7d..d113085fd1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -25,10 +25,13 @@ import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkConf
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.LocalSparkSession.withSparkSession
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.ExtendedSQLTest
 import org.apache.spark.unsafe.Platform
@@ -36,6 +39,8 @@ import org.apache.spark.util.Utils
 
 @ExtendedSQLTest
 class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvider]
+  with AlsoTestWithChangelogCheckpointingEnabled
+  with SharedSparkSession
   with BeforeAndAfter {
 
   before {
@@ -76,6 +81,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
         ("spark.sql.streaming.stateStore.providerClass",
           classOf[RocksDBStateStoreProvider].getName),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".compactOnCommit", "true"),
+        (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled", "true"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", "10"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxOpenFiles", "1000"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", "3"),
@@ -103,6 +109,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
 
       // Verify the confs are same as those configured in the session conf
       assert(rocksDBConfInTask.compactOnCommit == true)
+      assert(rocksDBConfInTask.enableChangelogCheckpointing == true)
       assert(rocksDBConfInTask.lockAcquireTimeoutMs == 10L)
       assert(rocksDBConfInTask.formatVersion == 4)
       assert(rocksDBConfInTask.maxOpenFiles == 1000)
@@ -118,20 +125,22 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       assert(metricPair.isDefined)
       metricPair.get._2
     }
-
-    tryWithProviderResource(newStoreProvider()) { provider =>
-      val store = provider.getStore(0)
-      // Verify state after updating
-      put(store, "a", 0, 1)
-      assert(get(store, "a", 0) === Some(1))
-      assert(store.commit() === 1)
-      assert(store.hasCommitted)
-      val storeMetrics = store.metrics
-      assert(storeMetrics.numKeys === 1)
-      assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_COPIED) > 0L)
-      assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_REUSED) == 0L)
-      assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_BYTES_COPIED) > 0L)
-      assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED) > 0L)
+    withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
+      tryWithProviderResource(newStoreProvider()) { provider =>
+          val store = provider.getStore(0)
+          // Verify state after updating
+          put(store, "a", 0, 1)
+          assert(get(store, "a", 0) === Some(1))
+          assert(store.commit() === 1)
+          provider.doMaintenance()
+          assert(store.hasCommitted)
+          val storeMetrics = store.metrics
+          assert(storeMetrics.numKeys === 1)
+          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_COPIED) > 0L)
+          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_REUSED) == 0L)
+          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_BYTES_COPIED) > 0L)
+          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED) > 0L)
+      }
     }
   }
 
@@ -149,11 +158,12 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
 
   def newStoreProvider(
       storeId: StateStoreId,
-      numColsPrefixKey: Int): RocksDBStateStoreProvider = {
+      numColsPrefixKey: Int,
+      sqlConf: Option[SQLConf] = None): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
     provider.init(
       storeId, keySchema, valueSchema, numColsPrefixKey = numColsPrefixKey,
-      new StateStoreConf, new Configuration)
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), new Configuration)
     provider
   }
 
@@ -173,10 +183,44 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
 
   override def newStoreProvider(
     minDeltasForSnapshot: Int,
-    numOfVersToRetainInMemory: Int): RocksDBStateStoreProvider = newStoreProvider()
+    numOfVersToRetainInMemory: Int): RocksDBStateStoreProvider = {
+    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), 0,
+      Some(getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)))
+  }
 
   override def getDefaultSQLConf(
     minDeltasForSnapshot: Int,
-    numOfVersToRetainInMemory: Int): SQLConf = new SQLConf()
+    numOfVersToRetainInMemory: Int): SQLConf = {
+    val sqlConf = SQLConf.get.clone()
+    sqlConf.setConfString(
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key, minDeltasForSnapshot.toString)
+    sqlConf
+  }
+
+  override def testQuietly(name: String)(f: => Unit): Unit = {
+    test(name) {
+      quietly {
+        f
+      }
+    }
+  }
+
+  override def testWithAllCodec(name: String)(func: => Any): Unit = {
+    codecsInShortName.foreach { codecName =>
+      super.test(s"$name - with codec $codecName") {
+        withSQLConf(SQLConf.STATE_STORE_COMPRESSION_CODEC.key -> codecName) {
+          func
+        }
+      }
+    }
+
+    CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codecName =>
+      super.test(s"$name - with codec $codecName") {
+        withSQLConf(SQLConf.STATE_STORE_COMPRESSION_CODEC.key -> codecName) {
+          func
+        }
+      }
+    }
+  }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index f66ac0de8c6..3023a445930 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -24,14 +24,297 @@ import scala.language.implicitConversions
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.scalactic.source.Position
+import org.scalatest.Tag
 
-import org.apache.spark._
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.util.{ThreadUtils, Utils}
 
-class RocksDBSuite extends SparkFunSuite {
+trait RocksDBStateStoreChangelogCheckpointingTestUtil {
+  val rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+    ".changelogCheckpointing.enabled"
+
+  def isChangelogCheckpointingEnabled: Boolean =
+    SQLConf.get.getConfString(rocksdbChangelogCheckpointingConfKey) == "true"
+
+  def snapshotVersionsPresent(dir: File): Seq[Long] = {
+    dir.listFiles.filter(_.getName.endsWith(".zip"))
+      .map(_.getName.stripSuffix(".zip"))
+      .map(_.toLong)
+      .sorted
+  }
+
+  def changelogVersionsPresent(dir: File): Seq[Long] = {
+    dir.listFiles.filter(_.getName.endsWith(".changelog"))
+      .map(_.getName.stripSuffix(".changelog"))
+      .map(_.toLong)
+      .sorted
+  }
+}
+
+
+trait AlsoTestWithChangelogCheckpointingEnabled
+  extends SQLTestUtils with RocksDBStateStoreChangelogCheckpointingTestUtil {
+
+  override protected def test(testName: String, testTags: Tag*)(testBody: => Any)
+                             (implicit pos: Position): Unit = {
+    testWithChangelogCheckpointingEnabled(testName, testTags: _*)(testBody)
+    testWithChangelogCheckpointingDisabled(testName, testTags: _*)(testBody)
+  }
+
+  def testWithChangelogCheckpointingEnabled(testName: String, testTags: Tag*)
+                                        (testBody: => Any): Unit = {
+    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+  }
+
+  def testWithChangelogCheckpointingDisabled(testName: String, testTags: Tag*)
+                                           (testBody: => Any): Unit = {
+    super.test(testName + " (without changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+  }
+}
+
+class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession {
+
+  sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, classOf[RocksDBStateStoreProvider].getName)
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDB: check changelog and snapshot version") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 1)
+    new File(remoteDir).delete()  // to make sure that the directory gets created
+    for (version <- 0 to 49) {
+      withDB(remoteDir, version = version, conf = conf) { db =>
+          db.put(version.toString, version.toString)
+          db.commit()
+          if ((version + 1) % 5 == 0) db.doMaintenance()
+      }
+    }
+
+    if (isChangelogCheckpointingEnabled) {
+      assert(changelogVersionsPresent(remoteDir) === (1 to 50))
+      assert(snapshotVersionsPresent(remoteDir) === Range.inclusive(5, 50, 5))
+    } else {
+      assert(changelogVersionsPresent(remoteDir) === Seq.empty)
+      assert(snapshotVersionsPresent(remoteDir) === (1 to 50))
+    }
+  }
+
+  test("RocksDB: load version that doesn't exist") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets created
+    withDB(remoteDir) { db =>
+      intercept[IllegalStateException] {
+        db.load(1)
+      }
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDB: purge changelog and snapshots") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets created
+    val conf = dbConf.copy(enableChangelogCheckpointing = true,
+      minVersionsToRetain = 3, minDeltasForSnapshot = 1)
+    withDB(remoteDir, conf = conf) { db =>
+      db.load(0)
+      db.commit()
+      for (version <- 1 to 2) {
+        db.load(version)
+        db.commit()
+        db.doMaintenance()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === Seq(2, 3))
+      assert(changelogVersionsPresent(remoteDir) == Seq(1, 2, 3))
+
+      for (version <- 3 to 4) {
+        db.load(version)
+        db.commit()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === Seq(2, 3))
+      assert(changelogVersionsPresent(remoteDir) == (1 to 5))
+      db.doMaintenance()
+      // 3 is the latest snapshot <= maxSnapshotVersionPresent - minVersionsToRetain + 1
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 5))
+      assert(changelogVersionsPresent(remoteDir) == (3 to 5))
+
+      for (version <- 5 to 7) {
+        db.load(version)
+        db.commit()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 5))
+      assert(changelogVersionsPresent(remoteDir) == (3 to 8))
+      db.doMaintenance()
+      // 5 is the latest snapshot <= maxSnapshotVersionPresent - minVersionsToRetain + 1
+      assert(snapshotVersionsPresent(remoteDir) === Seq(5, 8))
+      assert(changelogVersionsPresent(remoteDir) == (5 to 8))
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDB: minDeltasForSnapshot") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets created
+    val conf = dbConf.copy(enableChangelogCheckpointing = true, minDeltasForSnapshot = 3)
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 1) {
+        db.load(version)
+        db.commit()
+        db.doMaintenance()
+      }
+      // Snapshot should not be created because minDeltasForSnapshot = 3
+      assert(snapshotVersionsPresent(remoteDir) === Seq.empty)
+      assert(changelogVersionsPresent(remoteDir) == Seq(1, 2))
+      db.load(2)
+      db.commit()
+      db.doMaintenance()
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3))
+      db.load(3)
+      for (i <- 1 to 10001) {
+        db.put(i.toString, i.toString)
+      }
+      db.commit()
+      db.doMaintenance()
+      // Snapshot should be created this time because the size of the changelog > 10000
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4))
+      for (version <- 4 to 7) {
+        db.load(version)
+        db.commit()
+        db.doMaintenance()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4, 7))
+      for (version <- 8 to 20) {
+        db.load(version)
+        db.commit()
+      }
+      db.doMaintenance()
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4, 7, 19))
+    }
+  }
+
+  // A rocksdb instance with changelog checkpointing enabled should be able to load
+  // an existing checkpoint without changelog.
+  testWithChangelogCheckpointingEnabled(
+    "RocksDB: changelog checkpointing backward compatibility") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets created
+    val disableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = false, minVersionsToRetain = 30)
+    withDB(remoteDir, conf = disableChangelogCheckpointingConf) { db =>
+      for (version <- 1 to 30) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
+    }
+
+    // Now enable changelog checkpointing on a checkpoint created by a state store
+    // that had changelog checkpointing disabled.
+    val enableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 30,
+        minDeltasForSnapshot = 1)
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf) { db =>
+      for (version <- 1 to 30) {
+        db.load(version)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+      for (version <- 30 to 60) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
+      assert(changelogVersionsPresent(remoteDir) === (30 to 60))
+      for (version <- 1 to 60) {
+        db.load(version, readOnly = true)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+      // Check that snapshots and changelogs get purged correctly.
+      db.doMaintenance()
+      assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
+      assert(changelogVersionsPresent(remoteDir) === (30 to 60))
+      // Verify the content of retained versions.
+      for (version <- 30 to 60) {
+        db.load(version, readOnly = true)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+    }
+  }
+
+  // A RocksDB instance with changelog checkpointing disabled should be able to load
+  // an existing checkpoint with changelogs.
+  testWithChangelogCheckpointingEnabled(
+    "RocksDB: changelog checkpointing forward compatibility") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets created
+    val enableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20,
+        minDeltasForSnapshot = 3)
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf) { db =>
+      for (version <- 1 to 30) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
+    }
+
+    // Now disable changelog checkpointing on a checkpoint created by a state store
+    // that had changelog checkpointing enabled.
+    val disableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = false, minVersionsToRetain = 20,
+        minDeltasForSnapshot = 1)
+    withDB(remoteDir, conf = disableChangelogCheckpointingConf) { db =>
+      for (version <- 1 to 30) {
+        db.load(version)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+      for (version <- 31 to 60) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
+      assert(changelogVersionsPresent(remoteDir) === (1 to 30))
+      assert(snapshotVersionsPresent(remoteDir) === (31 to 60))
+      for (version <- 1 to 60) {
+        db.load(version, readOnly = true)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+      // Check that snapshots and changelogs get purged correctly.
+      db.doMaintenance()
+      assert(snapshotVersionsPresent(remoteDir) === (41 to 60))
+      assert(changelogVersionsPresent(remoteDir) === Seq.empty)
+      // Verify the content of retained versions.
+      for (version <- 41 to 60) {
+        db.load(version, readOnly = true)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+    }
+  }
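
The purge and compatibility tests above lean on two helpers, snapshotVersionsPresent and
changelogVersionsPresent, that are defined elsewhere in this suite. A plausible sketch of
their shape, assuming snapshots land on DFS as <version>.zip and changelogs as
<version>.delta (the actual helpers may differ in signature):

    import java.io.File

    object CheckpointListingSketch {
      // Illustration only: list versions by file extension under the DFS root dir.
      private def versionsWithSuffix(dir: String, suffix: String): Seq[Long] = {
        val files = Option(new File(dir).listFiles()).getOrElse(Array.empty[File])
        files.filter(_.getName.endsWith(suffix))
          .map(_.getName.stripSuffix(suffix).toLong)
          .sorted
          .toSeq
      }

      def snapshotVersionsPresent(dir: String): Seq[Long] =
        versionsWithSuffix(dir, ".zip")

      def changelogVersionsPresent(dir: String): Seq[Long] =
        versionsWithSuffix(dir, ".delta")
    }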
 
   test("RocksDB: get, put, iterator, commit, load") {
     def testOps(compactOnCommit: Boolean): Unit = {
@@ -102,56 +385,17 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
-  test("RocksDB: cleanup old files") {
-    val remoteDir = Utils.createTempDir().toString
-    val conf = RocksDBConf().copy(compactOnCommit = true, minVersionsToRetain = 10)
-
-    def versionsPresent: Seq[Long] = {
-      remoteDir.listFiles.filter(_.getName.endsWith(".zip"))
-        .map(_.getName.stripSuffix(".zip"))
-        .map(_.toLong)
-        .sorted
-    }
-
-    withDB(remoteDir, conf = conf) { db =>
-      // Generate versions without cleaning up
-      for (version <- 1 to 50) {
-        if (version > 1) {
-          // remove keys we wrote in previous iteration to ensure compaction happens
-          db.remove((version - 1).toString)
-        }
-        db.put(version.toString, version.toString)
-        db.commit()
-      }
-
-      // Clean up and verify version files and SST files were deleted
-      require(versionsPresent === (1L to 50L))
-      val sstDir = new File(remoteDir, "SSTs")
-      val numSstFiles = listFiles(sstDir).length
-      db.cleanup()
-      assert(versionsPresent === (41L to 50L))
-      assert(listFiles(sstDir).length < numSstFiles)
-
-      // Verify data in retained vesions.
-      versionsPresent.foreach { version =>
-        db.load(version)
-        val data = db.iterator().map(toStr).toSet
-        assert(data === Set((version.toString, version.toString)))
-      }
-    }
-  }
-
   test("RocksDB: handle commit failures and aborts") {
     val hadoopConf = new Configuration()
     hadoopConf.set(
       SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
       classOf[CreateAtomicTestManager].getName)
     val remoteDir = Utils.createTempDir().getAbsolutePath
-    val conf = RocksDBConf().copy(compactOnCommit = true)
-    withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+    withDB(remoteDir, hadoopConf = hadoopConf) { db =>
       // Disable failure of output stream and generate versions
       CreateAtomicTestManager.shouldFailInCreateAtomic = false
       for (version <- 1 to 10) {
+        db.load(version - 1)
         db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
         db.commit()
       }
@@ -159,16 +403,35 @@ class RocksDBSuite extends SparkFunSuite {
 
       // Fail commit for next version and verify that reloading resets the files
       CreateAtomicTestManager.shouldFailInCreateAtomic = true
+      db.load(10)
       db.put("11", "11")
       intercept[IOException] { quietly { db.commit() } }
-      assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+      assert(db.load(10, readOnly = true).iterator().map(toStr).toSet === version10Data)
       CreateAtomicTestManager.shouldFailInCreateAtomic = false
 
       // Abort commit for next version and verify that reloading resets the files
       db.load(10)
       db.put("11", "11")
       db.rollback()
-      assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+      assert(db.load(10, readOnly = true).iterator().map(toStr).toSet === version10Data)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString)
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1)
+    val entries = changelogReader.toSeq
+    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, i.toString.getBytes)) ++
+      (2 to 4).map(j => (j.toString.getBytes, null))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).foreach {
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2)
     }
   }
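
The round trip above also shows the shape of a changelog record: a put carries key and value,
a delete carries the key with a null value. A small, purely illustrative replay sketch over
such pairs follows (using strings instead of the byte arrays the reader actually returns, and
not the provider's real replay path):

    object ChangelogReplaySketch {
      import scala.collection.mutable

      // Illustration only: apply changelog entries in order to an in-memory map;
      // a null value means the key was deleted in that batch.
      def replay(entries: Seq[(String, String)]): Map[String, String] = {
        val state = mutable.Map.empty[String, String]
        entries.foreach {
          case (key, null) => state.remove(key)
          case (key, value) => state.update(key, value)
        }
        state.toMap
      }

      def main(args: Array[String]): Unit = {
        // Mirrors the writer above: put 1..5, then delete 2..4 => keys 1 and 5 remain.
        val entries = (1 to 5).map(i => (i.toString, i.toString)) ++
          (2 to 4).map(j => (j.toString, null: String))
        assert(replay(entries) == Map("1" -> "1", "5" -> "5"))
      }
    }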
 
@@ -433,7 +696,7 @@ class RocksDBSuite extends SparkFunSuite {
     quietly {
       withDB(
         Utils.createTempDir().toString,
-        conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db =>
+        conf = dbConf.copy(lockAcquireTimeoutMs = 20)) { db =>
        // DB has been loaded so current thread has already acquired the lock on the RocksDB instance
 
         db.load(0)  // Current thread should be able to load again
@@ -470,26 +733,19 @@ class RocksDBSuite extends SparkFunSuite {
   }
 
   test("ensure concurrent access lock is released after Spark task completes") {
-    val conf = new SparkConf().setAppName("test").setMaster("local")
-    val sc = new SparkContext(conf)
-
-    try {
-      RocksDBSuite.withSingletonDB {
-        // Load a RocksDB instance, that is, get a lock inside a task and then fail
-        quietly {
-          intercept[Exception] {
-            sc.makeRDD[Int](1 to 1, 1).map { i =>
-              RocksDBSuite.singleton.load(0)
-              throw new Exception("fail this task to test lock release")
-            }.count()
-          }
+    RocksDBSuite.withSingletonDB {
+      // Load a RocksDB instance, that is, get a lock inside a task and then fail
+      quietly {
+        intercept[Exception] {
+          sparkContext.makeRDD[Int](1 to 1, 1).map { i =>
+            RocksDBSuite.singleton.load(0)
+            throw new Exception("fail this task to test lock release")
+          }.count()
         }
-
-        // Test whether you can load again, that is, will it successfully lock again
-        RocksDBSuite.singleton.load(0)
       }
-    } finally {
-      sc.stop()
+
+      // Test whether you can load again, that is, will it successfully lock again
+      RocksDBSuite.singleton.load(0)
     }
   }
 
@@ -572,7 +828,7 @@ class RocksDBSuite extends SparkFunSuite {
     // disable resetting stats
     withTempDir { dir =>
       val remoteDir = dir.getCanonicalPath
-      withDB(remoteDir, conf = RocksDBConf().copy(resetStatsOnLoad = false)) { db =>
+      withDB(remoteDir, conf = dbConf.copy(resetStatsOnLoad = false)) { db =>
         verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics)
         db.load(0)
         db.put("a", "1") // put also triggers a db get
@@ -618,7 +874,7 @@ class RocksDBSuite extends SparkFunSuite {
     test(s"SPARK-39781: adding valid max_open_files=$maxOpenFiles config property " +
       "for RocksDB state store instance should succeed") {
       withTempDir { dir =>
-        val sqlConf = SQLConf.get
+        val sqlConf = SQLConf.get.clone()
         sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles", maxOpenFiles)
         val dbConf = RocksDBConf(StateStoreConf(sqlConf))
         assert(dbConf.maxOpenFiles === maxOpenFiles.toInt)
@@ -640,7 +896,7 @@ class RocksDBSuite extends SparkFunSuite {
       "for RocksDB state store instance should fail") {
       withTempDir { dir =>
         val ex = intercept[IllegalArgumentException] {
-          val sqlConf = SQLConf.get
+          val sqlConf = SQLConf.get.clone()
           sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles",
             maxOpenFiles)
           val dbConf = RocksDBConf(StateStoreConf(sqlConf))
@@ -812,7 +1068,7 @@ class RocksDBSuite extends SparkFunSuite {
       var curVersion: Long = 0
       // starting with the config "trackTotalNumberOfRows = true"
       // this should track the number of rows correctly
-      withDB(remoteDir, conf = RocksDBConf().copy(trackTotalNumberOfRows = true)) { db =>
+      withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = true)) { db =>
         db.load(curVersion)
         db.put("a", "5")
         db.put("b", "5")
@@ -828,7 +1084,7 @@ class RocksDBSuite extends SparkFunSuite {
 
       // restart with config "trackTotalNumberOfRows = false"
       // this should reset the number of keys as -1, and keep the number as -1
-      withDB(remoteDir, conf = RocksDBConf().copy(trackTotalNumberOfRows = false)) { db =>
+      withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = false)) { db =>
         db.load(curVersion)
 
         assert(db.metrics.numUncommittedKeys === -1)
@@ -845,7 +1101,7 @@ class RocksDBSuite extends SparkFunSuite {
 
       // restart with config "trackTotalNumberOfRows = true" again
       // this should count the number of keys at the load phase, and continue tracking the number
-      withDB(remoteDir, conf = RocksDBConf().copy(trackTotalNumberOfRows = true)) { db =>
+      withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = true)) { db =>
         db.load(curVersion)
 
         assert(db.metrics.numUncommittedKeys === 3)
@@ -865,10 +1121,14 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  private def sqlConf = SQLConf.get.clone()
+
+  private def dbConf = RocksDBConf(StateStoreConf(sqlConf))
+
   def withDB[T](
       remoteDir: String,
       version: Int = 0,
-      conf: RocksDBConf = RocksDBConf().copy(compactOnCommit = false, minVersionsToRetain = 100),
+      conf: RocksDBConf = dbConf,
       hadoopConf: Configuration = new Configuration())(
       func: RocksDB => T): T = {
     var db: RocksDB = null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCompatibilitySuite.scala
index b189de8d2a2..b535d7e48d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCompatibilitySuite.scala
@@ -61,7 +61,7 @@ class StateStoreCompatibilitySuite extends StreamTest with StateStoreCodecsTest
 }
 
 trait StateStoreCodecsTest extends SparkFunSuite with PlanTestBase {
-  private val codecsInShortName =
+  protected val codecsInShortName =
     CompressionCodec.ALL_COMPRESSION_CODECS.map { c => CompressionCodec.getShortName(c) }
 
   protected def testWithAllCodec(name: String)(func: => Any): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 387e96142c5..9f8a588cc32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorSuite.withCoordinatorRef
 import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -279,7 +280,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val timeoutDuration = 1.minute
 
     quietly {
-      withSpark(new SparkContext(conf)) { sc =>
+      withSpark(SparkContext.getOrCreate(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
           require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")
 
@@ -389,7 +390,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val timeoutDuration = 1.minute
 
     quietly {
-      withSpark(new SparkContext(conf)) { sc =>
+      withSpark(SparkContext.getOrCreate(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
           require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")
 
@@ -1159,59 +1160,69 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
   }
 
   test("StateStore.get") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
     quietly {
-      val dir = newDir()
-      val storeId = StateStoreProviderId(StateStoreId(dir, 0, 0), UUID.randomUUID)
-      val storeConf = getDefaultStoreConf
-      val hadoopConf = new Configuration()
-
-      // Verify that trying to get incorrect versions throw errors
-      intercept[IllegalArgumentException] {
-        StateStore.get(
-          storeId, keySchema, valueSchema, 0, -1, storeConf, hadoopConf)
-      }
-      assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store
-
-      intercept[IllegalStateException] {
-        StateStore.get(
-          storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
-      }
+      withSpark(SparkContext.getOrCreate(conf)) { sc =>
+        withCoordinatorRef(sc) { coordinatorRef =>
+          val dir = newDir()
+          val storeId = StateStoreProviderId(StateStoreId(dir, 0, 0), UUID.randomUUID)
+          val storeConf = getDefaultStoreConf
+          val hadoopConf = new Configuration()
+
+          // Verify that trying to get incorrect versions throw errors
+          intercept[IllegalArgumentException] {
+            StateStore.get(
+              storeId, keySchema, valueSchema, 0, -1, storeConf, hadoopConf)
+          }
+          assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store
 
-      // Increase version of the store and try to get again
-      val store0 = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 0, storeConf, hadoopConf)
-      assert(store0.version === 0)
-      put(store0, "a", 0, 1)
-      store0.commit()
-
-      val store1 = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
-      assert(StateStore.isLoaded(storeId))
-      assert(store1.version === 1)
-      assert(rowPairsToDataSet(store1.iterator()) === Set(("a", 0) -> 1))
+          intercept[IllegalStateException] {
+            StateStore.get(
+              storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
+          }
 
-      // Verify that you can also load older version
-      val store0reloaded = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 0, storeConf, hadoopConf)
-      assert(store0reloaded.version === 0)
-      assert(rowPairsToDataSet(store0reloaded.iterator()) === Set.empty)
-
-      // Verify that you can remove the store and still reload and use it
-      StateStore.unload(storeId)
-      assert(!StateStore.isLoaded(storeId))
-
-      val store1reloaded = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
-      assert(StateStore.isLoaded(storeId))
-      assert(store1reloaded.version === 1)
-      put(store1reloaded, "a", 0, 2)
-      assert(store1reloaded.commit() === 2)
-      assert(rowPairsToDataSet(store1reloaded.iterator()) === Set(("a", 0) -> 2))
+          // Increase version of the store and try to get again
+          val store0 = StateStore.get(
+            storeId, keySchema, valueSchema, 0, 0, storeConf, hadoopConf)
+          assert(store0.version === 0)
+          put(store0, "a", 0, 1)
+          store0.commit()
+
+          val store1 = StateStore.get(
+            storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
+          assert(StateStore.isLoaded(storeId))
+          assert(store1.version === 1)
+          assert(rowPairsToDataSet(store1.iterator()) === Set(("a", 0) -> 1))
+
+          // Verify that you can also load older version
+          val store0reloaded = StateStore.get(
+            storeId, keySchema, valueSchema, 0, 0, storeConf, hadoopConf)
+          assert(store0reloaded.version === 0)
+          assert(rowPairsToDataSet(store0reloaded.iterator()) === Set.empty)
+
+          // Verify that you can remove the store and still reload and use it
+          StateStore.unload(storeId)
+          assert(!StateStore.isLoaded(storeId))
+
+          val store1reloaded = StateStore.get(
+            storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
+          assert(StateStore.isLoaded(storeId))
+          assert(store1reloaded.version === 1)
+          put(store1reloaded, "a", 0, 2)
+          assert(store1reloaded.commit() === 2)
+          assert(rowPairsToDataSet(store1reloaded.iterator()) === Set(("a", 0) -> 2))
+        }
+      }
     }
   }
 
   test("reports memory usage") {
-    tryWithProviderResource(newStoreProvider()) { provider =>
+    // RocksDB metrics is only guaranteed to update when snapshot is created, so we set
+    // minDeltasForSnapshot = 1 to enable snapshot generation here.
+    tryWithProviderResource(newStoreProvider(minDeltasForSnapshot = 1,
+      numOfVersToRetainInMemory = 1)) { provider =>
       val store = provider.getStore(0)
       val noDataMemoryUsed = store.metrics.memoryUsedBytes
       put(store, "a", 0, 1)
@@ -1262,7 +1273,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     assert(metricNew.name === "m1", "incorrect name in copied instance")
 
     val conf = new SparkConf().setMaster("local").setAppName("SPARK-35763")
-    withSpark(new SparkContext(conf)) { sc =>
+    withSpark(SparkContext.getOrCreate(conf)) { sc =>
       val sqlMetric = metric.createSQLMetric(sc)
       assert(sqlMetric != null)
       assert(sqlMetric.name === Some("desc1"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 49f4214ac1a..9be699a17d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -1101,3 +1101,6 @@ object FlatMapGroupsWithStateSuite {
     throw new TestFailedException("Could get watermark when not expected", 20)
   }
 }
+
+class RocksDBStateStoreFlatMapGroupsWithStateSuite
+  extends FlatMapGroupsWithStateSuite with RocksDBStateStoreTest
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/RocksDBStateStoreTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/RocksDBStateStoreTest.scala
new file mode 100644
index 00000000000..4c73dd328b8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/RocksDBStateStoreTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.execution.streaming.state.{RocksDBConf, RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+trait RocksDBStateStoreTest extends SQLTestUtils {
+
+  val rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+    ".changelogCheckpointing.enabled"
+
+  override protected def test(testName: String, testTags: Tag*)(testBody: => Any)
+                             (implicit pos: Position): Unit = {
+    super.test(testName + " (RocksDBStateStore)", testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (RocksDBStateStore with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+}
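
Beyond the test trait, the same pair of settings is what a user-facing query would set. A
minimal sketch, assuming the changelog key expands to
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled (the prefix plus the
suffix built above) and that the provider key is spark.sql.streaming.stateStore.providerClass:

    import org.apache.spark.sql.SparkSession

    object ChangelogCheckpointingConfigSketch {
      def main(args: Array[String]): Unit = {
        // Illustration only: enable the RocksDB provider and changelog checkpointing.
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("changelog-checkpointing-sketch")
          .config(
            "spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
          .config(
            "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
            "true")
          .getOrCreate()
        try {
          assert(spark.conf.get(
            "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled") == "true")
        } finally {
          spark.stop()
        }
      }
    }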
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 09a0d969459..4ea59fe7405 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -956,3 +956,6 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
     }
   }
 }
+
+class RocksDBStateStoreStreamingAggregationSuite
+  extends StreamingAggregationSuite with RocksDBStateStoreTest
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index 8607de38942..4c2a889c68d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -483,3 +483,6 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     )
   }
 }
+
+class RocksDBStateStoreStreamingDeduplicationSuite
+  extends StreamingDeduplicationSuite with RocksDBStateStoreTest


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org