Posted to reviews@spark.apache.org by "dongjoon-hyun (via GitHub)" <gi...@apache.org> on 2023/12/05 03:29:57 UTC

[PR] [SPARK-46258][CORE] Add RocksDBPersistenceEngine [spark]

dongjoon-hyun opened a new pull request, #44173:
URL: https://github.com/apache/spark/pull/44173

   
   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840012871

   Could you review this PR, @LuciferYang?




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414999619


##########
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##########
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   From this perspective, I think reusing the configuration is also fine. But I have another question: if the user first uses `FILESYSTEM` and then switches to `ROCKSDB` without cleaning up `/opt/master` manually, will the residual data in the `/opt/master` directory cause the master to fail to start in this scenario?





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414987963


##########
core/src/main/scala/org/apache/spark/deploy/master/Master.scala:
##########
@@ -186,6 +186,10 @@ private[deploy] class Master(
         val fsFactory =
           new FileSystemRecoveryModeFactory(conf, serializer)
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+      case "ROCKSDB" =>

Review Comment:
   Yes, it's legacy behavior: `spark.deploy.recoveryMode` has always assumed all-uppercase values since Spark 0.8.1.
   - NONE, ZOOKEEPER, FILESYSTEM, CUSTOM





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414916176


##########
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##########
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   Although this may be for consistency with the style of `FileSystemRecoveryModeFactory`, it may be more suitable as a local variable within the `createPersistenceEngine()` method, as in the sketch below.
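
   A minimal sketch of that suggestion, assuming the factory keeps the same shape as `FileSystemRecoveryModeFactory` (imports, `RECOVERY_DIRECTORY`, and the `MonarchyLeaderAgent` no-op agent are assumed from the surrounding file):
   ```scala
   private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
     extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {

     override def createPersistenceEngine(): PersistenceEngine = {
       // Keep the directory lookup local: nothing else in the factory needs it.
       val recoveryDir = conf.get(RECOVERY_DIRECTORY)
       logInfo(s"Persisting recovery state to RocksDB directory: $recoveryDir")
       new RocksDBPersistenceEngine(recoveryDir, serializer)
     }

     override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
       new MonarchyLeaderAgent(master)
     }
   }
   ```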



##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    val bytes = new Array[Byte](serialized.remaining())
+    serialized.get(bytes)
+    db.put(name.getBytes(UTF_8), bytes)
+  }
+
+  override def unpersist(name: String): Unit = {
+    db.delete(name.getBytes(UTF_8))
+  }
+
+  override def read[T: ClassTag](name: String): Seq[T] = {
+    val result = new ArrayBuffer[T]
+    val iter = db.newIterator()
+    iter.seek(name.getBytes(UTF_8))
+    while (iter.isValid && new String(iter.key()).startsWith(name)) {
+      result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
+      iter.next()
+    }
+    iter.close()

Review Comment:
   Should we close `iter` in a `finally` block?
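
   A minimal sketch of that, reusing the members from the quoted diff (`db`, `serializer`); the trailing `result.toSeq` is an assumed completion since the hunk above is truncated:
   ```scala
   override def read[T: ClassTag](name: String): Seq[T] = {
     val result = new ArrayBuffer[T]
     val iter = db.newIterator()
     try {
       iter.seek(name.getBytes(UTF_8))
       while (iter.isValid && new String(iter.key()).startsWith(name)) {
         result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
         iter.next()
       }
     } finally {
       // Release the native iterator even when deserialization throws.
       iter.close()
     }
     result.toSeq
   }
   ```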



##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))

Review Comment:
   ```suggestion
     val path: Path = Files.createDirectories(Paths.get(dir))
   ```



##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)

Review Comment:
   Can `db.put(name.getBytes(UTF_8), serialized.array())` be executed directly when `serialized.hasArray` is true? Is this a feasible optimization?



##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))

Review Comment:
   because this is a `public` member, it should have an explicit type annotation.





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414938380


##########
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##########
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   +1





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415142557


##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    if (serialized.hasArray) {
+      db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
   To @yaooqinn, `PersistenceEngine` is secondary storage rather than a lookup table. In general, `Master` has a way to keep the uniqueness in its in-memory structures before reaching here.
   
   For example, `WorkerInfo` is tracked here:
   
   https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L76C9-L76C9
   ```
   private val idToWorker = new HashMap[String, WorkerInfo]
   ```
   
   In addition, driver IDs and app IDs are generated by the Master in a unique way that includes timestamps.
   





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun closed pull request #44173: [SPARK-46258][CORE] Add `RocksDBPersistenceEngine`
URL: https://github.com/apache/spark/pull/44173




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840278347

   I updated the benchmark result with the final commit, too~




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "yaooqinn (via GitHub)" <gi...@apache.org>.
yaooqinn commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840281483

   LGTM




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414940118


##########
core/src/main/scala/org/apache/spark/deploy/master/Master.scala:
##########
@@ -186,6 +186,10 @@ private[deploy] class Master(
         val fsFactory =
           new FileSystemRecoveryModeFactory(conf, serializer)
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+      case "ROCKSDB" =>

Review Comment:
   Is this configuration item required to be `ROCKSDB`? Is it not allowed to configure it as `rocksdb` or `Rocksdb` now? (Perhaps this is a legacy issue?)





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840164897

   Maybe the documentation also needs to be updated in this PR?
   
   https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/docs/spark-standalone.md?plain=1#L736




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415000391


##########
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##########
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   Sure.





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415108828


##########
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##########
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   Thank you, @yaooqinn. Ya, that's true. `ZooKeeperPersistenceEngine` has a separate configuration namespace for that. That's one of the reasons why I don't want to add a new namespace.





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840284937

   Thank you so much, both of you, @LuciferYang and @yaooqinn. Merged to master.




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "yaooqinn (via GitHub)" <gi...@apache.org>.
yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415148979


##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    if (serialized.hasArray) {
+      db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
   > When do you think persist is invoked with the duplication?
   
   I guess it won't happen. I'm just asking because of the existing test `SPARK-46191: FileSystemPersistenceEngine.persist error message for the existing file`.
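
   For reference, a minimal standalone check of the difference, assuming only the RocksJava API already used in this PR; RocksDB's `put` is an upsert, so a duplicated key silently overwrites the previous value instead of raising an error (the path below is a placeholder):
   ```scala
   import java.nio.charset.StandardCharsets.UTF_8

   import org.rocksdb.{Options, RocksDB}

   object PutOverwriteCheck {
     def main(args: Array[String]): Unit = {
       RocksDB.loadLibrary()
       val options = new Options().setCreateIfMissing(true)
       val db = RocksDB.open(options, "/tmp/rocksdb-put-check")
       db.put("app_1".getBytes(UTF_8), "v1".getBytes(UTF_8))
       // Writing the same key again overwrites in place; no error is raised.
       db.put("app_1".getBytes(UTF_8), "v2".getBytes(UTF_8))
       assert(new String(db.get("app_1".getBytes(UTF_8)), UTF_8) == "v2")
       db.close()
     }
   }
   ```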





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415126869


##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    if (serialized.hasArray) {
+      db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
   When do you think `persist` is invoked with the duplication?





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "yaooqinn (via GitHub)" <gi...@apache.org>.
yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415066164


##########
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##########
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   The thing is, it's pretty common to see that old naming conventions just don't cut it when adding new features.
   
   Technically, we have `spark.deploy.zookeeper.dir` for ZK mode, although it's a znode.
   
   I don't have a strong opinion on a new configuration here, as it's trivial.
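
   For context, the ZK-mode settings mentioned above could look like this (the host names and the znode path are placeholders):
   ```
   spark.deploy.recoveryMode=ZOOKEEPER
   spark.deploy.zookeeper.url=zk1:2181,zk2:2181
   spark.deploy.zookeeper.dir=/spark
   ```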





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840203815

   Thank you, @LuciferYang .




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840044360

   Could you review this PR, @yaooqinn?




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "yaooqinn (via GitHub)" <gi...@apache.org>.
yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414926099


##########
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##########
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   It looks better to have a separate directory from the FileSystemRecoveryModeFactory





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "yaooqinn (via GitHub)" <gi...@apache.org>.
yaooqinn commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415095668


##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    if (serialized.hasArray) {
+      db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
   Does `db.put` behave the same as file creation in FileSystemPersistenceEngine and znode creation in ZooKeeperPersistenceEngine with duplicated `name`s?





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415034531


##########
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##########
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   No, it will be okay. For example, RocksDB content will be ignored because FILESYSTEM scans only the files with the following `prefix`es. I guess RocksDB also does the same thing with the FILESYSTEM leftovers.
   
   https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala#L56-L78
   
   However, that kind of change is not supposed to happen. For the following three configurations, the directory should be cleared:
   
   - spark.deploy.recoveryMode
   - spark.deploy.recoverySerializer
   - spark.deploy.recoveryCompressionCodec





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414996134


##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to setup RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottom most level to reduce the disk space
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)

Review Comment:
   This one, right?
   ```scala
     override def persist(name: String, obj: Object): Unit = {
       val serialized = serializer.newInstance().serialize(obj)
       if (serialized.hasArray) {
         db.put(name.getBytes(UTF_8), serialized.array())
       } else {
         val bytes = new Array[Byte](serialized.remaining())
         serialized.get(bytes)
         db.put(name.getBytes(UTF_8), bytes)
       }
     }
   ```





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414983766


##########
core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala:
##########
@@ -67,6 +67,25 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:
   }
 }
 
+/**
+ * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
+ * recovery is made by restoring from RocksDB.
+ */
+private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
+  val recoveryDir = conf.get(RECOVERY_DIRECTORY)

Review Comment:
   Thank you for the review, @yaooqinn and @LuciferYang.
   
   I tried following your advice, but it seems to make the configuration namespace a little weird because the existing configuration, `spark.deploy.recoveryDirectory`, is a general one.
   
   https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/main/scala/org/apache/spark/internal/config/Deploy.scala#L54
   
   If we introduce a new configuration for `RocksDBPersistenceEngine` like the following, users will ask why they cannot use `spark.deploy.recoveryDirectory` for `ROCKSDB` mode. Here is an example. The as-is design aims to let users switch backends with a single configuration, `recoveryMode`.
   
   **BEFORE**
   ```
   spark.deploy.recoveryMode=FILESYSTEM
   spark.deploy.recoveryDirectory=/opt/master
   ```
   
   **AFTER**
   ```
   spark.deploy.recoveryMode=ROCKSDB
   spark.deploy.recoveryDirectory=/opt/master
   ```
   
   If we added a new configuration, it would look like the following, where `spark.deploy.recoveryDirectory` becomes a no-op.
   ```
   spark.deploy.recoveryMode=ROCKSDB
   spark.deploy.recoveryDirectory=/opt/master
   spark.deploy.recoveryRocksDBDirectory=/opt/db
   ```
   
   In addition, we already assume that users clear the location properly when they change settings like the following.
   - `spark.deploy.recoverySerializer`
   - `spark.deploy.recoveryCompressionCodec`
   
   To sum up, if you guys don't mind, I'd like to use the existing general configuration name for this backend.
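   
   For reference, a minimal sketch of what reusing the general configuration looks like on the factory side; the wiring below mirrors `FileSystemRecoveryModeFactory` and is illustrative rather than the exact patch:
   ```scala
   private[master] class RocksDBRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
     extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
   
     // Reuse the general configuration instead of a RocksDB-specific one.
     val recoveryDir = conf.get(RECOVERY_DIRECTORY)
   
     def createPersistenceEngine(): PersistenceEngine = {
       logInfo("Persisting recovery state to RocksDB directory: " + recoveryDir)
       new RocksDBPersistenceEngine(recoveryDir, serializer)
     }
   
     def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
       new MonarchyLeaderAgent(master)
     }
   }
   ```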





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44173:
URL: https://github.com/apache/spark/pull/44173#issuecomment-1840187474

   Thank you for pointing that out.
   > Maybe the documents also need to be updated in this pr?
   
   Actually, I have been thinking about a separate doc PR for the following accumulated changes. I also want to add some recommendations to guide user choices. Let me proceed with that separately~
   ```
   [SPARK-46258][CORE] Add RocksDBPersistenceEngine
   [SPARK-46216][CORE] Improve `FileSystemPersistenceEngine` to support compressions
   [SPARK-46205][CORE] Improve `PersistenceEngine` performance with `KryoSerializer`
   ```




Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415147988


##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to set up RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottommost level to reduce the disk space.
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)

Review Comment:
   To @LuciferYang, the benchmark has been updated with your suggestion.





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415142557


##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to set up RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  private val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottommost level to reduce the disk space.
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    if (serialized.hasArray) {
+      db.put(name.getBytes(UTF_8), serialized.array())

Review Comment:
   To @yaooqinn, `PersistenceEngine` is secondary storage, not a lookup table. In general, `Master` already enforces uniqueness in its in-memory structures before data reaches this layer.
   
   For example, `WorkerInfo` instances are tracked here:
   
   https://github.com/apache/spark/blob/1d80d80a56c418f841e282ad753fad6671c3baae/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L76C9-L76C9
   ```
   private val idToWorker = new HashMap[String, WorkerInfo]
   ```
   
   In addition, Driver IDs and App IDs are generated uniquely by the Master, using patterns that include timestamps.
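   
   To illustrate (a simplified, hypothetical sketch rather than the actual `Master` code): duplicates are rejected against the in-memory map before anything is persisted, so the persistence layer never needs to enforce key uniqueness itself.
   ```scala
   // Hypothetical sketch: uniqueness is enforced in memory first.
   private def tryRegisterWorker(worker: WorkerInfo): Boolean = {
     if (idToWorker.contains(worker.id)) {
       false  // a duplicate registration never reaches the PersistenceEngine
     } else {
       idToWorker(worker.id) = worker
       persistenceEngine.addWorker(worker)  // persists under the unique key "worker_" + worker.id
       true
     }
   }
   ```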
   





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414986088


##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to set up RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottommost level to reduce the disk space.
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)
+    val bytes = new Array[Byte](serialized.remaining())
+    serialized.get(bytes)
+    db.put(name.getBytes(UTF_8), bytes)
+  }
+
+  override def unpersist(name: String): Unit = {
+    db.delete(name.getBytes(UTF_8))
+  }
+
+  override def read[T: ClassTag](name: String): Seq[T] = {
+    val result = new ArrayBuffer[T]
+    val iter = db.newIterator()
+    iter.seek(name.getBytes(UTF_8))
+    while (iter.isValid && new String(iter.key()).startsWith(name)) {
+      result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
+      iter.next()
+    }
+    iter.close()

Review Comment:
   Sure.
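   
   For context, a sketch of closing the iterator defensively (my reading of the suggestion being agreed to here; `try`/`finally` releases the native iterator even if deserialization throws):
   ```scala
   override def read[T: ClassTag](name: String): Seq[T] = {
     val result = new ArrayBuffer[T]
     val iter = db.newIterator()
     try {
       iter.seek(name.getBytes(UTF_8))
       while (iter.isValid && new String(iter.key(), UTF_8).startsWith(name)) {
         result.append(serializer.newInstance().deserialize[T](ByteBuffer.wrap(iter.value())))
         iter.next()
       }
     } finally {
       iter.close()
     }
     result.toSeq
   }
   ```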





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1415001683


##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to set up RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))
+
+  /**
+   * Use full filter.
+   * Disable compression in index data.
+   *
+   * https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+   */
+  private val tableFormatConfig = new BlockBasedTableConfig()
+    .setFilterPolicy(new BloomFilter(10.0D, false))
+    .setEnableIndexCompression(false)
+    .setIndexBlockRestartInterval(8)
+    .setFormatVersion(5)
+
+  /**
+   * Use ZSTD at the bottommost level to reduce the disk space.
+   * Use LZ4 at the other levels because it's better than Snappy in general.
+   *
+   * https://github.com/facebook/rocksdb/wiki/Compression#configuration
+   */
+  private val options = new Options()
+    .setCreateIfMissing(true)
+    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION)
+    .setCompressionType(CompressionType.LZ4_COMPRESSION)
+    .setTableFormatConfig(tableFormatConfig)
+
+  private val db: RocksDB = RocksDB.open(options, path.toString)
+
+  override def persist(name: String, obj: Object): Unit = {
+    val serialized = serializer.newInstance().serialize(obj)

Review Comment:
   Yes





Re: [PR] [SPARK-46258][CORE] Add `RocksDBPersistenceEngine` [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44173:
URL: https://github.com/apache/spark/pull/44173#discussion_r1414985071


##########
core/src/main/scala/org/apache/spark/deploy/master/RocksDBPersistenceEngine.scala:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.Serializer
+
+
+/**
+ * Stores data in RocksDB.
+ *
+ * @param dir Directory to set up RocksDB. Created if non-existent.
+ * @param serializer Used to serialize our objects.
+ */
+private[master] class RocksDBPersistenceEngine(
+    val dir: String,
+    val serializer: Serializer)
+  extends PersistenceEngine with Logging {
+
+  RocksDB.loadLibrary()
+
+  val path = Files.createDirectories(Paths.get(dir))

Review Comment:
   Thanks. Let me switch it to `private`.
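   
   For the record, that is the one-line change already reflected in the later revision quoted above:
   ```scala
   private val path = Files.createDirectories(Paths.get(dir))
   ```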


