Posted to commits@spark.apache.org by ka...@apache.org on 2023/03/16 10:52:43 UTC

[spark] branch master updated: [SPARK-42819][SS] Add support for setting max_write_buffer_number and write_buffer_size for RocksDB used in streaming

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bafb953f32e [SPARK-42819][SS] Add support for setting max_write_buffer_number and write_buffer_size for RocksDB used in streaming
bafb953f32e is described below

commit bafb953f32e4f7c04f7f163354ec997bae8aa0e6
Author: Anish Shrigondekar <an...@databricks.com>
AuthorDate: Thu Mar 16 19:52:16 2023 +0900

    [SPARK-42819][SS] Add support for setting max_write_buffer_number and write_buffer_size for RocksDB used in streaming
    
    ### What changes were proposed in this pull request?
    Add support for setting max_write_buffer_number and write_buffer_size for RocksDB used in streaming
    
    ### Why are the changes needed?
    We need these settings to control memory tuning for RocksDB. We already expose a setting for the blockCache size, but these two settings are missing, so this change adds them.
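
    For example, a user could set these on an active SparkSession before starting a stateful streaming query. A minimal sketch: the config keys are the ones added by this change, while the 64 MB / 4 values are purely illustrative:
    ```
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").getOrCreate()

    // Cap each MemTable at 64 MB and allow at most 4 MemTables (active + immutable).
    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB", "64")
    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber", "4")
    ```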
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests and documentation in the programming guide
    RocksDBSuite
    ```
    [info] Run completed in 59 seconds, 336 milliseconds.
    [info] Total number of tests run: 27
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 27, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    [success] Total time: 165 s (02:45), completed Mar 15, 2023, 11:24:17 PM
    ```
    RocksDBStateStoreSuite
    ```
    [info] Run completed in 1 minute, 16 seconds.
    [info] Total number of tests run: 73
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 73, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    Closes #40455 from anishshri-db/task/SPARK-42819.
    
    Authored-by: Anish Shrigondekar <an...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 docs/structured-streaming-programming-guide.md     | 10 ++++++
 .../sql/execution/streaming/state/RocksDB.scala    | 41 ++++++++++++++++++++--
 .../streaming/state/RocksDBStateStoreSuite.scala   |  4 +++
 .../execution/streaming/state/RocksDBSuite.scala   | 27 ++++++++++++++
 4 files changed, 79 insertions(+), 3 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index a71c774f328..486bed7184f 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2289,6 +2289,16 @@ Here are the configs regarding the RocksDB instance of the state store provider:
     <td>Whether we track the total number of rows in the state store. Please refer to the details in <a href="#performance-aspect-considerations">Performance-aspect considerations</a>.</td>
     <td>True</td>
   </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB</td>
+    <td>The maximum size of a MemTable in RocksDB. A value of -1 means that the RocksDB internal default value will be used.</td>
+    <td>-1</td>
+  </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber</td>
+    <td>The maximum number of MemTables in RocksDB, both active and immutable. A value of -1 means that the RocksDB internal default value will be used.</td>
+    <td>-1</td>
+  </tr>
 </table>
 
 ##### Performance-aspect considerations
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 89872afb80e..363cc2b5c46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -72,7 +72,21 @@ class RocksDB(
   tableFormatConfig.setFilterPolicy(bloomFilter)
   tableFormatConfig.setFormatVersion(conf.formatVersion)
 
-  private val dbOptions = new Options() // options to open the RocksDB
+  private val columnFamilyOptions = new ColumnFamilyOptions()
+
+  private val dbOptions =
+    new Options(new DBOptions(), columnFamilyOptions) // options to open the RocksDB
+
+  // Set RocksDB options around MemTable memory usage. By default, we let RocksDB
+  // use its internal default values for these settings.
+  if (conf.writeBufferSizeMB > 0L) {
+    columnFamilyOptions.setWriteBufferSize(conf.writeBufferSizeMB * 1024 * 1024)
+  }
+
+  if (conf.maxWriteBufferNumber > 0L) {
+    columnFamilyOptions.setMaxWriteBufferNumber(conf.maxWriteBufferNumber)
+  }
+
   dbOptions.setCreateIfMissing(true)
   dbOptions.setTableFormatConfig(tableFormatConfig)
   dbOptions.setMaxOpenFiles(conf.maxOpenFiles)
@@ -558,7 +572,9 @@ case class RocksDBConf(
     resetStatsOnLoad : Boolean,
     formatVersion: Int,
     trackTotalNumberOfRows: Boolean,
-    maxOpenFiles: Int)
+    maxOpenFiles: Int,
+    writeBufferSizeMB: Long,
+    maxWriteBufferNumber: Int)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -609,6 +625,15 @@ object RocksDBConf {
  // again when you really need to know the number for observability/debuggability.
   private val TRACK_TOTAL_NUMBER_OF_ROWS = SQLConfEntry("trackTotalNumberOfRows", "true")
 
+  // Configuration to control the maximum size of a MemTable in RocksDB.
+  private val WRITE_BUFFER_SIZE_MB_CONF = SQLConfEntry("writeBufferSizeMB", "-1")
+
+  // Configuration to set the maximum number of MemTables in RocksDB, both active and immutable.
+  // If the active MemTable fills up and the total number of MemTables is larger than
+  // maxWriteBufferNumber, then RocksDB will stall further writes.
+  // This may happen if the flush process is slower than the write rate.
+  private val MAX_WRITE_BUFFER_NUMBER_CONF = SQLConfEntry("maxWriteBufferNumber", "-1")
+
   def apply(storeConf: StateStoreConf): RocksDBConf = {
     val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
     val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -633,6 +658,14 @@ object RocksDBConf {
       }
     }
 
+    def getLongConf(conf: ConfEntry): Long = {
+      Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toLong } getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value for '${conf.fullName}', must be a long"
+        )
+      }
+    }
+
     def getPositiveLongConf(conf: ConfEntry): Long = {
       Try {
         getConfigMap(conf).getOrElse(conf.fullName, conf.default).toLong
@@ -660,7 +693,9 @@ object RocksDBConf {
       getBooleanConf(RESET_STATS_ON_LOAD),
       getPositiveIntConf(FORMAT_VERSION),
       getBooleanConf(TRACK_TOTAL_NUMBER_OF_ROWS),
-      getIntConf(MAX_OPEN_FILES_CONF))
+      getIntConf(MAX_OPEN_FILES_CONF),
+      getLongConf(WRITE_BUFFER_SIZE_MB_CONF),
+      getIntConf(MAX_WRITE_BUFFER_NUMBER_CONF))
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 1998e2af114..54c99741874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -76,6 +76,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".compactOnCommit", "true"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", "10"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxOpenFiles", "1000"),
+        (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", "3"),
+        (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".writeBufferSizeMB", "16"),
         (SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
       )
       testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
@@ -102,6 +104,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       assert(rocksDBConfInTask.lockAcquireTimeoutMs == 10L)
       assert(rocksDBConfInTask.formatVersion == 4)
       assert(rocksDBConfInTask.maxOpenFiles == 1000)
+      assert(rocksDBConfInTask.maxWriteBufferNumber == 3)
+      assert(rocksDBConfInTask.writeBufferSizeMB == 16L)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 417eff65482..3d45a8868e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -660,6 +660,33 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  Seq("1", "2", "3").foreach { maxWriteBufferNumber =>
+    Seq("16", "32", "64").foreach { writeBufferSizeMB =>
+      test(s"SPARK-42819: configure memtable memory usage with " +
+        s"maxWriteBufferNumber=$maxWriteBufferNumber and writeBufferSizeMB=$writeBufferSizeMB") {
+        withTempDir { dir =>
+          val sqlConf = new SQLConf
+          sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber",
+            maxWriteBufferNumber)
+          sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB",
+            writeBufferSizeMB)
+          val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+          assert(dbConf.maxWriteBufferNumber === maxWriteBufferNumber.toInt)
+          assert(dbConf.writeBufferSizeMB === writeBufferSizeMB.toInt)
+
+          val remoteDir = dir.getCanonicalPath
+          withDB(remoteDir, conf = dbConf) { db =>
+            // Do some DB ops
+            db.load(0)
+            db.put("a", "1")
+            db.commit()
+            assert(toStr(db.get("a")) === "1")
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart") {
     withTempDir { dir =>
       val remoteDir = dir.getCanonicalPath


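For reference, the two ColumnFamilyOptions setters wired up in RocksDB.scala above can also be exercised directly against the RocksDB Java API. A minimal standalone sketch, assuming the org.rocksdb (rocksdbjni) dependency is on the classpath; the database path and the 16 MB / 3 values are illustrative, mirroring the values used in the tests:

```
import org.rocksdb.{ColumnFamilyOptions, DBOptions, Options, RocksDB}

RocksDB.loadLibrary()

val cfOptions = new ColumnFamilyOptions()
// Each MemTable may grow to 16 MB; once 3 MemTables (active + immutable) pile up
// faster than they can be flushed, RocksDB stalls further writes.
cfOptions.setWriteBufferSize(16L * 1024 * 1024)
cfOptions.setMaxWriteBufferNumber(3)

val options = new Options(new DBOptions().setCreateIfMissing(true), cfOptions)
val db = RocksDB.open(options, "/tmp/rocksdb-memtable-sketch")
db.put("a".getBytes("UTF-8"), "1".getBytes("UTF-8"))
assert(new String(db.get("a".getBytes("UTF-8")), "UTF-8") == "1")
db.close()
options.close()
```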