You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by ch...@apache.org on 2023/09/19 01:20:42 UTC

[incubator-celeborn] branch main updated: [CELEBORN-977] Support RocksDB as recover DB backend

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new beed2a85b [CELEBORN-977] Support RocksDB as recover DB backend
beed2a85b is described below

commit beed2a85b04db41b7c36384798848e187a088f8b
Author: sychen <sy...@ctrip.com>
AuthorDate: Tue Sep 19 09:20:33 2023 +0800

    [CELEBORN-977] Support RocksDB as recover DB backend
    
    ### What changes were proposed in this pull request?
    
    ### Why are the changes needed?
    
    The LevelDB JNI library does not provide a native build for macOS on ARM (Apple Silicon).
    
    ```java
    java.lang.UnsatisfiedLinkError: Could not load library. Reasons: [no leveldbjni64-1.8 in java.library.path, no leveldbjni-1.8 in java.library.path, no leveldbjni in java.library.path, /private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234670453989010.8: dlopen(/private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234670453989010.8, 0x0001): tried: '/private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234 [...]
            at org.fusesource.hawtjni.runtime.Library.doLoad(Library.java:182)
            at org.fusesource.hawtjni.runtime.Library.load(Library.java:140)
            at org.fusesource.leveldbjni.JniDBFactory.<clinit>(JniDBFactory.java:48)
            at org.apache.celeborn.service.deploy.worker.shuffledb.LevelDBProvider.initLevelDB(LevelDBProvider.java:49)
            at org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider.initDB(DBProvider.java:30)
            at org.apache.celeborn.service.deploy.worker.storage.StorageManager.<init>(StorageManager.scala:197)
            at org.apache.celeborn.service.deploy.worker.Worker.<init>(Worker.scala:109)
            at org.apache.celeborn.service.deploy.worker.Worker$.main(Worker.scala:734)
            at org.apache.celeborn.service.deploy.worker.Worker.main(Worker.scala)
    ```
    
    The released `leveldbjni-all` for `org.fusesource.leveldbjni` does not support AArch64 Linux, we need to use `org.openlabtesting.leveldbjni`.
    
    See https://issues.apache.org/jira/browse/HADOOP-16614
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    local test
    
    Closes #1913 from cxzl25/CELEBORN-977.
    
    Authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 LICENSE-binary                                     |   1 +
 .../org/apache/celeborn/common/CelebornConf.scala  |  14 +-
 dev/deps/dependencies-server                       |   1 +
 docs/configuration/worker.md                       |   3 +-
 pom.xml                                            |   6 +
 project/CelebornBuild.scala                        |   5 +-
 worker/pom.xml                                     |   4 +
 .../deploy/worker/ShuffleRecoverHelper.java        |   4 +-
 .../DB.java}                                       |  33 +++--
 .../DBBackend.java}                                |  33 +++--
 .../DBIterator.java}                               |  23 ++-
 .../DBProvider.java}                               |  33 +++--
 .../service/deploy/worker/shuffledb/LevelDB.java   |  66 +++++++++
 .../deploy/worker/shuffledb/LevelDBIterator.java   |  89 ++++++++++++
 .../worker/{ => shuffledb}/LevelDBProvider.java    |  46 ++----
 .../service/deploy/worker/shuffledb/RocksDB.java   |  88 ++++++++++++
 .../deploy/worker/shuffledb/RocksDBIterator.java   |  96 +++++++++++++
 .../deploy/worker/shuffledb/RocksDBProvider.java   | 157 +++++++++++++++++++++
 .../StoreVersion.java}                             |  41 ++++--
 .../worker/storage/PartitionFilesSorter.java       |  26 ++--
 .../deploy/worker/storage/StorageManager.scala     |  21 +--
 21 files changed, 664 insertions(+), 126 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 8f081384d..6971f6b64 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -268,6 +268,7 @@ org.scala-lang:scala-library
 org.scala-lang:scala-reflect
 org.slf4j:jcl-over-slf4j
 org.yaml:snakeyaml
+org.rocksdb:rocksdbjni
 
 
 ------------------------------------------------------------------------------------
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 16a113b9b..429ad5ec7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -932,6 +932,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def workerGracefulShutdownCheckSlotsFinishedTimeoutMs: Long =
     get(WORKER_CHECK_SLOTS_FINISHED_TIMEOUT)
   def workerGracefulShutdownRecoverPath: String = get(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH)
+  def workerGracefulShutdownRecoverDbBackend: String =
+    get(WORKER_GRACEFUL_SHUTDOWN_RECOVER_DB_BACKEND)
   def workerGracefulShutdownPartitionSorterCloseAwaitTimeMs: Long =
     get(WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT)
   def workerGracefulShutdownFlusherShutdownTimeoutMs: Long = get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT)
