You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by vi...@apache.org on 2021/08/19 07:24:57 UTC

[spark] branch branch-3.2 updated: [SPARK-36519][SS] Store RocksDB format version in the checkpoint for streaming queries

This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 54cca7f  [SPARK-36519][SS] Store RocksDB format version in the checkpoint for streaming queries
54cca7f is described below

commit 54cca7f82ecf23e062bb4f6d68697abec2dbcc5b
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Thu Aug 19 00:23:40 2021 -0700

    [SPARK-36519][SS] Store RocksDB format version in the checkpoint for streaming queries
    
    ### What changes were proposed in this pull request?
    
    RocksDB provides backward compatibility, but it doesn't always provide forward compatibility. It's better to store the RocksDB format version in the checkpoint so that we have enough information to provide a rollback guarantee when a new Spark version upgrades RocksDB to a release that may introduce an incompatible format change.
    
    A typical case is when a user upgrades their query to a new Spark version, and this new Spark version ships a new RocksDB version which may use a new format. The user then hits a bug and decides to roll back. However, the old RocksDB version bundled with the old Spark version cannot read the new format.
    
    To handle this case, we write the RocksDB format version to the checkpoint. When restarting from a checkpoint, we force RocksDB to use the format version stored in the checkpoint. This ensures the user can roll back their Spark version if needed.
    
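    We also provide an internal config `spark.sql.streaming.stateStore.rocksdb.formatVersion` that sets the format version recorded in the checkpoint when a query starts, so users who don't need to roll back their Spark versions can choose a different format version. For a query restarted from an existing checkpoint, the format version stored in the checkpoint takes precedence over this config, as the new tests verify.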
    
    ### Why are the changes needed?
    
    Provide the Spark version rollback guarantee for streaming queries when a new RocksDB version introduces an incompatible format change.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The RocksDB state store is a new feature in Spark 3.2, which has not been released yet.
    
    ### How was this patch tested?
    
    New unit tests in `RocksDBStateStoreIntegrationSuite` and `RocksDBStateStoreSuite`.
    
    Closes #33749 from zsxwing/SPARK-36519.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
    (cherry picked from commit ea4919801aa91800bf91c561a0c1c9f3f7dfd0e7)
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 16 ++++++
 .../spark/sql/execution/streaming/OffsetSeq.scala  |  3 +-
 .../sql/execution/streaming/state/RocksDB.scala    | 26 ++++++++-
 .../state/RocksDBStateStoreIntegrationSuite.scala  | 65 +++++++++++++++++++++-
 .../streaming/state/RocksDBStateStoreSuite.scala   |  4 +-
 5 files changed, 109 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6869977..b137fd2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1564,6 +1564,22 @@ object SQLConf {
       .stringConf
       .createWithDefault("lz4")
 
+  /**
+   * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places should be updated
+   * together.
+   */
+  val STATE_STORE_ROCKSDB_FORMAT_VERSION =
+    buildConf("spark.sql.streaming.stateStore.rocksdb.formatVersion")
+      .internal()
+      .doc("Set the RocksDB format version. This will be stored in the checkpoint when starting " +
+        "a streaming query. The checkpoint will use this RocksDB format version in the entire " +
+        "lifetime of the query.")
+      .version("3.2.0")
+      .intConf
+      .checkValue(_ >= 0, "Must not be negative")
+      // 5 is the default table format version for RocksDB 6.20.3.
+      .createWithDefault(5)
+
   val STREAMING_AGGREGATION_STATE_FORMAT_VERSION =
     buildConf("spark.sql.streaming.aggregation.stateFormatVersion")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 7d7ec76..c08a14c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -97,7 +97,8 @@ object OffsetSeqMetadata extends Logging {
   private val relevantSQLConfs = Seq(
     SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY,
     FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, STREAMING_AGGREGATION_STATE_FORMAT_VERSION,
-    STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC)
+    STREAMING_JOIN_STATE_FORMAT_VERSION, STATE_STORE_COMPRESSION_CODEC,
+    STATE_STORE_ROCKSDB_FORMAT_VERSION)
 
   /**
    * Default values of relevant configurations that are used for backward compatibility.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index c47600e..e9ef6e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -70,6 +70,7 @@ class RocksDB(
   tableFormatConfig.setBlockSize(conf.blockSizeKB * 1024)
   tableFormatConfig.setBlockCache(new LRUCache(conf.blockCacheSizeMB * 1024 * 1024))
   tableFormatConfig.setFilterPolicy(bloomFilter)
+  tableFormatConfig.setFormatVersion(conf.formatVersion)
 
   private val dbOptions = new Options() // options to open the RocksDB
   dbOptions.setCreateIfMissing(true)
@@ -497,7 +498,8 @@ case class RocksDBConf(
     blockSizeKB: Long,
     blockCacheSizeMB: Long,
     lockAcquireTimeoutMs: Long,
-    resetStatsOnLoad : Boolean)
+    resetStatsOnLoad : Boolean,
+    formatVersion: Int)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -514,6 +516,18 @@ object RocksDBConf {
   private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
   private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
   private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true")
+  // Configuration to set the RocksDB format version. When upgrading the RocksDB version in Spark,
+  // it may introduce a new table format version that can not be supported by an old RocksDB version
+  // used by an old Spark version. Hence, we store the table format version in the checkpoint when
+  // a query starts and use it in the entire lifetime of the query. This will ensure the user can
+  // still rollback their Spark version for an existing query when RocksDB changes its default
+  // table format in a new version. See
+  // https://github.com/facebook/rocksdb/wiki/RocksDB-Compatibility-Between-Different-Releases
+  // for the RocksDB compatibility guarantee.
+  //
+  // Note: this is also defined in `SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION`. These two
+  // places should be updated together.
+  private val FORMAT_VERSION = ConfEntry("formatVersion", "5")
 
   def apply(storeConf: StateStoreConf): RocksDBConf = {
     val confs = CaseInsensitiveMap[String](storeConf.confs)
@@ -531,6 +545,13 @@ object RocksDBConf {
       }
     }
 
+    def getPositiveIntConf(conf: ConfEntry): Int = {
+      Try { confs.getOrElse(conf.fullName, conf.default).toInt } filter { _ >= 0 } getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value for '${conf.fullName}', must be a positive integer")
+      }
+    }
+
     RocksDBConf(
       storeConf.minVersionsToRetain,
       getBooleanConf(COMPACT_ON_COMMIT_CONF),
@@ -538,7 +559,8 @@ object RocksDBConf {
       getPositiveLongConf(BLOCK_SIZE_KB_CONF),
       getPositiveLongConf(BLOCK_CACHE_SIZE_MB_CONF),
       getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF),
-      getBooleanConf(RESET_STATS_ON_LOAD))
+      getBooleanConf(RESET_STATS_ON_LOAD),
+      getPositiveIntConf(FORMAT_VERSION))
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index e33ec4d..2d741d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters
 
 import org.scalatest.time.{Minute, Span}
 
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
 import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
@@ -103,5 +103,68 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest {
       }
     }
   }
+
+  testQuietly("SPARK-36519: store RocksDB format version in the checkpoint") {
+    def getFormatVersion(query: StreamingQuery): Int = {
+      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.sparkSession
+        .sessionState.conf.getConf(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
+    }
+
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+      withTempDir { dir =>
+        val inputData = MemoryStream[Int]
+
+        def startQuery(): StreamingQuery = {
+          inputData.toDS().toDF("value")
+            .select('value)
+            .groupBy($"value")
+            .agg(count("*"))
+            .writeStream
+            .format("console")
+            .option("checkpointLocation", dir.getCanonicalPath)
+            .outputMode("complete")
+            .start()
+        }
+
+        // The format version should be 5 by default
+        var query = startQuery()
+        inputData.addData(1, 2)
+        query.processAllAvailable()
+        assert(getFormatVersion(query) == 5)
+        query.stop()
+
+        // Setting the format version manually should not overwrite the value in the checkpoint
+        withSQLConf(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key -> "4") {
+          query = startQuery()
+          inputData.addData(1, 2)
+          query.processAllAvailable()
+          assert(getFormatVersion(query) == 5)
+          query.stop()
+        }
+      }
+    }
+  }
+
+  testQuietly("SPARK-36519: RocksDB format version can be set by the SQL conf") {
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      // Set an unsupported RocksDB format version and the query should fail if it's passed down
+      // into RocksDB
+      SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key -> "100") {
+      val inputData = MemoryStream[Int]
+      val query = inputData.toDS().toDF("value")
+        .select('value)
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .format("console")
+        .outputMode("complete")
+        .start()
+      inputData.addData(1, 2)
+      val e = intercept[StreamingQueryException](query.processAllAvailable())
+      assert(e.getCause.getCause.getMessage.contains("Unsupported BlockBasedTable format_version"))
+    }
+  }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index b91ed26..24c89a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -63,7 +63,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
         ("spark.sql.streaming.stateStore.providerClass",
           classOf[RocksDBStateStoreProvider].getName),
         (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".compactOnCommit", "true"),
-        (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", "10")
+        (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", "10"),
+        (SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
       )
       testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
 
@@ -87,6 +88,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       // Verify the confs are same as those configured in the session conf
       assert(rocksDBConfInTask.compactOnCommit == true)
       assert(rocksDBConfInTask.lockAcquireTimeoutMs == 10L)
+      assert(rocksDBConfInTask.formatVersion == 4)
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org