You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/12/05 08:49:41 UTC

(spark) branch master updated: [SPARK-46258][CORE] Add `RocksDBPersistenceEngine`

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dd11075db618 [SPARK-46258][CORE] Add `RocksDBPersistenceEngine`
dd11075db618 is described below

commit dd11075db61879e200b4121b83d4239954881ddd
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Tue Dec 5 00:49:21 2023 -0800

    [SPARK-46258][CORE] Add `RocksDBPersistenceEngine`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add `RocksDBPersistenceEngine`.
    
    ### Why are the changes needed?
    
    To speed up `Spark Master` HA operations by **6.1x** compared with `FileSystemPersistenceEngine` (1571 ms vs. 257 ms best time with `JavaSerializer`).
    
    ```
    OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
    AMD EPYC 7763 64-Core Processor
    1000 Workers:                                        Best Time(ms)   Avg Time(ms)   Relative
    --------------------------------------------------------------------------------------------
    FileSystemPersistenceEngine with JavaSerializer               1571           1616       3.6X
    RocksDBPersistenceEngine with JavaSerializer                   257            258      22.0X
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a new backend.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44173 from dongjoon-hyun/SPARK-46258.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../PersistenceEngineBenchmark-jdk21-results.txt   |  28 +++---
 .../PersistenceEngineBenchmark-results.txt         |  28 +++---
 .../org/apache/spark/deploy/master/Master.scala    |   4 +
 .../spark/deploy/master/RecoveryModeFactory.scala  |  18 ++++
 .../deploy/master/RocksDBPersistenceEngine.scala   | 103 +++++++++++++++++++++
 .../apache/spark/deploy/master/MasterSuite.scala   |  20 ++++
 .../deploy/master/PersistenceEngineBenchmark.scala |  11 +++
 .../deploy/master/PersistenceEngineSuite.scala     |   9 ++
 8 files changed, 195 insertions(+), 26 deletions(-)

diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
index 314fb6958b69..99035eb336a3 100644
--- a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
+++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
@@ -6,18 +6,20 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
 1000 Workers:                                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ----------------------------------------------------------------------------------------------------------------------------------------
-ZooKeeperPersistenceEngine with JavaSerializer                     5402           5546         233          0.0     5402030.8       1.0X
-ZooKeeperPersistenceEngine with KryoSerializer                     4185           4220          32          0.0     4184623.1       1.3X
-FileSystemPersistenceEngine with JavaSerializer                    1591           1634          37          0.0     1590836.4       3.4X
-FileSystemPersistenceEngine with JavaSerializer (lz4)               611            623          14          0.0      611256.6       8.8X
-FileSystemPersistenceEngine with JavaSerializer (lzf)               626            640          13          0.0      626072.2       8.6X
-FileSystemPersistenceEngine with JavaSerializer (snappy)            595            628          29          0.0      594744.4       9.1X
-FileSystemPersistenceEngine with JavaSerializer (zstd)              755            774          21          0.0      754604.4       7.2X
-FileSystemPersistenceEngine with KryoSerializer                     479            489           8          0.0      479404.7      11.3X
-FileSystemPersistenceEngine with KryoSerializer (lz4)               392            406          12          0.0      392165.7      13.8X
-FileSystemPersistenceEngine with KryoSerializer (lzf)               525            536          14          0.0      524916.7      10.3X
-FileSystemPersistenceEngine with KryoSerializer (snappy)            519            533          14          0.0      518569.3      10.4X
-FileSystemPersistenceEngine with KryoSerializer (zstd)              627            663          31          0.0      627233.2       8.6X
-BlackHolePersistenceEngine                                            0              0           0          6.0         166.0   32541.8X
+ZooKeeperPersistenceEngine with JavaSerializer                     5863           6053         265          0.0     5862988.1       1.0X
+ZooKeeperPersistenceEngine with KryoSerializer                     4553           4612          54          0.0     4553477.9       1.3X
+FileSystemPersistenceEngine with JavaSerializer                    1619           1632          17          0.0     1618500.5       3.6X
+FileSystemPersistenceEngine with JavaSerializer (lz4)               619            631          10          0.0      619255.8       9.5X
+FileSystemPersistenceEngine with JavaSerializer (lzf)               623            640          20          0.0      623222.4       9.4X
+FileSystemPersistenceEngine with JavaSerializer (snappy)            553            596          37          0.0      553417.4      10.6X
+FileSystemPersistenceEngine with JavaSerializer (zstd)              747            767          26          0.0      747197.1       7.8X
+FileSystemPersistenceEngine with KryoSerializer                     394            460          57          0.0      393534.6      14.9X
+FileSystemPersistenceEngine with KryoSerializer (lz4)               368            406          33          0.0      367925.8      15.9X
+FileSystemPersistenceEngine with KryoSerializer (lzf)               509            532          35          0.0      509170.2      11.5X
+FileSystemPersistenceEngine with KryoSerializer (snappy)            515            540          28          0.0      515190.9      11.4X
+FileSystemPersistenceEngine with KryoSerializer (zstd)              632            656          32          0.0      631522.4       9.3X
+RocksDBPersistenceEngine with JavaSerializer                        265            266           1          0.0      265026.0      22.1X
+RocksDBPersistenceEngine with KryoSerializer                         93             94           2          0.0       92732.3      63.2X
+BlackHolePersistenceEngine                                            0              0           0          6.0         166.8   35151.7X
 
 
diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt b/core/benchmarks/PersistenceEngineBenchmark-results.txt
index 64c8bb440842..6c33de480b1c 100644
--- a/core/benchmarks/PersistenceEngineBenchmark-results.txt
+++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt
@@ -6,18 +6,20 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
 1000 Workers:                                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ----------------------------------------------------------------------------------------------------------------------------------------
-ZooKeeperPersistenceEngine with JavaSerializer                     5463           5745         246          0.0     5463433.7       1.0X
-ZooKeeperPersistenceEngine with KryoSerializer                     4478           4513          31          0.0     4477926.8       1.2X
-FileSystemPersistenceEngine with JavaSerializer                    1650           1670          19          0.0     1649557.9       3.3X
-FileSystemPersistenceEngine with JavaSerializer (lz4)               628            649          19          0.0      628265.9       8.7X
-FileSystemPersistenceEngine with JavaSerializer (lzf)               545            575          28          0.0      544502.0      10.0X
-FileSystemPersistenceEngine with JavaSerializer (snappy)            608            625          27          0.0      607926.2       9.0X
-FileSystemPersistenceEngine with JavaSerializer (zstd)              805            814           8          0.0      804867.1       6.8X
-FileSystemPersistenceEngine with KryoSerializer                     420            461          40          0.0      420137.2      13.0X
-FileSystemPersistenceEngine with KryoSerializer (lz4)               380            452          64          0.0      379742.4      14.4X
-FileSystemPersistenceEngine with KryoSerializer (lzf)               518            543          31          0.0      518217.1      10.5X
-FileSystemPersistenceEngine with KryoSerializer (snappy)            483            513          31          0.0      483112.6      11.3X
-FileSystemPersistenceEngine with KryoSerializer (zstd)              668            679           9          0.0      668335.1       8.2X
-BlackHolePersistenceEngine                                            0              0           0          5.8         172.5   31668.2X
+ZooKeeperPersistenceEngine with JavaSerializer                     6123           6309         172          0.0     6122561.9       1.0X
+ZooKeeperPersistenceEngine with KryoSerializer                     4676           4753          71          0.0     4675978.2       1.3X
+FileSystemPersistenceEngine with JavaSerializer                    1657           1679          20          0.0     1656526.3       3.7X
+FileSystemPersistenceEngine with JavaSerializer (lz4)               641            657          18          0.0      641219.3       9.5X
+FileSystemPersistenceEngine with JavaSerializer (lzf)               610            613           4          0.0      609684.2      10.0X
+FileSystemPersistenceEngine with JavaSerializer (snappy)            615            641          23          0.0      615266.4      10.0X
+FileSystemPersistenceEngine with JavaSerializer (zstd)              749            764          17          0.0      749140.8       8.2X
+FileSystemPersistenceEngine with KryoSerializer                     460            477          15          0.0      460196.8      13.3X
+FileSystemPersistenceEngine with KryoSerializer (lz4)               403            439          32          0.0      402877.4      15.2X
+FileSystemPersistenceEngine with KryoSerializer (lzf)               543            571          49          0.0      542685.4      11.3X
+FileSystemPersistenceEngine with KryoSerializer (snappy)            498            507          16          0.0      497754.2      12.3X
+FileSystemPersistenceEngine with KryoSerializer (zstd)              644            653           8          0.0      643776.5       9.5X
+RocksDBPersistenceEngine with JavaSerializer                        279            281           2          0.0      278935.2      21.9X
+RocksDBPersistenceEngine with KryoSerializer                         92             92           1          0.0       91713.1      66.8X
+BlackHolePersistenceEngine                                            0              0           0          6.0         165.6   36965.5X
 
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2e1d7b9bce33..e2c652f944c9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -186,6 +186,10 @@ private[deploy] class Master(
         val fsFactory =
           new FileSystemRecoveryModeFactory(conf, serializer)
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+      case "ROCKSDB" =>
+        val rdbFactory =
+          new RocksDBRecoveryModeFactory(conf, serializer)
+        (rdbFactory.createPersistenceEngine(), rdbFactory.createLeaderElectionAgent(this))
       case "CUSTOM" =>
         val clazz = Utils.classForName(conf.get(RECOVERY_MODE_FACTORY))
         val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
index c24c4e5fe6be..106acc9a7944 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -67,6 +67,24 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
+/**
+ * Recovery mode factory whose persistence engine is backed by RocksDB.
+ *
+ * The leader agent is a no-op in this mode: the leader stays the leader forever,
+ * because the actual recovery is made by restoring the state from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  /** Creates a [[RocksDBPersistenceEngine]] rooted at the configured recovery directory. */
+  def createPersistenceEngine(): PersistenceEngine = {
+    val rocksDbDir = conf.get(RECOVERY_DIRECTORY)
+    logInfo(s"Persisting recovery state to directory: $rocksDbDir")
+    new RocksDBPersistenceEngine(rocksDbDir, serializer)
+  }
+
+  /** Returns a [[MonarchyLeaderAgent]] for the given master. */
+  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent =
+    new MonarchyLeaderAgent(master)
+}
+
 private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
   extends StandaloneRecoveryModeFactory(conf, serializer) {
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala
new file mode 100644
index 000000000000..5c43dab4d066
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * Each object is stored under the UTF-8 bytes of its name. `read` returns every entry
+ * whose key starts with the requested name by seeking to that name and scanning forward.
+ *
+ * Call `close()` when the engine is no longer needed to release the native RocksDB handles.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottommost level to reduce the disk space.
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  /**
+   * Serializes `obj` and stores it under `name`, replacing any previous value.
+   *
+   * @param name Key of the entry; encoded as UTF-8.
+   * @param obj Object to serialize with a fresh serializer instance.
+   */
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    db.put(name.getBytes(UTF_8), toByteArray(serialized))
+  }
+
+  /** Removes the entry stored under `name`. */
+  override def unpersist(name: String): Unit = {
+    db.delete(name.getBytes(UTF_8))
+  }
+
+  /**
+   * Returns the deserialized values of all entries whose key starts with `name`.
+   *
+   * @param name Key prefix to look up.
+   * @return The matching values in iteration order; empty if nothing matches.
+   */
+  override def read[T: ClassTag](name: String): Seq[T] = {
+    val result = new ArrayBuffer[T]
+    val iter = db.newIterator()
+    try {
+      iter.seek(name.getBytes(UTF_8))
+      // Keys are written as UTF-8, so decode them as UTF-8 rather than with the
+      // platform default charset.
+      while (iter.isValid && new String(iter.key(), UTF_8).startsWith(name)) {
+        result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
+        iter.next()
+      }
+    } finally {
+      iter.close()
+    }
+    result.toSeq
+  }
+
+  /** Releases the native resources held by the database and its options. */
+  override def close(): Unit = {
+    db.close()
+    options.close()
+  }
+
+  /**
+   * Returns exactly the remaining bytes of `buffer`.
+   *
+   * The backing array is reused only when it matches the buffer content one-to-one.
+   * Otherwise (non-zero offset or position, spare capacity, or no accessible array)
+   * the remaining bytes are copied, so no stray bytes are ever stored.
+   */
+  private def toByteArray(buffer: ByteBuffer): Array[Byte] = {
+    val coversWholeArray = buffer.hasArray &&
+      buffer.arrayOffset() == 0 &&
+      buffer.position() == 0 &&
+      buffer.remaining() == buffer.array().length
+    if (coversWholeArray) {
+      buffer.array()
+    } else {
+      val bytes = new Array[Byte](buffer.remaining())
+      // Read through a duplicate so the caller's buffer position is left untouched.
+      buffer.duplicate().get(bytes)
+      bytes
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 81756d807b2e..9fd1991dab02 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -386,6 +386,26 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-46258: Recovery with RocksDB") {
+    // Give RocksDB its own directory instead of opening the database directly in the
+    // shared `java.io.tmpdir`, so that no DB files are left behind there.
+    withTempDir { dir =>
+      val conf = new SparkConf(loadDefaults = false)
+      conf.set(RECOVERY_MODE, "ROCKSDB")
+      conf.set(RECOVERY_SERIALIZER, "Kryo")
+      conf.set(RECOVERY_DIRECTORY, dir.getAbsolutePath)
+
+      var master: Master = null
+      try {
+        master = makeAliveMaster(conf)
+        val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[RocksDBPersistenceEngine]
+        // The configured recovery serializer must be the one handed to the engine.
+        assert(e.serializer.isInstanceOf[KryoSerializer])
+      } finally {
+        if (master != null) {
+          master.rpcEnv.shutdown()
+          master.rpcEnv.awaitTermination()
+          master = null
+        }
+      }
+    }
+  }
+
   test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
index f538a1a06f6d..34a447efe528 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
@@ -110,6 +110,17 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
         }
       }
 
+      serializers.foreach { serializer =>
+        val serializerName = serializer.getClass.getSimpleName
+        val name = s"RocksDBPersistenceEngine with $serializerName"
+        benchmark.addCase(name, numIters) { _ =>
+          val dir = Utils.createTempDir().getAbsolutePath
+          val engine = new RocksDBPersistenceEngine(dir, serializer)
+          writeAndRead(engine)
+          engine.close()
+        }
+      }
+
       benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ =>
         val engine = new BlackHolePersistenceEngine()
         writeAndRead(engine)
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index c02b84fc1a66..84181ea3fca3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -41,6 +41,15 @@ class PersistenceEngineSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-46258: RocksDBPersistenceEngine") {
+    // Run the shared persistence-engine checks against a RocksDB instance
+    // created inside a temporary directory.
+    withTempDir { dir =>
+      val conf = new SparkConf()
+      val newEngine = (serializer: Serializer) =>
+        new RocksDBPersistenceEngine(dir.getAbsolutePath, serializer)
+      testPersistenceEngine(conf, newEngine)
+    }
+  }
+
   test("SPARK-46191: FileSystemPersistenceEngine.persist error message for the existing file") {
     withTempDir { dir =>
       val conf = new SparkConf()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org