You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2023/03/16 10:52:43 UTC
[spark] branch master updated: [SPARK-42819][SS] Add support for setting max_write_buffer_number and write_buffer_size for RocksDB used in streaming
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bafb953f32e [SPARK-42819][SS] Add support for setting max_write_buffer_number and write_buffer_size for RocksDB used in streaming
bafb953f32e is described below
commit bafb953f32e4f7c04f7f163354ec997bae8aa0e6
Author: Anish Shrigondekar <an...@databricks.com>
AuthorDate: Thu Mar 16 19:52:16 2023 +0900
[SPARK-42819][SS] Add support for setting max_write_buffer_number and write_buffer_size for RocksDB used in streaming
### What changes were proposed in this pull request?
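Add support for setting RocksDB's `max_write_buffer_number` and `write_buffer_size` for the RocksDB instances used in streaming, via two new configs: `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` and `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB`.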
### Why are the changes needed?
We need these settings in order to tune RocksDB memory usage. A setting for the block cache size is already exposed, but these two MemTable-related settings are missing. This change adds them.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests, and documented the new configs in the Structured Streaming programming guide.
RocksDBSuite
```
[info] Run completed in 59 seconds, 336 milliseconds.
[info] Total number of tests run: 27
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 27, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 165 s (02:45), completed Mar 15, 2023, 11:24:17 PM
```
RocksDBStateStoreSuite
```
[info] Run completed in 1 minute, 16 seconds.
[info] Total number of tests run: 73
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 73, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
Closes #40455 from anishshri-db/task/SPARK-42819.
Authored-by: Anish Shrigondekar <an...@databricks.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
docs/structured-streaming-programming-guide.md | 10 ++++++
.../sql/execution/streaming/state/RocksDB.scala | 41 ++++++++++++++++++++--
.../streaming/state/RocksDBStateStoreSuite.scala | 4 +++
.../execution/streaming/state/RocksDBSuite.scala | 27 ++++++++++++++
4 files changed, 79 insertions(+), 3 deletions(-)
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index a71c774f328..486bed7184f 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2289,6 +2289,16 @@ Here are the configs regarding to RocksDB instance of the state store provider:
<td>Whether we track the total number of rows in state store. Please refer the details in <a href="#performance-aspect-considerations">Performance-aspect considerations</a>.</td>
<td>True</td>
</tr>
+ <tr>
+ <td>spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB</td>
+ <td>The maximum size of MemTable in RocksDB. Value of -1 means that RocksDB internal default values will be used</td>
+ <td>-1</td>
+ </tr>
+ <tr>
+ <td>spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber</td>
+ <td>The maximum number of MemTables in RocksDB, both active and immutable. Value of -1 means that RocksDB internal default values will be used</td>
+ <td>-1</td>
+ </tr>
</table>
##### Performance-aspect considerations
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 89872afb80e..363cc2b5c46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -72,7 +72,21 @@ class RocksDB(
tableFormatConfig.setFilterPolicy(bloomFilter)
tableFormatConfig.setFormatVersion(conf.formatVersion)
- private val dbOptions = new Options() // options to open the RocksDB
+ private val columnFamilyOptions = new ColumnFamilyOptions()
+
+ private val dbOptions =
+ new Options(new DBOptions(), columnFamilyOptions) // options to open the RocksDB
+
+ // Set RocksDB options around MemTable memory usage. By default, we let RocksDB
+ // use its internal default values for these settings.
+ if (conf.writeBufferSizeMB > 0L) {
+ columnFamilyOptions.setWriteBufferSize(conf.writeBufferSizeMB * 1024 * 1024)
+ }
+
+ if (conf.maxWriteBufferNumber > 0L) {
+ columnFamilyOptions.setMaxWriteBufferNumber(conf.maxWriteBufferNumber)
+ }
+
dbOptions.setCreateIfMissing(true)
dbOptions.setTableFormatConfig(tableFormatConfig)
dbOptions.setMaxOpenFiles(conf.maxOpenFiles)
@@ -558,7 +572,9 @@ case class RocksDBConf(
resetStatsOnLoad : Boolean,
formatVersion: Int,
trackTotalNumberOfRows: Boolean,
- maxOpenFiles: Int)
+ maxOpenFiles: Int,
+ writeBufferSizeMB: Long,
+ maxWriteBufferNumber: Int)
object RocksDBConf {
/** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -609,6 +625,15 @@ object RocksDBConf {
// again when you really need the know the number for observability/debuggability.
private val TRACK_TOTAL_NUMBER_OF_ROWS = SQLConfEntry("trackTotalNumberOfRows", "true")
+ // Configuration to control maximum size of MemTable in RocksDB
+ private val WRITE_BUFFER_SIZE_MB_CONF = SQLConfEntry("writeBufferSizeMB", "-1")
+
+ // Configuration to set maximum number of MemTables in RocksDB, both active and immutable.
+ // If the active MemTable fills up and the total number of MemTables is larger than
+ // maxWriteBufferNumber, then RocksDB will stall further writes.
+ // This may happen if the flush process is slower than the write rate.
+ private val MAX_WRITE_BUFFER_NUMBER_CONF = SQLConfEntry("maxWriteBufferNumber", "-1")
+
def apply(storeConf: StateStoreConf): RocksDBConf = {
val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -633,6 +658,14 @@ object RocksDBConf {
}
}
+ def getLongConf(conf: ConfEntry): Long = {
+ Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toLong } getOrElse {
+ throw new IllegalArgumentException(
+ s"Invalid value for '${conf.fullName}', must be a long"
+ )
+ }
+ }
+
def getPositiveLongConf(conf: ConfEntry): Long = {
Try {
getConfigMap(conf).getOrElse(conf.fullName, conf.default).toLong
@@ -660,7 +693,9 @@ object RocksDBConf {
getBooleanConf(RESET_STATS_ON_LOAD),
getPositiveIntConf(FORMAT_VERSION),
getBooleanConf(TRACK_TOTAL_NUMBER_OF_ROWS),
- getIntConf(MAX_OPEN_FILES_CONF))
+ getIntConf(MAX_OPEN_FILES_CONF),
+ getLongConf(WRITE_BUFFER_SIZE_MB_CONF),
+ getIntConf(MAX_WRITE_BUFFER_NUMBER_CONF))
}
def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 1998e2af114..54c99741874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -76,6 +76,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".compactOnCommit", "true"),
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", "10"),
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxOpenFiles", "1000"),
+ (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", "3"),
+ (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".writeBufferSizeMB", "16"),
(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
)
testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
@@ -102,6 +104,8 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(rocksDBConfInTask.lockAcquireTimeoutMs == 10L)
assert(rocksDBConfInTask.formatVersion == 4)
assert(rocksDBConfInTask.maxOpenFiles == 1000)
+ assert(rocksDBConfInTask.maxWriteBufferNumber == 3)
+ assert(rocksDBConfInTask.writeBufferSizeMB == 16L)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 417eff65482..3d45a8868e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -660,6 +660,33 @@ class RocksDBSuite extends SparkFunSuite {
}
}
+ Seq("1", "2", "3").foreach { maxWriteBufferNumber =>
+ Seq("16", "32", "64").foreach {writeBufferSizeMB =>
+ test(s"SPARK-42819: configure memtable memory usage with " +
+ s"maxWriteBufferNumber=$maxWriteBufferNumber and writeBufferSize=$writeBufferSizeMB") {
+ withTempDir { dir =>
+ val sqlConf = new SQLConf
+ sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber",
+ maxWriteBufferNumber)
+ sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB",
+ writeBufferSizeMB)
+ val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+ assert(dbConf.maxWriteBufferNumber === maxWriteBufferNumber.toInt)
+ assert(dbConf.writeBufferSizeMB === writeBufferSizeMB.toInt)
+
+ val remoteDir = dir.getCanonicalPath
+ withDB(remoteDir, conf = dbConf) { db =>
+ // Do some DB ops
+ db.load(0)
+ db.put("a", "1")
+ db.commit()
+ assert(toStr(db.get("a")) === "1")
+ }
+ }
+ }
+ }
+ }
+
test("SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart") {
withTempDir { dir =>
val remoteDir = dir.getCanonicalPath
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org