Posted to commits@spark.apache.org by do...@apache.org on 2023/12/05 08:49:41 UTC
(spark) branch master updated: [SPARK-46258][CORE] Add `RocksDBPersistenceEngine`
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dd11075db618 [SPARK-46258][CORE] Add `RocksDBPersistenceEngine`
dd11075db618 is described below
commit dd11075db61879e200b4121b83d4239954881ddd
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Tue Dec 5 00:49:21 2023 -0800
[SPARK-46258][CORE] Add `RocksDBPersistenceEngine`
### What changes were proposed in this pull request?
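This PR aims to add `RocksDBPersistenceEngine`, a RocksDB-backed `PersistenceEngine` for Spark Master recovery, selected with the new `ROCKSDB` recovery mode.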
### Why are the changes needed?
To speed up `Spark Master` HA operations by **6.1x** over `FileSystemPersistenceEngine` (both with `JavaSerializer`).
```
OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
AMD EPYC 7763 64-Core Processor
1000 Workers: Best Time(ms) Avg Time(ms) Relative
--------------------------------------------------------------------------------------------
FileSystemPersistenceEngine with JavaSerializer 1571 1616 3.6X
RocksDBPersistenceEngine with JavaSerializer 257 258 22.0X
```
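The headline **6.1x** is the ratio of the two best times in the table above (equivalently, of the two `Relative` values, 22.0X / 3.6X). A minimal Scala sketch of that arithmetic using only the numbers shown; `HeadlineSpeedup`, `speedup`, and `rounded` are hypothetical helpers, not part of Spark:

```scala
object HeadlineSpeedup {
  // Ratio of a baseline's best time to a candidate's best time (both in ms).
  def speedup(baselineMs: Double, candidateMs: Double): Double =
    baselineMs / candidateMs

  // Round to one decimal place, as the benchmark output does.
  def rounded(x: Double): Double = math.round(x * 10) / 10.0

  // Best Time(ms) values from the table above.
  val fileSystemJava = 1571.0
  val rocksDBJava = 257.0

  def main(args: Array[String]): Unit = {
    val byBestTime = speedup(fileSystemJava, rocksDBJava)
    val byRelative = 22.0 / 3.6
    println(f"best-time ratio: $byBestTime%.1fx, relative ratio: $byRelative%.1fx")
  }
}
```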
### Does this PR introduce _any_ user-facing change?
No. This is a new backend.
### How was this patch tested?
Pass the CIs with the newly added test cases.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44173 from dongjoon-hyun/SPARK-46258.
Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../PersistenceEngineBenchmark-jdk21-results.txt | 28 +++---
.../PersistenceEngineBenchmark-results.txt | 28 +++---
.../org/apache/spark/deploy/master/Master.scala | 4 +
.../spark/deploy/master/RecoveryModeFactory.scala | 18 ++++
.../deploy/master/RocksDBPersistenceEngine.scala | 103 +++++++++++++++++++++
.../apache/spark/deploy/master/MasterSuite.scala | 20 ++++
.../deploy/master/PersistenceEngineBenchmark.scala | 11 +++
.../deploy/master/PersistenceEngineSuite.scala | 9 ++
8 files changed, 195 insertions(+), 26 deletions(-)
diff --git a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
index 314fb6958b69..99035eb336a3 100644
--- a/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
+++ b/core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
@@ -6,18 +6,20 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
AMD EPYC 7763 64-Core Processor
1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
----------------------------------------------------------------------------------------------------------------------------------------
-ZooKeeperPersistenceEngine with JavaSerializer 5402 5546 233 0.0 5402030.8 1.0X
-ZooKeeperPersistenceEngine with KryoSerializer 4185 4220 32 0.0 4184623.1 1.3X
-FileSystemPersistenceEngine with JavaSerializer 1591 1634 37 0.0 1590836.4 3.4X
-FileSystemPersistenceEngine with JavaSerializer (lz4) 611 623 14 0.0 611256.6 8.8X
-FileSystemPersistenceEngine with JavaSerializer (lzf) 626 640 13 0.0 626072.2 8.6X
-FileSystemPersistenceEngine with JavaSerializer (snappy) 595 628 29 0.0 594744.4 9.1X
-FileSystemPersistenceEngine with JavaSerializer (zstd) 755 774 21 0.0 754604.4 7.2X
-FileSystemPersistenceEngine with KryoSerializer 479 489 8 0.0 479404.7 11.3X
-FileSystemPersistenceEngine with KryoSerializer (lz4) 392 406 12 0.0 392165.7 13.8X
-FileSystemPersistenceEngine with KryoSerializer (lzf) 525 536 14 0.0 524916.7 10.3X
-FileSystemPersistenceEngine with KryoSerializer (snappy) 519 533 14 0.0 518569.3 10.4X
-FileSystemPersistenceEngine with KryoSerializer (zstd) 627 663 31 0.0 627233.2 8.6X
-BlackHolePersistenceEngine 0 0 0 6.0 166.0 32541.8X
+ZooKeeperPersistenceEngine with JavaSerializer 5863 6053 265 0.0 5862988.1 1.0X
+ZooKeeperPersistenceEngine with KryoSerializer 4553 4612 54 0.0 4553477.9 1.3X
+FileSystemPersistenceEngine with JavaSerializer 1619 1632 17 0.0 1618500.5 3.6X
+FileSystemPersistenceEngine with JavaSerializer (lz4) 619 631 10 0.0 619255.8 9.5X
+FileSystemPersistenceEngine with JavaSerializer (lzf) 623 640 20 0.0 623222.4 9.4X
+FileSystemPersistenceEngine with JavaSerializer (snappy) 553 596 37 0.0 553417.4 10.6X
+FileSystemPersistenceEngine with JavaSerializer (zstd) 747 767 26 0.0 747197.1 7.8X
+FileSystemPersistenceEngine with KryoSerializer 394 460 57 0.0 393534.6 14.9X
+FileSystemPersistenceEngine with KryoSerializer (lz4) 368 406 33 0.0 367925.8 15.9X
+FileSystemPersistenceEngine with KryoSerializer (lzf) 509 532 35 0.0 509170.2 11.5X
+FileSystemPersistenceEngine with KryoSerializer (snappy) 515 540 28 0.0 515190.9 11.4X
+FileSystemPersistenceEngine with KryoSerializer (zstd) 632 656 32 0.0 631522.4 9.3X
+RocksDBPersistenceEngine with JavaSerializer 265 266 1 0.0 265026.0 22.1X
+RocksDBPersistenceEngine with KryoSerializer 93 94 2 0.0 92732.3 63.2X
+BlackHolePersistenceEngine 0 0 0 6.0 166.8 35151.7X
diff --git a/core/benchmarks/PersistenceEngineBenchmark-results.txt b/core/benchmarks/PersistenceEngineBenchmark-results.txt
index 64c8bb440842..6c33de480b1c 100644
--- a/core/benchmarks/PersistenceEngineBenchmark-results.txt
+++ b/core/benchmarks/PersistenceEngineBenchmark-results.txt
@@ -6,18 +6,20 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
AMD EPYC 7763 64-Core Processor
1000 Workers: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
----------------------------------------------------------------------------------------------------------------------------------------
-ZooKeeperPersistenceEngine with JavaSerializer 5463 5745 246 0.0 5463433.7 1.0X
-ZooKeeperPersistenceEngine with KryoSerializer 4478 4513 31 0.0 4477926.8 1.2X
-FileSystemPersistenceEngine with JavaSerializer 1650 1670 19 0.0 1649557.9 3.3X
-FileSystemPersistenceEngine with JavaSerializer (lz4) 628 649 19 0.0 628265.9 8.7X
-FileSystemPersistenceEngine with JavaSerializer (lzf) 545 575 28 0.0 544502.0 10.0X
-FileSystemPersistenceEngine with JavaSerializer (snappy) 608 625 27 0.0 607926.2 9.0X
-FileSystemPersistenceEngine with JavaSerializer (zstd) 805 814 8 0.0 804867.1 6.8X
-FileSystemPersistenceEngine with KryoSerializer 420 461 40 0.0 420137.2 13.0X
-FileSystemPersistenceEngine with KryoSerializer (lz4) 380 452 64 0.0 379742.4 14.4X
-FileSystemPersistenceEngine with KryoSerializer (lzf) 518 543 31 0.0 518217.1 10.5X
-FileSystemPersistenceEngine with KryoSerializer (snappy) 483 513 31 0.0 483112.6 11.3X
-FileSystemPersistenceEngine with KryoSerializer (zstd) 668 679 9 0.0 668335.1 8.2X
-BlackHolePersistenceEngine 0 0 0 5.8 172.5 31668.2X
+ZooKeeperPersistenceEngine with JavaSerializer 6123 6309 172 0.0 6122561.9 1.0X
+ZooKeeperPersistenceEngine with KryoSerializer 4676 4753 71 0.0 4675978.2 1.3X
+FileSystemPersistenceEngine with JavaSerializer 1657 1679 20 0.0 1656526.3 3.7X
+FileSystemPersistenceEngine with JavaSerializer (lz4) 641 657 18 0.0 641219.3 9.5X
+FileSystemPersistenceEngine with JavaSerializer (lzf) 610 613 4 0.0 609684.2 10.0X
+FileSystemPersistenceEngine with JavaSerializer (snappy) 615 641 23 0.0 615266.4 10.0X
+FileSystemPersistenceEngine with JavaSerializer (zstd) 749 764 17 0.0 749140.8 8.2X
+FileSystemPersistenceEngine with KryoSerializer 460 477 15 0.0 460196.8 13.3X
+FileSystemPersistenceEngine with KryoSerializer (lz4) 403 439 32 0.0 402877.4 15.2X
+FileSystemPersistenceEngine with KryoSerializer (lzf) 543 571 49 0.0 542685.4 11.3X
+FileSystemPersistenceEngine with KryoSerializer (snappy) 498 507 16 0.0 497754.2 12.3X
+FileSystemPersistenceEngine with KryoSerializer (zstd) 644 653 8 0.0 643776.5 9.5X
+RocksDBPersistenceEngine with JavaSerializer 279 281 2 0.0 278935.2 21.9X
+RocksDBPersistenceEngine with KryoSerializer 92 92 1 0.0 91713.1 66.8X
+BlackHolePersistenceEngine 0 0 0 6.0 165.6 36965.5X
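In these results, `Per Row(ns)` appears to be the best time divided by the 1000 workers, and `Relative` the `ZooKeeperPersistenceEngine with JavaSerializer` baseline divided by each case (e.g. 5862988.1 / 92732.3 ≈ 63.2X in the JDK 21 file). A small Scala sketch, assuming that reading of the columns, that recomputes a few JDK 21 `Relative` values and a best time from the `Per Row(ns)` column; `RelativeColumn` is a hypothetical name:

```scala
object RelativeColumn {
  // Per Row(ns) values copied from PersistenceEngineBenchmark-jdk21-results.txt.
  val perRowNs: Map[String, Double] = Map(
    "ZooKeeperPersistenceEngine with JavaSerializer" -> 5862988.1,
    "FileSystemPersistenceEngine with JavaSerializer" -> 1618500.5,
    "RocksDBPersistenceEngine with JavaSerializer" -> 265026.0,
    "RocksDBPersistenceEngine with KryoSerializer" -> 92732.3
  )

  val baseline = "ZooKeeperPersistenceEngine with JavaSerializer"

  // Speed of a case relative to the baseline, rounded to one decimal place.
  def relative(name: String): Double = {
    val ratio = perRowNs(baseline) / perRowNs(name)
    math.round(ratio * 10) / 10.0
  }

  // Best Time(ms) recovered from Per Row(ns), assuming 1000 rows (workers).
  def bestTimeMs(name: String, workers: Int = 1000): Double =
    perRowNs(name) * workers / 1e6

  def main(args: Array[String]): Unit =
    perRowNs.keys.toSeq.sorted.foreach(n => println(s"$n: ${relative(n)}X"))
}
```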
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2e1d7b9bce33..e2c652f944c9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -186,6 +186,10 @@ private[deploy] class Master(
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+ case "ROCKSDB" =>
+ val rdbFactory =
+ new RocksDBRecoveryModeFactory(conf, serializer)
+ (rdbFactory.createPersistenceEngine(), rdbFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get(RECOVERY_MODE_FACTORY))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
index c24c4e5fe6be..106acc9a7944 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -67,6 +67,24 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
}
}
+/**
+ * LeaderAgent in this case is a no-op: the leader is forever the leader, since the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+ extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+ def createPersistenceEngine(): PersistenceEngine = {
+ val recoveryDir = conf.get(RECOVERY_DIRECTORY)
+ logInfo("Persisting recovery state to directory: " + recoveryDir)
+ new RocksDBPersistenceEngine(recoveryDir, serializer)
+ }
+
+ def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
+ new MonarchyLeaderAgent(master)
+ }
+}
+
private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
extends StandaloneRecoveryModeFactory(conf, serializer) {
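With `MonarchyLeaderAgent`, the lone Master is declared leader as soon as the agent is constructed, and this agent never revokes it, so recovery depends only on what was persisted to RocksDB. A self-contained sketch of that behavior; `Electable`, `SelfElectingAgent`, and `RecordingMaster` are hypothetical stand-ins, not Spark's `LeaderElectable` or `MonarchyLeaderAgent`:

```scala
object MonarchySketch {
  // Hypothetical stand-in for the callbacks a Master receives from its election agent.
  trait Electable {
    def electedLeader(): Unit
    def revokedLeadership(): Unit
  }

  // Hypothetical no-op election agent: the instance is told it is the leader once,
  // at construction time, and leadership is never revoked.
  class SelfElectingAgent(val instance: Electable) {
    instance.electedLeader()
  }

  // Test double that counts the callbacks it receives.
  class RecordingMaster extends Electable {
    var elected = 0
    var revoked = 0
    def electedLeader(): Unit = elected += 1
    def revokedLeadership(): Unit = revoked += 1
  }

  def main(args: Array[String]): Unit = {
    val master = new RecordingMaster
    new SelfElectingAgent(master)
    println(s"elected=${master.elected}, revoked=${master.revoked}")
  }
}
```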
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala
new file mode 100644
index 000000000000..5c43dab4d066
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory in which to set up RocksDB. Created if it does not exist.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+ val dir: String,
+ val serializer: Serializer)
+ extends PersistenceEngine with Logging {
+
+ RocksDB.loadLibrary()
+
+ private val path = Files.createDirectories(Paths.get(dir))
+
+ /**
+ * Use full filter.
+ * Disable compression in index data.
+ *
+ * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+ */
+ private val tableFormatConfig = new BlockBasedTableConfig()
+ .setFilterPolicy(new BloomFilter(10.0D, false))
+ .setEnableIndexCompression(false)
+ .setIndexBlockRestartInterval(8)
+ .setFormatVersion(5)
+
+ /**
+ * Use ZSTD at the bottommost level to reduce disk space.
+ * Use LZ4 at the other levels because it's better than Snappy in general.
+ *
+ * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+ */
+ private val options = new Options()
+ .setCreateIfMissing(true)
+ .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+ .setCompressionType(CompressionType.LZ4_COMPRESSION)
+ .setTableFormatConfig(tableFormatConfig)
+
+ private val db: RocksDB = RocksDB.open(options, path.toString)
+
+ override def persist(name: String, obj: Object): Unit = {
+ val serialized = serializer.newInstance().serialize(obj)
+ if (serialized.hasArray) {
+ db.put(name.getBytes(UTF_8), serialized.array())
+ } else {
+ val bytes = new Array[Byte](serialized.remaining())
+ serialized.get(bytes)
+ db.put(name.getBytes(UTF_8), bytes)
+ }
+ }
+
+ override def unpersist(name: String): Unit = {
+ db.delete(name.getBytes(UTF_8))
+ }
+
+ override def read[T: ClassTag](name: String): Seq[T] = {
+ val result = new ArrayBuffer[T]
+ val iter = db.newIterator()
+ try {
+ iter.seek(name.getBytes(UTF_8))
+ while (iter.isValid && new String(iter.key(), UTF_8).startsWith(name)) {
+ result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
+ iter.next()
+ }
+ } finally {
+ iter.close()
+ }
+ result.toSeq
+ }
+}
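To illustrate what `persist` and `read` rely on, here is a stdlib-only sketch that mimics them over a sorted in-memory map. It is a hypothetical stand-in, not Spark code: `java.util.TreeMap` plays the role of RocksDB's sorted key space (for ASCII keys, `String` order matches RocksDB's default bytewise order), values are raw byte arrays instead of serialized objects, and `SortedPrefixStore`, `exactBytes`, and `readPrefix` are made-up names. It also shows one `ByteBuffer` subtlety: `array()` returns the whole backing array, which can be longer than the payload when a buffer was built with something like `ByteBuffer.wrap(buf, 0, count)`.

```scala
import java.nio.ByteBuffer
import java.util.TreeMap

import scala.collection.mutable.ArrayBuffer

// Hypothetical in-memory stand-in for a sorted key-value store such as RocksDB.
class SortedPrefixStore {
  private val store = new TreeMap[String, Array[Byte]]()

  // Returns exactly the bytes between position and limit, leaving `buf` untouched.
  // The backing array is reused only when it holds the payload and nothing else.
  def exactBytes(buf: ByteBuffer): Array[Byte] = {
    if (buf.hasArray && buf.arrayOffset() == 0 && buf.position() == 0 &&
        buf.remaining() == buf.array().length) {
      buf.array()
    } else {
      val bytes = new Array[Byte](buf.remaining())
      buf.duplicate().get(bytes)
      bytes
    }
  }

  def persist(name: String, serialized: ByteBuffer): Unit =
    store.put(name, exactBytes(serialized))

  def unpersist(name: String): Unit = store.remove(name)

  // Like iter.seek(prefix): start at the first key >= prefix, then scan while keys
  // still share the prefix. Such keys are contiguous, so the scan stops at the first miss.
  def readPrefix(prefix: String): Seq[Array[Byte]] = {
    val result = new ArrayBuffer[Array[Byte]]
    val it = store.tailMap(prefix, true).entrySet().iterator()
    var done = false
    while (!done && it.hasNext) {
      val entry = it.next()
      if (entry.getKey.startsWith(prefix)) result += entry.getValue else done = true
    }
    result.toSeq
  }
}

object SortedPrefixStoreDemo {
  def main(args: Array[String]): Unit = {
    val s = new SortedPrefixStore
    // A 3-byte payload inside an 8-byte backing array.
    s.persist("worker_1", ByteBuffer.wrap(Array[Byte](1, 2, 3, 0, 0, 0, 0, 0), 0, 3))
    s.persist("worker_2", ByteBuffer.wrap(Array[Byte](4)))
    s.persist("app_1", ByteBuffer.wrap(Array[Byte](5)))
    s.unpersist("worker_2")
    s.readPrefix("worker_").foreach(v => println(v.mkString("[", ",", "]")))
  }
}
```

Copying only `remaining()` bytes costs an allocation, so the sketch keeps a zero-copy path for buffers whose backing array is exactly the payload.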
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 81756d807b2e..9fd1991dab02 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -386,6 +386,26 @@ class MasterSuite extends SparkFunSuite
}
}
+ test("SPARK-46258: Recovery with RocksDB") {
+ val conf = new SparkConf(loadDefaults = false)
+ conf.set(RECOVERY_MODE, "ROCKSDB")
+ conf.set(RECOVERY_SERIALIZER, "Kryo")
+ conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+
+ var master: Master = null
+ try {
+ master = makeAliveMaster(conf)
+ val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[RocksDBPersistenceEngine]
+ assert(e.serializer.isInstanceOf[KryoSerializer])
+ } finally {
+ if (master != null) {
+ master.rpcEnv.shutdown()
+ master.rpcEnv.awaitTermination()
+ master = null
+ }
+ }
+ }
+
test("master/worker web ui available") {
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
index f538a1a06f6d..34a447efe528 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
@@ -110,6 +110,17 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
}
}
+ serializers.foreach { serializer =>
+ val serializerName = serializer.getClass.getSimpleName
+ val name = s"RocksDBPersistenceEngine with $serializerName"
+ benchmark.addCase(name, numIters) { _ =>
+ val dir = Utils.createTempDir().getAbsolutePath
+ val engine = new RocksDBPersistenceEngine(dir, serializer)
+ writeAndRead(engine)
+ engine.close()
+ }
+ }
+
benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ =>
val engine = new BlackHolePersistenceEngine()
writeAndRead(engine)
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index c02b84fc1a66..84181ea3fca3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -41,6 +41,15 @@ class PersistenceEngineSuite extends SparkFunSuite {
}
}
+ test("SPARK-46258: RocksDBPersistenceEngine") {
+ withTempDir { dir =>
+ val conf = new SparkConf()
+ testPersistenceEngine(conf, serializer =>
+ new RocksDBPersistenceEngine(dir.getAbsolutePath, serializer)
+ )
+ }
+ }
+
test("SPARK-46191: FileSystemPersistenceEngine.persist error message for the existing file") {
withTempDir { dir =>
val conf = new SparkConf()