@@ -2603,12 +2605,22 @@ object CelebornConf extends Logging {
   val WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH: ConfigEntry[String] =
     buildConf("celeborn.worker.graceful.shutdown.recoverPath")
       .categories("worker")
-      .doc("The path to store levelDB.")
+      .doc("The path to store DB.")
       .version("0.2.0")
       .stringConf
       .transform(_.replace("<tmp>", System.getProperty("java.io.tmpdir")))
       .createWithDefault(s"<tmp>/recover")
 
+  val WORKER_GRACEFUL_SHUTDOWN_RECOVER_DB_BACKEND: ConfigEntry[String] =
+    buildConf("celeborn.worker.graceful.shutdown.recoverDbBackend")
+      .categories("worker")
+      .doc("Specifies a disk-based store used in local db. LEVELDB or ROCKSDB.")
+      .version("0.4.0")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(Set("LEVELDB", "ROCKSDB"))
+      .createWithDefault("LEVELDB")
+
   val WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout")
       .categories("worker")
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index d52e773db..f93601cc3 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -84,6 +84,7 @@ ratis-server/2.5.1//ratis-server-2.5.1.jar
 ratis-shell/2.5.1//ratis-shell-2.5.1.jar
 ratis-thirdparty-misc/1.0.4//ratis-thirdparty-misc-1.0.4.jar
 reflections/0.10.2//reflections-0.10.2.jar
+rocksdbjni/8.5.3//rocksdbjni-8.5.3.jar
 scala-library/2.12.15//scala-library-2.12.15.jar
 scala-reflect/2.12.15//scala-reflect-2.12.15.jar
 shims/0.9.32//shims-0.9.32.jar
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 52719bf66..3833726d5 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -56,7 +56,8 @@ license: |
 | celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | 480s | The wait time of waiting for the released slots to be committed or destroyed during worker graceful shutdown. | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.enabled | false | When true, during worker shutdown, the worker will wait for all released slots to be committed or destroyed. | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout | 120s | The wait time of waiting for sorting partition files during worker graceful shutdown. | 0.2.0 | 
-| celeborn.worker.graceful.shutdown.recoverPath | &lt;tmp&gt;/recover | The path to store levelDB. | 0.2.0 | 
+| celeborn.worker.graceful.shutdown.recoverDbBackend | LEVELDB | Specifies a disk-based store used in local db. LEVELDB or ROCKSDB. | 0.4.0 | 
+| celeborn.worker.graceful.shutdown.recoverPath | &lt;tmp&gt;/recover | The path to store DB. | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s | Interval for a Celeborn worker to flush committed file infos into Level DB. | 0.3.1 | 
 | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | Whether to call sync method to save committed file infos into Level DB to handle OS crash. | 0.3.1 | 
 | celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful shutdown timeout time. | 0.2.0 | 
diff --git a/pom.xml b/pom.xml
index 2d3daa6ce..287376cf9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
     <snakeyaml.version>1.33</snakeyaml.version>
     <zstd-jni.version>1.5.2-1</zstd-jni.version>
     <kubernetes-client.version>6.7.0</kubernetes-client.version>
+    <rocksdbjni.version>8.5.3</rocksdbjni.version>
 
     <shading.prefix>org.apache.celeborn.shaded</shading.prefix>
 
@@ -441,6 +442,11 @@
         <version>${kubernetes-client.version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.rocksdb</groupId>
+        <artifactId>rocksdbjni</artifactId>
+        <version>${rocksdbjni.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 6aa6e872b..9188e42c3 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -53,11 +53,12 @@ object Dependencies {
   val nettyVersion = "4.1.93.Final"
   val ratisVersion = "2.5.1"
   val roaringBitmapVersion = "0.9.32"
+  val rocksdbJniVersion = "8.5.3"
   val scalatestMockitoVersion = "1.17.14"
   val scalatestVersion = "3.2.16"
   val slf4jVersion = "1.7.36"
   val snakeyamlVersion = "1.33"
-  
+
   // Versions for proto
   val protocVersion = "3.19.2"
   val protoVersion = "3.19.2"
@@ -89,6 +90,7 @@ object Dependencies {
   val ratisShell = "org.apache.ratis" % "ratis-shell" % ratisVersion excludeAll(
     ExclusionRule("org.slf4j", "slf4j-simple"))
   val roaringBitmap = "org.roaringbitmap" % "RoaringBitmap" % roaringBitmapVersion
+  val rocksdbJni = "org.rocksdb" % "rocksdbjni" % rocksdbJniVersion
   val scalaReflect = "org.scala-lang" % "scala-reflect" % projectScalaVersion
   val slf4jApi = "org.slf4j" % "slf4j-api" % slf4jVersion
   val slf4jJulToSlf4j = "org.slf4j" % "jul-to-slf4j" % slf4jVersion
@@ -404,6 +406,7 @@ object CelebornWorker {
         Dependencies.log4jSlf4jImpl,
         Dependencies.leveldbJniAll,
         Dependencies.roaringBitmap,
+        Dependencies.rocksdbJni,
         Dependencies.scalatestMockito % "test"
       ) ++ commonUnitTestDependencies
     )
diff --git a/worker/pom.xml b/worker/pom.xml
index b3d0daf86..5a8914e3f 100644
--- a/worker/pom.xml
+++ b/worker/pom.xml
@@ -76,6 +76,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-1.2-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.rocksdb</groupId>
+      <artifactId>rocksdbjni</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.celeborn</groupId>
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
index df1147993..fcfbc7338 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
@@ -19,9 +19,11 @@ package org.apache.celeborn.service.deploy.worker;
 
 import java.nio.charset.StandardCharsets;
 
+import org.apache.celeborn.service.deploy.worker.shuffledb.StoreVersion;
+
 public abstract class ShuffleRecoverHelper {
   protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
-  protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+  protected StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
 
   protected byte[] dbShuffleKey(String shuffleKey) {
     return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DB.java
similarity index 53%
copy from worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
copy to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DB.java
index df1147993..20b275d8d 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DB.java
@@ -15,22 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
 
-import java.nio.charset.StandardCharsets;
+import java.io.Closeable;
 
-public abstract class ShuffleRecoverHelper {
-  protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
-  protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+/** Note: code copied from Apache Spark. */
+public interface DB extends Closeable {
+  /** Set the DB entry for "key" to "value". */
+  void put(byte[] key, byte[] value);
 
-  protected byte[] dbShuffleKey(String shuffleKey) {
-    return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
-  }
+  /** Set the DB entry for "key" to "value". Support Sync option */
+  void put(byte[] key, byte[] value, boolean sync);
 
-  protected String parseDbShuffleKey(String s) {
-    if (!s.startsWith(SHUFFLE_KEY_PREFIX)) {
-      throw new IllegalArgumentException("Expected a string starting with " + SHUFFLE_KEY_PREFIX);
-    }
-    return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
-  }
+  /**
+   * Get which returns a new byte array storing the value associated with the specified input key if
+   * any.
+   */
+  byte[] get(byte[] key);
+
+  /** Delete the DB entry (if any) for "key". */
+  void delete(byte[] key);
+
+  /** Return an iterator over the contents of the DB. */
+  DBIterator iterator();
 }
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBBackend.java
similarity index 54%
copy from worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
copy to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBBackend.java
index df1147993..0aefb4a7e 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBBackend.java
@@ -15,22 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
 
-import java.nio.charset.StandardCharsets;
+import java.util.Locale;
 
-public abstract class ShuffleRecoverHelper {
-  protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
-  protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+/**
+ * The enum `DBBackend` use to specify a disk-based store used in shuffle service local db. Support
+ * the use of LevelDB and RocksDB.
+ *
+ * <p>Note: code copied from Apache Spark.
+ */
+public enum DBBackend {
+  LEVELDB(".ldb"),
+  ROCKSDB(".rdb");
+
+  private final String fileSuffix;
+
+  DBBackend(String fileSuffix) {
+    this.fileSuffix = fileSuffix;
+  }
 
-  protected byte[] dbShuffleKey(String shuffleKey) {
-    return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
+  public String fileName(String prefix) {
+    return prefix + fileSuffix;
   }
 
-  protected String parseDbShuffleKey(String s) {
-    if (!s.startsWith(SHUFFLE_KEY_PREFIX)) {
-      throw new IllegalArgumentException("Expected a string starting with " + SHUFFLE_KEY_PREFIX);
-    }
-    return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
+  public static DBBackend byName(String value) {
+    return DBBackend.valueOf(value.toUpperCase(Locale.ROOT));
   }
 }
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
similarity index 54%
copy from worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
copy to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
index df1147993..351b773df 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
@@ -15,22 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
 
-import java.nio.charset.StandardCharsets;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
 
-public abstract class ShuffleRecoverHelper {
-  protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
-  protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+/** Note: code copied from Apache Spark. */
+public interface DBIterator extends Iterator<Map.Entry<byte[], byte[]>>, Closeable {
 
-  protected byte[] dbShuffleKey(String shuffleKey) {
-    return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
-  }
+  /** Position at the first entry in the source whose `key` is at target. */
+  void seek(byte[] key);
 
-  protected String parseDbShuffleKey(String s) {
-    if (!s.startsWith(SHUFFLE_KEY_PREFIX)) {
-      throw new IllegalArgumentException("Expected a string starting with " + SHUFFLE_KEY_PREFIX);
-    }
-    return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
+  default void remove() {
+    throw new UnsupportedOperationException();
   }
 }
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProvider.java
similarity index 50%
copy from worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
copy to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProvider.java
index df1147993..8223e6a77 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProvider.java
@@ -15,22 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
 
-import java.nio.charset.StandardCharsets;
+import java.io.File;
+import java.io.IOException;
 
-public abstract class ShuffleRecoverHelper {
-  protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
-  protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
-
-  protected byte[] dbShuffleKey(String shuffleKey) {
-    return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
-  }
-
-  protected String parseDbShuffleKey(String s) {
-    if (!s.startsWith(SHUFFLE_KEY_PREFIX)) {
-      throw new IllegalArgumentException("Expected a string starting with " + SHUFFLE_KEY_PREFIX);
+/** Note: code copied from Apache Spark. */
+public class DBProvider {
+  public static DB initDB(DBBackend dbBackend, File dbFile, StoreVersion version)
+      throws IOException {
+    if (dbFile != null) {
+      switch (dbBackend) {
+        case LEVELDB:
+          org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, version);
+          return levelDB != null ? new LevelDB(levelDB) : null;
+        case ROCKSDB:
+          org.rocksdb.RocksDB rocksDB = RocksDBProvider.initRockDB(dbFile, version);
+          return rocksDB != null ? new RocksDB(rocksDB) : null;
+        default:
+          throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
+      }
     }
-    return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
+    return null;
   }
 }
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDB.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDB.java
new file mode 100644
index 000000000..2dfeadb46
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDB.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.shuffledb;
+
+import java.io.IOException;
+
+import org.iq80.leveldb.WriteOptions;
+
+/** Note: code copied from Apache Spark. */
+public class LevelDB implements DB {
+  private final org.iq80.leveldb.DB db;
+  private final WriteOptions SYNC_WRITE_OPTIONS = new WriteOptions().sync(true);
+
+  public LevelDB(org.iq80.leveldb.DB db) {
+    this.db = db;
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    db.put(key, value);
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value, boolean sync) {
+    if (sync) {
+      db.put(key, value, SYNC_WRITE_OPTIONS);
+    } else {
+      db.put(key, value);
+    }
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    return db.get(key);
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    db.delete(key);
+  }
+
+  @Override
+  public void close() throws IOException {
+    db.close();
+  }
+
+  @Override
+  public DBIterator iterator() {
+    return new LevelDBIterator(db.iterator());
+  }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBIterator.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBIterator.java
new file mode 100644
index 000000000..c96b74900
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBIterator.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.shuffledb;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import com.google.common.base.Throwables;
+
+/** Note: code copied from Apache Spark. */
+public class LevelDBIterator implements DBIterator {
+
+  private final org.iq80.leveldb.DBIterator it;
+
+  private boolean checkedNext;
+
+  private boolean closed;
+
+  private Map.Entry<byte[], byte[]> next;
+
+  public LevelDBIterator(org.iq80.leveldb.DBIterator it) {
+    this.it = it;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!checkedNext && !closed) {
+      next = loadNext();
+      checkedNext = true;
+    }
+    if (!closed && next == null) {
+      try {
+        close();
+      } catch (IOException ioe) {
+        throw Throwables.propagate(ioe);
+      }
+    }
+    return next != null;
+  }
+
+  @Override
+  public Map.Entry<byte[], byte[]> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    checkedNext = false;
+    Map.Entry<byte[], byte[]> ret = next;
+    next = null;
+    return ret;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      it.close();
+      closed = true;
+      next = null;
+    }
+  }
+
+  @Override
+  public void seek(byte[] key) {
+    it.seek(key);
+  }
+
+  private Map.Entry<byte[], byte[]> loadNext() {
+    boolean hasNext = it.hasNext();
+    if (!hasNext) {
+      return null;
+    }
+    return it.next();
+  }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/LevelDBProvider.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
similarity index 78%
rename from worker/src/main/java/org/apache/celeborn/service/deploy/worker/LevelDBProvider.java
rename to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
index 8da381e2a..27734dca5 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/LevelDBProvider.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 
 import org.fusesource.leveldbjni.JniDBFactory;
@@ -34,13 +33,14 @@ import org.apache.celeborn.common.util.PbSerDeUtils;
 /**
  * LevelDB utility class available in the network package.
  *
- * <p>Note: code copied from Apache Spark's LevelDBProvider.
+ * <p>Note: code copied from Apache Spark.
  */
 public class LevelDBProvider {
   private static final Logger logger = LoggerFactory.getLogger(LevelDBProvider.class);
 
-  public static DB initLevelDB(File dbFile, StoreVersion version) throws IOException {
-    DB tmpDb = null;
+  public static org.iq80.leveldb.DB initLevelDB(File dbFile, StoreVersion version)
+      throws IOException {
+    org.iq80.leveldb.DB tmpDb = null;
     if (dbFile != null) {
       Options options = new Options();
       options.createIfMissing(false);
@@ -49,12 +49,12 @@ public class LevelDBProvider {
         tmpDb = JniDBFactory.factory.open(dbFile, options);
       } catch (NativeDB.DBException e) {
         if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
-          logger.info("Creating recover database at " + dbFile);
+          logger.info("Creating state database at " + dbFile);
           options.createIfMissing(true);
           try {
             tmpDb = JniDBFactory.factory.open(dbFile, options);
           } catch (NativeDB.DBException dbExc) {
-            throw new IOException("Unable to create recover store", dbExc);
+            throw new IOException("Unable to create state store", dbExc);
           }
         } else {
           // the leveldb file seems to be corrupt somehow.  Lets just blow it away and create a new
@@ -78,7 +78,7 @@ public class LevelDBProvider {
           try {
             tmpDb = JniDBFactory.factory.open(dbFile, options);
           } catch (NativeDB.DBException dbExc) {
-            throw new IOException("Unable to create recover store", dbExc);
+            throw new IOException("Unable to create state store", dbExc);
           }
         }
       }
@@ -124,34 +124,4 @@ public class LevelDBProvider {
   public static void storeVersion(DB db, StoreVersion version) throws IOException {
     db.put(StoreVersion.KEY, PbSerDeUtils.toPbStoreVersion(version.major, version.minor));
   }
-
-  public static class StoreVersion {
-
-    static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
-
-    public final int major;
-    public final int minor;
-
-    public StoreVersion(int major, int minor) {
-      this.major = major;
-      this.minor = minor;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      StoreVersion that = (StoreVersion) o;
-
-      return major == that.major && minor == that.minor;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = major;
-      result = 31 * result + minor;
-      return result;
-    }
-  }
 }
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDB.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDB.java
new file mode 100644
index 000000000..df534d554
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDB.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.shuffledb;
+
+import java.io.IOException;
+
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
+
+/**
+ * RocksDB implementation of the local KV storage used to persist the shuffle state.
+ *
+ * <p>Note: code copied from Apache Spark.
+ */
+public class RocksDB implements DB {
+  private final org.rocksdb.RocksDB db;
+  private final WriteOptions SYNC_WRITE_OPTIONS = new WriteOptions().setSync(true);
+
+  public RocksDB(org.rocksdb.RocksDB db) {
+    this.db = db;
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    try {
+      db.put(key, value);
+    } catch (RocksDBException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value, boolean sync) {
+    try {
+      if (sync) {
+        db.put(SYNC_WRITE_OPTIONS, key, value);
+      } else {
+        db.put(key, value);
+      }
+    } catch (RocksDBException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    try {
+      return db.get(key);
+    } catch (RocksDBException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    try {
+      db.delete(key);
+    } catch (RocksDBException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public DBIterator iterator() {
+    return new RocksDBIterator(db.newIterator());
+  }
+
+  @Override
+  public void close() throws IOException {
+    db.close();
+  }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBIterator.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBIterator.java
new file mode 100644
index 000000000..3e67b9e27
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBIterator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.shuffledb;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksIterator;
+
+/**
+ * RocksDB implementation of `DBIterator`.
+ *
+ * <p>Uses single-entry lookahead: hasNext() eagerly loads the next entry and
+ * next() hands it over. The iterator closes itself once exhausted.
+ *
+ * <p>Note: code copied from Apache Spark.
+ */
+public class RocksDBIterator implements DBIterator {
+
+  private final RocksIterator it;
+
+  // True when `next` already holds the lookahead entry for the next() call.
+  private boolean checkedNext;
+
+  // Once true, the native iterator has been released and must not be touched.
+  private boolean closed;
+
+  // Lookahead entry, populated by hasNext() and consumed by next().
+  private Map.Entry<byte[], byte[]> next;
+
+  public RocksDBIterator(RocksIterator it) {
+    this.it = it;
+  }
+
+  /**
+   * Returns whether another entry is available, loading it eagerly. When the
+   * iterator is exhausted it closes itself so the native resources are freed
+   * as early as possible.
+   */
+  @Override
+  public boolean hasNext() {
+    if (!checkedNext && !closed) {
+      next = loadNext();
+      checkedNext = true;
+    }
+    if (!closed && next == null) {
+      try {
+        close();
+      } catch (IOException ioe) {
+        throw Throwables.propagate(ioe);
+      }
+    }
+    return next != null;
+  }
+
+  @Override
+  public Map.Entry<byte[], byte[]> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    // Hand over the lookahead entry and reset the state for the next call.
+    checkedNext = false;
+    Map.Entry<byte[], byte[]> ret = next;
+    next = null;
+    return ret;
+  }
+
+  /** Idempotent: releases the native RocksIterator on the first call only. */
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      it.close();
+      closed = true;
+      next = null;
+    }
+  }
+
+  /** Positions the underlying native iterator via RocksIterator.seek semantics. */
+  @Override
+  public void seek(byte[] key) {
+    it.seek(key);
+  }
+
+  // Returns the current entry and advances the native iterator, or null when
+  // the iterator is exhausted.
+  private Map.Entry<byte[], byte[]> loadNext() {
+    if (it.isValid()) {
+      Map.Entry<byte[], byte[]> nextEntry = new AbstractMap.SimpleEntry<>(it.key(), it.value());
+      it.next();
+      return nextEntry;
+    }
+    return null;
+  }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
new file mode 100644
index 000000000..13d65a86e
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.shuffledb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Objects;
+
+import org.rocksdb.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.util.PbSerDeUtils;
+
+/**
+ * RocksDB utility class available in the network package.
+ *
+ * <p>Note: code copied from Apache Spark.
+ */
+public class RocksDBProvider {
+
+  static {
+    // Load the native RocksDB JNI library once, before any other RocksDB call.
+    org.rocksdb.RocksDB.loadLibrary();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(RocksDBProvider.class);
+
+  /**
+   * Opens the state database at {@code dbFile}, creating it if it does not exist
+   * and recreating it if it is corrupt. Returns null when {@code dbFile} is null.
+   *
+   * @param dbFile location of the database on local disk
+   * @param version expected store version; a major-version mismatch is fatal
+   * @throws IOException if the database cannot be created or the stored version
+   *     is incompatible with {@code version}
+   */
+  public static org.rocksdb.RocksDB initRockDB(File dbFile, StoreVersion version)
+      throws IOException {
+    org.rocksdb.RocksDB tmpDb = null;
+    if (dbFile != null) {
+      BloomFilter fullFilter = new BloomFilter(10.0D /* BloomFilter.DEFAULT_BITS_PER_KEY */, false);
+      BlockBasedTableConfig tableFormatConfig =
+          new BlockBasedTableConfig()
+              .setFilterPolicy(fullFilter)
+              .setEnableIndexCompression(false)
+              .setIndexBlockRestartInterval(8)
+              .setFormatVersion(5);
+
+      Options dbOptions = new Options();
+      RocksDBLogger rocksDBLogger = new RocksDBLogger(dbOptions);
+
+      // Open an existing DB first; creation is handled explicitly below so a
+      // missing database can be distinguished from a corrupt one.
+      dbOptions.setCreateIfMissing(false);
+      dbOptions.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
+      dbOptions.setCompressionType(CompressionType.LZ4_COMPRESSION);
+      dbOptions.setTableFormatConfig(tableFormatConfig);
+      dbOptions.setLogger(rocksDBLogger);
+
+      try {
+        tmpDb = org.rocksdb.RocksDB.open(dbOptions, dbFile.toString());
+      } catch (RocksDBException e) {
+        if (e.getStatus().getCode() == Status.Code.NotFound) {
+          logger.info("Creating state database at " + dbFile);
+        } else {
+          // the RocksDB file seems to be corrupt somehow. Let's just blow it away and create
+          // a new one, so we can keep processing new apps
+          logger.error(
+              "error opening rocksdb file {}. Creating new file, will not be able to "
+                  + "recover state for existing applications",
+              dbFile,
+              e);
+          deleteDBFiles(dbFile);
+        }
+        tmpDb = createDB(dbOptions, dbFile);
+      }
+      try {
+        // if there is a version mismatch, we throw an exception, which means the service
+        // is unusable
+        checkVersion(tmpDb, version);
+      } catch (RocksDBException e) {
+        // close the freshly opened handle before failing, so the native
+        // resources are not leaked when the version check rejects the store
+        tmpDb.close();
+        throw new IOException(e.getMessage(), e);
+      } catch (IOException ioe) {
+        tmpDb.close();
+        throw ioe;
+      }
+    }
+    return tmpDb;
+  }
+
+  /** Opens a new database at dbFile, enabling creation on the shared options. */
+  private static org.rocksdb.RocksDB createDB(Options dbOptions, File dbFile) throws IOException {
+    dbOptions.setCreateIfMissing(true);
+    try {
+      return org.rocksdb.RocksDB.open(dbOptions, dbFile.toString());
+    } catch (RocksDBException dbExc) {
+      throw new IOException("Unable to create state store", dbExc);
+    }
+  }
+
+  /** Best-effort removal of a (possibly corrupt) database directory or file. */
+  private static void deleteDBFiles(File dbFile) {
+    if (dbFile.isDirectory()) {
+      for (File f : Objects.requireNonNull(dbFile.listFiles())) {
+        if (!f.delete()) {
+          logger.warn("error deleting {}", f.getPath());
+        }
+      }
+    }
+    if (!dbFile.delete()) {
+      logger.warn("error deleting {}", dbFile.getPath());
+    }
+  }
+
+  /** Forwards native RocksDB log output to slf4j. */
+  private static class RocksDBLogger extends org.rocksdb.Logger {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBLogger.class);
+
+    RocksDBLogger(Options options) {
+      super(options);
+    }
+
+    @Override
+    protected void log(InfoLogLevel infoLogLevel, String message) {
+      // Only INFO-level native log lines are forwarded.
+      if (infoLogLevel == InfoLogLevel.INFO_LEVEL) {
+        LOG.info(message);
+      }
+    }
+  }
+
+  /**
+   * Simple major.minor versioning scheme. Any incompatible changes should be across major versions.
+   * Minor version differences are allowed -- meaning we should be able to read dbs that are either
+   * earlier *or* later on the minor version.
+   */
+  public static void checkVersion(org.rocksdb.RocksDB db, StoreVersion newversion)
+      throws IOException, RocksDBException {
+    byte[] bytes = db.get(StoreVersion.KEY);
+    if (bytes == null) {
+      storeVersion(db, newversion);
+    } else {
+      ArrayList<Integer> versions = PbSerDeUtils.fromPbStoreVersion(bytes);
+      StoreVersion version = new StoreVersion(versions.get(0), versions.get(1));
+      if (version.major != newversion.major) {
+        throw new IOException(
+            "cannot read state DB with version "
+                + version
+                + ", incompatible "
+                + "with current version "
+                + newversion);
+      }
+      // rewrite the stored version so the recorded minor version stays current
+      storeVersion(db, newversion);
+    }
+  }
+
+  /** Persists the given version under StoreVersion.KEY. */
+  public static void storeVersion(org.rocksdb.RocksDB db, StoreVersion version)
+      throws IOException, RocksDBException {
+    db.put(StoreVersion.KEY, PbSerDeUtils.toPbStoreVersion(version.major, version.minor));
+  }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/StoreVersion.java
similarity index 51%
copy from worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
copy to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/StoreVersion.java
index df1147993..e77cff970 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/StoreVersion.java
@@ -15,22 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
 
 import java.nio.charset.StandardCharsets;
 
-public abstract class ShuffleRecoverHelper {
-  protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
-  protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+/**
+ * Used to identify the version of data stored in local shuffle state DB.
+ *
+ * <p>Note: code copied from Apache Spark.
+ */
+public class StoreVersion {
+
+  public static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
+
+  public final int major;
+  public final int minor;
+
+  public StoreVersion(int major, int minor) {
+    this.major = major;
+    this.minor = minor;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    StoreVersion that = (StoreVersion) o;
 
-  protected byte[] dbShuffleKey(String shuffleKey) {
-    return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
+    return major == that.major && minor == that.minor;
   }
 
-  protected String parseDbShuffleKey(String s) {
-    if (!s.startsWith(SHUFFLE_KEY_PREFIX)) {
-      throw new IllegalArgumentException("Expected a string starting with " + SHUFFLE_KEY_PREFIX);
-    }
-    return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
+  @Override
+  public int hashCode() {
+    int result = major;
+    result = 31 * result + minor;
+    return result;
   }
 }
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index ed1f3f130..c729c8402 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -41,8 +41,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.DBIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,17 +51,20 @@ import org.apache.celeborn.common.metrics.source.AbstractSource;
 import org.apache.celeborn.common.unsafe.Platform;
 import org.apache.celeborn.common.util.*;
 import org.apache.celeborn.common.util.ShuffleBlockInfoUtils.ShuffleBlockInfo;
-import org.apache.celeborn.service.deploy.worker.LevelDBProvider;
 import org.apache.celeborn.service.deploy.worker.ShuffleRecoverHelper;
 import org.apache.celeborn.service.deploy.worker.WorkerSource;
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
+import org.apache.celeborn.service.deploy.worker.shuffledb.DB;
+import org.apache.celeborn.service.deploy.worker.shuffledb.DBBackend;
+import org.apache.celeborn.service.deploy.worker.shuffledb.DBIterator;
+import org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider;
+import org.apache.celeborn.service.deploy.worker.shuffledb.StoreVersion;
 
 public class PartitionFilesSorter extends ShuffleRecoverHelper {
   private static final Logger logger = LoggerFactory.getLogger(PartitionFilesSorter.class);
 
-  private static final LevelDBProvider.StoreVersion CURRENT_VERSION =
-      new LevelDBProvider.StoreVersion(1, 0);
-  private static final String RECOVERY_SORTED_FILES_FILE_NAME = "sortedFiles.ldb";
+  private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
+  private static final String RECOVERY_SORTED_FILES_FILE_NAME_PREFIX = "sortedFiles";
   private File recoverFile;
   private volatile boolean shutdown = false;
   private final ConcurrentHashMap<String, Set<String>> sortedShuffleFiles =
@@ -104,11 +105,14 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
     if (gracefulShutdown) {
       try {
         String recoverPath = conf.workerGracefulShutdownRecoverPath();
-        this.recoverFile = new File(recoverPath, RECOVERY_SORTED_FILES_FILE_NAME);
-        this.sortedFilesDb = LevelDBProvider.initLevelDB(recoverFile, CURRENT_VERSION);
+        DBBackend dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend());
+        String recoverySortedFilesFileName =
+            dbBackend.fileName(RECOVERY_SORTED_FILES_FILE_NAME_PREFIX);
+        this.recoverFile = new File(recoverPath, recoverySortedFilesFileName);
+        this.sortedFilesDb = DBProvider.initDB(dbBackend, recoverFile, CURRENT_VERSION);
         reloadAndCleanSortedShuffleFiles(this.sortedFilesDb);
       } catch (Exception e) {
-        logger.error("Failed to reload LevelDB for sorted shuffle files from: " + recoverFile, e);
+        logger.error("Failed to reload DB for sorted shuffle files from: " + recoverFile, e);
         this.sortedFilesDb = null;
       }
     } else {
@@ -260,7 +264,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
           updateSortedShuffleFilesInDB();
           sortedFilesDb.close();
         } catch (IOException e) {
-          logger.error("Store recover data to LevelDB failed.", e);
+          logger.error("Store recover data to DB failed.", e);
         }
       }
       long end = System.currentTimeMillis();
@@ -273,7 +277,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
           sortedFilesDb.close();
           recoverFile.delete();
         } catch (IOException e) {
-          logger.error("Clean LevelDB failed.", e);
+          logger.error("Clean DB failed.", e);
         }
       }
     }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 64538f1fc..4aaedd2a8 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -31,7 +31,6 @@ import scala.concurrent.duration._
 import io.netty.buffer.PooledByteBufAllocator
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
-import org.iq80.leveldb.{DB, WriteOptions}
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.exception.CelebornException
@@ -45,6 +44,7 @@ import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
+import org.apache.celeborn.service.deploy.worker.shuffledb.{DB, DBBackend, DBProvider}
 import org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
 
 final private[worker] class StorageManager(conf: CelebornConf, workerSource: AbstractSource)
@@ -177,7 +177,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   // shuffleKey -> (fileName -> file info)
   private val fileInfos =
     JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, FileInfo]]()
-  private val RECOVERY_FILE_NAME = "recovery.ldb"
+  private val RECOVERY_FILE_NAME_PREFIX = "recovery"
+  private var RECOVERY_FILE_NAME = "recovery.ldb"
   private var db: DB = null
   private var saveCommittedFileInfosExecutor: ScheduledExecutorService = _
   private val saveCommittedFileInfoBySyncMethod =
@@ -190,8 +191,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   val workerGracefulShutdown = conf.workerGracefulShutdown
   if (workerGracefulShutdown) {
     try {
+      val dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend)
+      RECOVERY_FILE_NAME = dbBackend.fileName(RECOVERY_FILE_NAME_PREFIX)
       val recoverFile = new File(conf.workerGracefulShutdownRecoverPath, RECOVERY_FILE_NAME)
-      this.db = LevelDBProvider.initLevelDB(recoverFile, CURRENT_VERSION)
+      this.db = DBProvider.initDB(dbBackend, recoverFile, CURRENT_VERSION)
       reloadAndCleanFileInfos(this.db)
     } catch {
       case e: Exception =>
@@ -212,7 +215,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
               db.put(
                 dbShuffleKey(shuffleKey),
                 PbSerDeUtils.toPbFileInfoMap(files),
-                new WriteOptions().sync(saveCommittedFileInfoBySyncMethod))
+                saveCommittedFileInfoBySyncMethod)
             }
           }
         }
@@ -260,16 +263,16 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   def saveAllCommittedFileInfosToDB(): Unit = {
     // save committed fileinfo to DB should be done within the time of saveCommittedFileInfoInterval
     saveCommittedFileInfosExecutor.awaitTermination(saveCommittedFileInfoInterval, MILLISECONDS)
-    // graceful shutdown might be timed out, persist all committed fileinfos to levelDB
+    // graceful shutdown might time out; persist all committed fileinfos to DB
     // final flush write through
     committedFileInfos.asScala.foreach { case (shuffleKey, files) =>
       try {
+        // K8s container might be gone
         db.put(
           dbShuffleKey(shuffleKey),
           PbSerDeUtils.toPbFileInfoMap(files),
-          // K8s container might gone
-          new WriteOptions().sync(true))
-        logDebug(s"Update FileInfos into DB: ${shuffleKey} -> ${files}")
+          true)
+        logDebug(s"Update FileInfos into DB: $shuffleKey -> $files")
       } catch {
         case exception: Exception =>
           logError(s"Update FileInfos into DB: ${shuffleKey} failed.", exception)
@@ -696,7 +699,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
           db.close()
         } catch {
           case exception: Exception =>
-            logError("Store recover data to LevelDB failed.", exception)
+            logError("Store recover data to DB failed.", exception)
         }
       } else {
         if (db != null) {