Posted to commits@spark.apache.org by ka...@apache.org on 2022/07/15 03:34:57 UTC

[spark] branch master updated: [SPARK-39781][SS] Add support for providing max_open_files to rocksdb state store provider

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c05d0fda2b3 [SPARK-39781][SS] Add support for providing max_open_files to rocksdb state store provider
c05d0fda2b3 is described below

commit c05d0fda2b312dbae035bb4166f4e89dd4dd0c1e
Author: Anish Shrigondekar <an...@databricks.com>
AuthorDate: Fri Jul 15 12:34:31 2022 +0900

    [SPARK-39781][SS] Add support for providing max_open_files to rocksdb state store provider
    
    ### What changes were proposed in this pull request?
    Some users running large stateful queries with lots of RocksDB-related files open run into IO exceptions around "too many open files":
    ```
    Job aborted due to stage failure: ... : org.rocksdb.RocksDBException: While open a file for random read: ... XXX.sst: Too many open files
    ```
    This change allows configuring the max_open_files property for the underlying RocksDB instance.
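
    For example, a user could set the new config on the Spark session before starting a stateful query (a usage sketch; the value 1000 is an arbitrary illustrative cap, not a recommendation):

    ```
    // Usage sketch: bound the number of files RocksDB keeps open per instance.
    spark.conf.set("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles", "1000")
    ```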
    
    ### Why are the changes needed?
    By default, maxOpenFiles is -1, which means the DB keeps all opened files open indefinitely. However, in some cases this will hit the OS limit and crash the process. As part of this change, we provide a state store config option for RocksDB that sets maxOpenFiles to a finite value, so that the number of open files is bounded per RocksDB instance.
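
    Under the hood, this maps onto RocksJava's Options.setMaxOpenFiles (see the RocksDB.scala change below). A minimal sketch of the underlying knob, outside Spark (the value 1000 is illustrative):

    ```
    import org.rocksdb.{Options, RocksDB}

    RocksDB.loadLibrary()
    // -1 (RocksDB's default) keeps every opened file open forever; a positive
    // value bounds file-descriptor usage per RocksDB instance.
    val options = new Options()
      .setCreateIfMissing(true)
      .setMaxOpenFiles(1000) // illustrative finite cap
    ```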
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added tests to validate the config passed through a RocksDB conf as well as through the Spark session.
    
    ```
    [info] - RocksDB confs are passed correctly from SparkSession to db instance (2 seconds, 377 milliseconds)
    12:54:57.927 WARN org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreSuite:
    
    ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.RocksDBStateStoreSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
    [info] Run completed in 4 seconds, 24 milliseconds.
    [info] Total number of tests run: 1
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ```
    [info] RocksDBSuite:
    12:55:56.165 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    [info] - SPARK-39781: adding valid max_open_files=-1 config property for RocksDB state store instance should succeed (1 second, 553 milliseconds)
    [info] - SPARK-39781: adding valid max_open_files=100 config property for RocksDB state store instance should succeed (664 milliseconds)
    [info] - SPARK-39781: adding valid max_open_files=1000 config property for RocksDB state store instance should succeed (558 milliseconds)
    [info] - SPARK-39781: adding invalid max_open_files=test config property for RocksDB state store instance should fail (9 milliseconds)
    [info] - SPARK-39781: adding invalid max_open_files=true config property for RocksDB state store instance should fail (8 milliseconds)
    [info] Run completed in 3 seconds, 815 milliseconds.
    [info] Total number of tests run: 5
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 5, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    Closes #37196 from anishshri-db/task/SPARK-39781.
    
    Authored-by: Anish Shrigondekar <an...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 docs/structured-streaming-programming-guide.md     |  5 +++
 .../sql/execution/streaming/state/RocksDB.scala    | 17 +++++++-
 .../streaming/state/RocksDBStateStoreSuite.scala   |  2 +
 .../execution/streaming/state/RocksDBSuite.scala   | 49 ++++++++++++++++++++++
 4 files changed, 71 insertions(+), 2 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 8bb3cdf3ef3..c0f501a3d92 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1958,6 +1958,11 @@ Here are the configs regarding the RocksDB instance of the state store provider:
     <td>The waiting time in millisecond for acquiring lock in the load operation for RocksDB instance.</td>
     <td>60000</td>
   </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.maxOpenFiles</td>
+    <td>The number of open files that can be used by the RocksDB instance. A value of -1 means that opened files are always kept open. If the open file limit is reached, RocksDB will evict entries from the open file cache, close the corresponding file descriptors, and remove the entries from the cache.</td>
+    <td>-1</td>
+  </tr>
   <tr>
     <td>spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad</td>
     <td>Whether we reset all ticker and histogram stats for RocksDB on load.</td>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index ae16faa41dc..3e1bcbbbf0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -75,6 +75,7 @@ class RocksDB(
   private val dbOptions = new Options() // options to open the RocksDB
   dbOptions.setCreateIfMissing(true)
   dbOptions.setTableFormatConfig(tableFormatConfig)
+  dbOptions.setMaxOpenFiles(conf.maxOpenFiles)
   private val dbLogger = createLogger() // for forwarding RocksDB native logs to log4j
   dbOptions.setStatistics(new Statistics())
   private val nativeStats = dbOptions.statistics()
@@ -542,7 +543,8 @@ case class RocksDBConf(
     lockAcquireTimeoutMs: Long,
     resetStatsOnLoad : Boolean,
     formatVersion: Int,
-    trackTotalNumberOfRows: Boolean)
+    trackTotalNumberOfRows: Boolean,
+    maxOpenFiles: Int)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -558,6 +560,9 @@ object RocksDBConf {
   private val BLOCK_CACHE_SIZE_MB_CONF = ConfEntry("blockCacheSizeMB", "8")
   private val LOCK_ACQUIRE_TIMEOUT_MS_CONF = ConfEntry("lockAcquireTimeoutMs", "60000")
   private val RESET_STATS_ON_LOAD = ConfEntry("resetStatsOnLoad", "true")
+  // Config to specify the number of open files that can be used by the DB. A value of -1 means
+  // that opened files are always kept open.
+  private val MAX_OPEN_FILES_CONF = ConfEntry("maxOpenFiles", "-1")
   // Configuration to set the RocksDB format version. When upgrading the RocksDB version in Spark,
   // it may introduce a new table format version that can not be supported by an old RocksDB version
   // used by an old Spark version. Hence, we store the table format version in the checkpoint when
@@ -588,6 +593,13 @@ object RocksDBConf {
       }
     }
 
+    def getIntConf(conf: ConfEntry): Int = {
+      Try { confs.getOrElse(conf.fullName, conf.default).toInt } getOrElse {
+        throw new IllegalArgumentException(s"Invalid value for '${conf.fullName}', " +
+          "must be an integer")
+      }
+    }
+
     def getPositiveLongConf(conf: ConfEntry): Long = {
       Try { confs.getOrElse(conf.fullName, conf.default).toLong } filter { _ >= 0 } getOrElse {
         throw new IllegalArgumentException(
@@ -610,7 +622,8 @@ object RocksDBConf {
       getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF),
       getBooleanConf(RESET_STATS_ON_LOAD),
       getPositiveIntConf(FORMAT_VERSION),
-      getBooleanConf(TRACK_TOTAL_NUMBER_OF_ROWS))
+      getBooleanConf(TRACK_TOTAL_NUMBER_OF_ROWS),
+      getIntConf(MAX_OPEN_FILES_CONF))
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index c93d0f03513..67181d7684e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -75,6 +75,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
           classOf[RocksDBStateStoreProvider].getName),
         (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".compactOnCommit", "true"),
         (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", "10"),
+        (RocksDBConf.ROCKSDB_CONF_NAME_PREFIX + ".maxOpenFiles", "1000"),
         (SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
       )
       testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
@@ -100,6 +101,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
       assert(rocksDBConfInTask.compactOnCommit == true)
       assert(rocksDBConfInTask.lockAcquireTimeoutMs == 10L)
       assert(rocksDBConfInTask.formatVersion == 4)
+      assert(rocksDBConfInTask.maxOpenFiles == 1000)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 9b4f569e79f..00f9c7b8c00 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -472,6 +472,55 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  // Add tests to check valid and invalid values for max_open_files passed to the underlying
+  // RocksDB instance.
+  Seq("-1", "100", "1000").foreach { maxOpenFiles =>
+    test(s"SPARK-39781: adding valid max_open_files=$maxOpenFiles config property " +
+      "for RocksDB state store instance should succeed") {
+      withTempDir { dir =>
+        val sqlConf = SQLConf.get
+        sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles", maxOpenFiles)
+        val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+        assert(dbConf.maxOpenFiles === maxOpenFiles.toInt)
+
+        val remoteDir = dir.getCanonicalPath
+        withDB(remoteDir, conf = dbConf) { db =>
+          // Do some DB ops
+          db.load(0)
+          db.put("a", "1")
+          db.commit()
+          assert(toStr(db.get("a")) === "1")
+        }
+      }
+    }
+  }
+
+  Seq("test", "true").foreach { maxOpenFiles =>
+    test(s"SPARK-39781: adding invalid max_open_files=$maxOpenFiles config property " +
+      "for RocksDB state store instance should fail") {
+      withTempDir { dir =>
+        val ex = intercept[IllegalArgumentException] {
+          val sqlConf = SQLConf.get
+          sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles",
+            maxOpenFiles)
+          val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+          assert(dbConf.maxOpenFiles === maxOpenFiles.toInt)
+
+          val remoteDir = dir.getCanonicalPath
+          withDB(remoteDir, conf = dbConf) { db =>
+            // Do some DB ops
+            db.load(0)
+            db.put("a", "1")
+            db.commit()
+            assert(toStr(db.get("a")) === "1")
+          }
+        }
+        assert(ex.getMessage.contains("Invalid value for"))
+        assert(ex.getMessage.contains("must be an integer"))
+      }
+    }
+  }
+
   test("SPARK-37224: flipping option 'trackTotalNumberOfRows' during restart") {
     withTempDir { dir =>
       val remoteDir = dir.getCanonicalPath
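
For reference, the integer validation performed by the new getIntConf helper can be sketched standalone (a minimal sketch: parseIntConf is a hypothetical name mirroring the patch's Try-based parsing, not Spark's actual API):

```
import scala.util.Try

// Hypothetical standalone mirror of getIntConf: parse the user-supplied
// string, or fail with an IllegalArgumentException naming the offending key.
def parseIntConf(confs: Map[String, String], fullName: String, default: String): Int = {
  Try(confs.getOrElse(fullName, default).toInt).getOrElse {
    throw new IllegalArgumentException(
      s"Invalid value for '$fullName', must be an integer")
  }
}

val key = "spark.sql.streaming.stateStore.rocksdb.maxOpenFiles"
parseIntConf(Map(key -> "1000"), key, "-1") // 1000
parseIntConf(Map.empty, key, "-1")          // -1 (keep all files open)
// parseIntConf(Map(key -> "test"), key, "-1") // throws IllegalArgumentException
```

This mirrors the invalid-value tests added in RocksDBSuite (max_open_files=test, max_open_files=true), which expect exactly this IllegalArgumentException.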


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org