You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by ch...@apache.org on 2023/09/19 01:20:42 UTC
[incubator-celeborn] branch main updated: [CELEBORN-977] Support RocksDB as recover DB backend
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new beed2a85b [CELEBORN-977] Support RocksDB as recover DB backend
beed2a85b is described below
commit beed2a85b04db41b7c36384798848e187a088f8b
Author: sychen <sy...@ctrip.com>
AuthorDate: Tue Sep 19 09:20:33 2023 +0800
[CELEBORN-977] Support RocksDB as recover DB backend
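### What changes were proposed in this pull request?
Add RocksDB as an alternative backend for the worker's recover DB, selectable via `celeborn.worker.graceful.shutdown.recoverDbBackend` (`LEVELDB` or `ROCKSDB`, default `LEVELDB`), and introduce backend-neutral `DB`, `DBIterator`, and `DBProvider` abstractions in the new `shuffledb` package.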
### Why are the changes needed?
The LevelDB JNI library does not support macOS on ARM, so the worker fails to start there:
```java
java.lang.UnsatisfiedLinkError: Could not load library. Reasons: [no leveldbjni64-1.8 in java.library.path, no leveldbjni-1.8 in java.library.path, no leveldbjni in java.library.path, /private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234670453989010.8: dlopen(/private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234670453989010.8, 0x0001): tried: '/private/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/libleveldbjni-64-1-4616234 [...]
at org.fusesource.hawtjni.runtime.Library.doLoad(Library.java:182)
at org.fusesource.hawtjni.runtime.Library.load(Library.java:140)
at org.fusesource.leveldbjni.JniDBFactory.<clinit>(JniDBFactory.java:48)
at org.apache.celeborn.service.deploy.worker.shuffledb.LevelDBProvider.initLevelDB(LevelDBProvider.java:49)
at org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider.initDB(DBProvider.java:30)
at org.apache.celeborn.service.deploy.worker.storage.StorageManager.<init>(StorageManager.scala:197)
at org.apache.celeborn.service.deploy.worker.Worker.<init>(Worker.scala:109)
at org.apache.celeborn.service.deploy.worker.Worker$.main(Worker.scala:734)
at org.apache.celeborn.service.deploy.worker.Worker.main(Worker.scala)
```
The released `leveldbjni-all` artifact from `org.fusesource.leveldbjni` does not support AArch64 Linux either; on that platform we need to use `org.openlabtesting.leveldbjni` instead.
See https://issues.apache.org/jira/browse/HADOOP-16614
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested locally.
Closes #1913 from cxzl25/CELEBORN-977.
Authored-by: sychen <sy...@ctrip.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
LICENSE-binary | 1 +
.../org/apache/celeborn/common/CelebornConf.scala | 14 +-
dev/deps/dependencies-server | 1 +
docs/configuration/worker.md | 3 +-
pom.xml | 6 +
project/CelebornBuild.scala | 5 +-
worker/pom.xml | 4 +
.../deploy/worker/ShuffleRecoverHelper.java | 4 +-
.../DB.java} | 33 +++--
.../DBBackend.java} | 33 +++--
.../DBIterator.java} | 23 ++-
.../DBProvider.java} | 33 +++--
.../service/deploy/worker/shuffledb/LevelDB.java | 66 +++++++++
.../deploy/worker/shuffledb/LevelDBIterator.java | 89 ++++++++++++
.../worker/{ => shuffledb}/LevelDBProvider.java | 46 ++----
.../service/deploy/worker/shuffledb/RocksDB.java | 88 ++++++++++++
.../deploy/worker/shuffledb/RocksDBIterator.java | 96 +++++++++++++
.../deploy/worker/shuffledb/RocksDBProvider.java | 157 +++++++++++++++++++++
.../StoreVersion.java} | 41 ++++--
.../worker/storage/PartitionFilesSorter.java | 26 ++--
.../deploy/worker/storage/StorageManager.scala | 21 +--
21 files changed, 664 insertions(+), 126 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index 8f081384d..6971f6b64 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -268,6 +268,7 @@ org.scala-lang:scala-library
org.scala-lang:scala-reflect
org.slf4j:jcl-over-slf4j
org.yaml:snakeyaml
+org.rocksdb:rocksdbjni
------------------------------------------------------------------------------------
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 16a113b9b..429ad5ec7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -932,6 +932,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerGracefulShutdownCheckSlotsFinishedTimeoutMs: Long =
get(WORKER_CHECK_SLOTS_FINISHED_TIMEOUT)
def workerGracefulShutdownRecoverPath: String = get(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH)
+ def workerGracefulShutdownRecoverDbBackend: String =
+ get(WORKER_GRACEFUL_SHUTDOWN_RECOVER_DB_BACKEND)
def workerGracefulShutdownPartitionSorterCloseAwaitTimeMs: Long =
get(WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT)
def workerGracefulShutdownFlusherShutdownTimeoutMs: Long = get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT)
@@ -2603,12 +2605,22 @@ object CelebornConf extends Logging {
val WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH: ConfigEntry[String] =
buildConf("celeborn.worker.graceful.shutdown.recoverPath")
.categories("worker")
- .doc("The path to store levelDB.")
+ .doc("The path to store DB.")
.version("0.2.0")
.stringConf
.transform(_.replace("<tmp>", System.getProperty("java.io.tmpdir")))
.createWithDefault(s"<tmp>/recover")
+ val WORKER_GRACEFUL_SHUTDOWN_RECOVER_DB_BACKEND: ConfigEntry[String] =
+ buildConf("celeborn.worker.graceful.shutdown.recoverDbBackend")
+ .categories("worker")
+ .doc("Specifies a disk-based store used in local db. LEVELDB or ROCKSDB.")
+ .version("0.4.0")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(Set("LEVELDB", "ROCKSDB"))
+ .createWithDefault("LEVELDB")
+
val WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout")
.categories("worker")
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index d52e773db..f93601cc3 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -84,6 +84,7 @@ ratis-server/2.5.1//ratis-server-2.5.1.jar
ratis-shell/2.5.1//ratis-shell-2.5.1.jar
ratis-thirdparty-misc/1.0.4//ratis-thirdparty-misc-1.0.4.jar
reflections/0.10.2//reflections-0.10.2.jar
+rocksdbjni/8.5.3//rocksdbjni-8.5.3.jar
scala-library/2.12.15//scala-library-2.12.15.jar
scala-reflect/2.12.15//scala-reflect-2.12.15.jar
shims/0.9.32//shims-0.9.32.jar
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 52719bf66..3833726d5 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -56,7 +56,8 @@ license: |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | 480s | The wait time of waiting for the released slots to be committed or destroyed during worker graceful shutdown. | 0.2.0 |
| celeborn.worker.graceful.shutdown.enabled | false | When true, during worker shutdown, the worker will wait for all released slots to be committed or destroyed. | 0.2.0 |
| celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout | 120s | The wait time of waiting for sorting partition files during worker graceful shutdown. | 0.2.0 |
-| celeborn.worker.graceful.shutdown.recoverPath | <tmp>/recover | The path to store levelDB. | 0.2.0 |
+| celeborn.worker.graceful.shutdown.recoverDbBackend | LEVELDB | Specifies a disk-based store used in local db. LEVELDB or ROCKSDB. | 0.4.0 |
+| celeborn.worker.graceful.shutdown.recoverPath | <tmp>/recover | The path to store DB. | 0.2.0 |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s | Interval for a Celeborn worker to flush committed file infos into Level DB. | 0.3.1 |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | Whether to call sync method to save committed file infos into Level DB to handle OS crash. | 0.3.1 |
| celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful shutdown timeout time. | 0.2.0 |
diff --git a/pom.xml b/pom.xml
index 2d3daa6ce..287376cf9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
<snakeyaml.version>1.33</snakeyaml.version>
<zstd-jni.version>1.5.2-1</zstd-jni.version>
<kubernetes-client.version>6.7.0</kubernetes-client.version>
+ <rocksdbjni.version>8.5.3</rocksdbjni.version>
<shading.prefix>org.apache.celeborn.shaded</shading.prefix>
@@ -441,6 +442,11 @@
<version>${kubernetes-client.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ <version>${rocksdbjni.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 6aa6e872b..9188e42c3 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -53,11 +53,12 @@ object Dependencies {
val nettyVersion = "4.1.93.Final"
val ratisVersion = "2.5.1"
val roaringBitmapVersion = "0.9.32"
+ val rocksdbJniVersion = "8.5.3"
val scalatestMockitoVersion = "1.17.14"
val scalatestVersion = "3.2.16"
val slf4jVersion = "1.7.36"
val snakeyamlVersion = "1.33"
-
+
// Versions for proto
val protocVersion = "3.19.2"
val protoVersion = "3.19.2"
@@ -89,6 +90,7 @@ object Dependencies {
val ratisShell = "org.apache.ratis" % "ratis-shell" % ratisVersion excludeAll(
ExclusionRule("org.slf4j", "slf4j-simple"))
val roaringBitmap = "org.roaringbitmap" % "RoaringBitmap" % roaringBitmapVersion
+ val rocksdbJni = "org.rocksdb" % "rocksdbjni" % rocksdbJniVersion
val scalaReflect = "org.scala-lang" % "scala-reflect" % projectScalaVersion
val slf4jApi = "org.slf4j" % "slf4j-api" % slf4jVersion
val slf4jJulToSlf4j = "org.slf4j" % "jul-to-slf4j" % slf4jVersion
@@ -404,6 +406,7 @@ object CelebornWorker {
Dependencies.log4jSlf4jImpl,
Dependencies.leveldbJniAll,
Dependencies.roaringBitmap,
+ Dependencies.rocksdbJni,
Dependencies.scalatestMockito % "test"
) ++ commonUnitTestDependencies
)
diff --git a/worker/pom.xml b/worker/pom.xml
index b3d0daf86..5a8914e3f 100644
--- a/worker/pom.xml
+++ b/worker/pom.xml
@@ -76,6 +76,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
index df1147993..fcfbc7338 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
@@ -19,9 +19,11 @@ package org.apache.celeborn.service.deploy.worker;
import java.nio.charset.StandardCharsets;
+import org.apache.celeborn.service.deploy.worker.shuffledb.StoreVersion;
+
public abstract class ShuffleRecoverHelper {
protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
- protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+ protected StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
protected byte[] dbShuffleKey(String shuffleKey) {
return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DB.java
similarity index 53%
copy from worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
copy to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DB.java
index df1147993..20b275d8d 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DB.java
@@ -15,22 +15,27 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
-import java.nio.charset.StandardCharsets;
+import java.io.Closeable;
-public abstract class ShuffleRecoverHelper {
- protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
- protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+/** Note: code copied from Apache Spark. */
+public interface DB extends Closeable {
+ /** Set the DB entry for "key" to "value". */
+ void put(byte[] key, byte[] value);
- protected byte[] dbShuffleKey(String shuffleKey) {
- return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
- }
+ /** Set the DB entry for "key" to "value". Support Sync option */
+ void put(byte[] key, byte[] value, boolean sync);
- protected String parseDbShuffleKey(String s) {
- if (!s.startsWith(SHUFFLE_KEY_PREFIX)) {
- throw new IllegalArgumentException("Expected a string starting with " + SHUFFLE_KEY_PREFIX);
- }
- return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
- }
+ /**
+ * Get which returns a new byte array storing the value associated with the specified input key if
+ * any.
+ */
+ byte[] get(byte[] key);
+
+ /** Delete the DB entry (if any) for "key". */
+ void delete(byte[] key);
+
+ /** Return an iterator over the contents of the DB. */
+ DBIterator iterator();
}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBBackend.java
similarity index 54%
copy from worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
copy to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBBackend.java
index df1147993..0aefb4a7e 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBBackend.java
@@ -15,22 +15,31 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
-import java.nio.charset.StandardCharsets;
+import java.util.Locale;
-public abstract class ShuffleRecoverHelper {
- protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
- protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+/**
+ * The enum `DBBackend` use to specify a disk-based store used in shuffle service local db. Support
+ * the use of LevelDB and RocksDB.
+ *
+ * <p>Note: code copied from Apache Spark.
+ */
+public enum DBBackend {
+ LEVELDB(".ldb"),
+ ROCKSDB(".rdb");
+
+ private final String fileSuffix;
+
+ DBBackend(String fileSuffix) {
+ this.fileSuffix = fileSuffix;
+ }
- protected byte[] dbShuffleKey(String shuffleKey) {
- return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
+ public String fileName(String prefix) {
+ return prefix + fileSuffix;
}
- protected String parseDbShuffleKey(String s) {
- if (!s.startsWith(SHUFFLE_KEY_PREFIX)) {
- throw new IllegalArgumentException("Expected a string starting with " + SHUFFLE_KEY_PREFIX);
- }
- return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
+ public static DBBackend byName(String value) {
+ return DBBackend.valueOf(value.toUpperCase(Locale.ROOT));
}
}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
similarity index 54%
copy from worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
copy to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
index df1147993..351b773df 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBIterator.java
@@ -15,22 +15,19 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
-import java.nio.charset.StandardCharsets;
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
-public abstract class ShuffleRecoverHelper {
- protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
- protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+/** Note: code copied from Apache Spark. */
+public interface DBIterator extends Iterator<Map.Entry<byte[], byte[]>>, Closeable {
- protected byte[] dbShuffleKey(String shuffleKey) {
- return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
- }
+ /** Position at the first entry in the source whose `key` is at target. */
+ void seek(byte[] key);
- protected String parseDbShuffleKey(String s) {
- if (!s.startsWith(SHUFFLE_KEY_PREFIX)) {
- throw new IllegalArgumentException("Expected a string starting with " + SHUFFLE_KEY_PREFIX);
- }
- return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
+ default void remove() {
+ throw new UnsupportedOperationException();
}
}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProvider.java
similarity index 50%
copy from worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
copy to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProvider.java
index df1147993..8223e6a77 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/DBProvider.java
@@ -15,22 +15,27 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
-import java.nio.charset.StandardCharsets;
+import java.io.File;
+import java.io.IOException;
-public abstract class ShuffleRecoverHelper {
- protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
- protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
-
- protected byte[] dbShuffleKey(String shuffleKey) {
- return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
- }
-
- protected String parseDbShuffleKey(String s) {
- if (!s.startsWith(SHUFFLE_KEY_PREFIX)) {
- throw new IllegalArgumentException("Expected a string starting with " + SHUFFLE_KEY_PREFIX);
+/** Note: code copied from Apache Spark. */
+public class DBProvider {
+ public static DB initDB(DBBackend dbBackend, File dbFile, StoreVersion version)
+ throws IOException {
+ if (dbFile != null) {
+ switch (dbBackend) {
+ case LEVELDB:
+ org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, version);
+ return levelDB != null ? new LevelDB(levelDB) : null;
+ case ROCKSDB:
+ org.rocksdb.RocksDB rocksDB = RocksDBProvider.initRockDB(dbFile, version);
+ return rocksDB != null ? new RocksDB(rocksDB) : null;
+ default:
+ throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
+ }
}
- return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
+ return null;
}
}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDB.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDB.java
new file mode 100644
index 000000000..2dfeadb46
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDB.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.shuffledb;
+
+import java.io.IOException;
+
+import org.iq80.leveldb.WriteOptions;
+
+/** Note: code copied from Apache Spark. */
+public class LevelDB implements DB {
+ private final org.iq80.leveldb.DB db;
+ private final WriteOptions SYNC_WRITE_OPTIONS = new WriteOptions().sync(true);
+
+ public LevelDB(org.iq80.leveldb.DB db) {
+ this.db = db;
+ }
+
+ @Override
+ public void put(byte[] key, byte[] value) {
+ db.put(key, value);
+ }
+
+ @Override
+ public void put(byte[] key, byte[] value, boolean sync) {
+ if (sync) {
+ db.put(key, value, SYNC_WRITE_OPTIONS);
+ } else {
+ db.put(key, value);
+ }
+ }
+
+ @Override
+ public byte[] get(byte[] key) {
+ return db.get(key);
+ }
+
+ @Override
+ public void delete(byte[] key) {
+ db.delete(key);
+ }
+
+ @Override
+ public void close() throws IOException {
+ db.close();
+ }
+
+ @Override
+ public DBIterator iterator() {
+ return new LevelDBIterator(db.iterator());
+ }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBIterator.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBIterator.java
new file mode 100644
index 000000000..c96b74900
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBIterator.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.shuffledb;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import com.google.common.base.Throwables;
+
+/** Note: code copied from Apache Spark. */
+public class LevelDBIterator implements DBIterator {
+
+ private final org.iq80.leveldb.DBIterator it;
+
+ private boolean checkedNext;
+
+ private boolean closed;
+
+ private Map.Entry<byte[], byte[]> next;
+
+ public LevelDBIterator(org.iq80.leveldb.DBIterator it) {
+ this.it = it;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!checkedNext && !closed) {
+ next = loadNext();
+ checkedNext = true;
+ }
+ if (!closed && next == null) {
+ try {
+ close();
+ } catch (IOException ioe) {
+ throw Throwables.propagate(ioe);
+ }
+ }
+ return next != null;
+ }
+
+ @Override
+ public Map.Entry<byte[], byte[]> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ checkedNext = false;
+ Map.Entry<byte[], byte[]> ret = next;
+ next = null;
+ return ret;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ it.close();
+ closed = true;
+ next = null;
+ }
+ }
+
+ @Override
+ public void seek(byte[] key) {
+ it.seek(key);
+ }
+
+ private Map.Entry<byte[], byte[]> loadNext() {
+ boolean hasNext = it.hasNext();
+ if (!hasNext) {
+ return null;
+ }
+ return it.next();
+ }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/LevelDBProvider.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
similarity index 78%
rename from worker/src/main/java/org/apache/celeborn/service/deploy/worker/LevelDBProvider.java
rename to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
index 8da381e2a..27734dca5 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/LevelDBProvider.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/LevelDBProvider.java
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
import java.io.File;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import org.fusesource.leveldbjni.JniDBFactory;
@@ -34,13 +33,14 @@ import org.apache.celeborn.common.util.PbSerDeUtils;
/**
* LevelDB utility class available in the network package.
*
- * <p>Note: code copied from Apache Spark's LevelDBProvider.
+ * <p>Note: code copied from Apache Spark.
*/
public class LevelDBProvider {
private static final Logger logger = LoggerFactory.getLogger(LevelDBProvider.class);
- public static DB initLevelDB(File dbFile, StoreVersion version) throws IOException {
- DB tmpDb = null;
+ public static org.iq80.leveldb.DB initLevelDB(File dbFile, StoreVersion version)
+ throws IOException {
+ org.iq80.leveldb.DB tmpDb = null;
if (dbFile != null) {
Options options = new Options();
options.createIfMissing(false);
@@ -49,12 +49,12 @@ public class LevelDBProvider {
tmpDb = JniDBFactory.factory.open(dbFile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
- logger.info("Creating recover database at " + dbFile);
+ logger.info("Creating state database at " + dbFile);
options.createIfMissing(true);
try {
tmpDb = JniDBFactory.factory.open(dbFile, options);
} catch (NativeDB.DBException dbExc) {
- throw new IOException("Unable to create recover store", dbExc);
+ throw new IOException("Unable to create state store", dbExc);
}
} else {
// the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
@@ -78,7 +78,7 @@ public class LevelDBProvider {
try {
tmpDb = JniDBFactory.factory.open(dbFile, options);
} catch (NativeDB.DBException dbExc) {
- throw new IOException("Unable to create recover store", dbExc);
+ throw new IOException("Unable to create state store", dbExc);
}
}
}
@@ -124,34 +124,4 @@ public class LevelDBProvider {
public static void storeVersion(DB db, StoreVersion version) throws IOException {
db.put(StoreVersion.KEY, PbSerDeUtils.toPbStoreVersion(version.major, version.minor));
}
-
- public static class StoreVersion {
-
- static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
-
- public final int major;
- public final int minor;
-
- public StoreVersion(int major, int minor) {
- this.major = major;
- this.minor = minor;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- StoreVersion that = (StoreVersion) o;
-
- return major == that.major && minor == that.minor;
- }
-
- @Override
- public int hashCode() {
- int result = major;
- result = 31 * result + minor;
- return result;
- }
- }
}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDB.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDB.java
new file mode 100644
index 000000000..df534d554
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDB.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.shuffledb;
+
+import java.io.IOException;
+
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
+
+/**
+ * RocksDB implementation of the local KV storage used to persist the shuffle state.
+ *
+ * <p>Note: code copied from Apache Spark.
+ */
+public class RocksDB implements DB {
+ private final org.rocksdb.RocksDB db;
+ private final WriteOptions SYNC_WRITE_OPTIONS = new WriteOptions().setSync(true);
+
+ public RocksDB(org.rocksdb.RocksDB db) {
+ this.db = db;
+ }
+
+ @Override
+ public void put(byte[] key, byte[] value) {
+ try {
+ db.put(key, value);
+ } catch (RocksDBException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void put(byte[] key, byte[] value, boolean sync) {
+ try {
+ if (sync) {
+ db.put(SYNC_WRITE_OPTIONS, key, value);
+ } else {
+ db.put(key, value);
+ }
+ } catch (RocksDBException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public byte[] get(byte[] key) {
+ try {
+ return db.get(key);
+ } catch (RocksDBException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void delete(byte[] key) {
+ try {
+ db.delete(key);
+ } catch (RocksDBException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public DBIterator iterator() {
+ return new RocksDBIterator(db.newIterator());
+ }
+
+ @Override
+ public void close() throws IOException {
+ db.close();
+ }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBIterator.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBIterator.java
new file mode 100644
index 000000000..3e67b9e27
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBIterator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.shuffledb;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksIterator;
+
+/**
+ * RocksDB implementation of `DBIterator`.
+ *
+ * <p>Entries are read one step ahead: {@link #hasNext()} loads and caches the next entry, and the
+ * iterator closes itself (releasing the native {@link RocksIterator}) once it is exhausted. A
+ * freshly created {@link RocksIterator} is not positioned, so nothing is returned until {@link
+ * #seek(byte[])} has been called.
+ *
+ * <p>Note: code copied from Apache Spark. Unlike the original, {@link #seek(byte[])} discards any
+ * cached look-ahead entry, and an iteration error reported by RocksDB is rethrown instead of being
+ * mistaken for the end of the data.
+ */
+public class RocksDBIterator implements DBIterator {
+
+  private final RocksIterator it;
+
+  // True when `next` holds the result of the latest look-ahead at the current position.
+  private boolean checkedNext;
+
+  // True once the native iterator has been released; it must not be touched afterwards.
+  private boolean closed;
+
+  // Cached look-ahead entry; null when not loaded yet or when the iterator is exhausted.
+  private Map.Entry<byte[], byte[]> next;
+
+  public RocksDBIterator(RocksIterator it) {
+    this.it = it;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!checkedNext && !closed) {
+      next = loadNext();
+      checkedNext = true;
+    }
+    if (!closed && next == null) {
+      // Exhausted: release the native iterator eagerly.
+      try {
+        close();
+      } catch (IOException ioe) {
+        throw Throwables.propagate(ioe);
+      }
+    }
+    return next != null;
+  }
+
+  @Override
+  public Map.Entry<byte[], byte[]> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    checkedNext = false;
+    Map.Entry<byte[], byte[]> ret = next;
+    next = null;
+    return ret;
+  }
+
+  /** Releases the native iterator; calling it more than once is a no-op. */
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      it.close();
+      closed = true;
+      next = null;
+    }
+  }
+
+  /**
+   * Positions the native iterator via {@link RocksIterator#seek(byte[])} and drops any entry that
+   * a previous {@link #hasNext()} already loaded, so iteration continues from the new position.
+   *
+   * @throws IllegalStateException if this iterator was already closed (explicitly or because it
+   *     was exhausted); the native handle has been released at that point
+   */
+  @Override
+  public void seek(byte[] key) {
+    if (closed) {
+      throw new IllegalStateException("Cannot seek a closed RocksDB iterator");
+    }
+    it.seek(key);
+    checkedNext = false;
+    next = null;
+  }
+
+  /**
+   * Reads the entry at the current position and advances, or returns {@code null} at the end.
+   * Errors reported by RocksDB are rethrown unchecked after releasing the native iterator.
+   */
+  private Map.Entry<byte[], byte[]> loadNext() {
+    if (it.isValid()) {
+      Map.Entry<byte[], byte[]> entry = new AbstractMap.SimpleEntry<>(it.key(), it.value());
+      it.next();
+      return entry;
+    }
+    // isValid() is false both at the end of the data and after an error; status() tells them
+    // apart, so a failed read is not silently treated as "no more entries".
+    try {
+      it.status();
+    } catch (org.rocksdb.RocksDBException e) {
+      it.close();
+      closed = true;
+      throw Throwables.propagate(e);
+    }
+    return null;
+  }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
new file mode 100644
index 000000000..13d65a86e
--- /dev/null
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/RocksDBProvider.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.shuffledb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Objects;
+
+import org.rocksdb.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.util.PbSerDeUtils;
+
+/**
+ * Opens the RocksDB-backed local state store (for example the worker's graceful-shutdown
+ * recovery DB) and manages the {@link StoreVersion} stamp stored in it.
+ *
+ * <p>Note: code copied from Apache Spark.
+ */
+public class RocksDBProvider {
+
+  static {
+    // Load the RocksDB JNI library before any org.rocksdb object is created.
+    org.rocksdb.RocksDB.loadLibrary();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(RocksDBProvider.class);
+
+  /**
+   * Opens the RocksDB database at {@code dbFile}, creating it when it does not exist, and checks
+   * (or records) its {@link StoreVersion}.
+   *
+   * <p>If an existing database cannot be opened for any reason other than "not found", it is
+   * treated as corrupt: its files are deleted and an empty database is created in its place, so
+   * state for previously running applications is lost.
+   *
+   * @param dbFile database directory; when {@code null} nothing is opened
+   * @param version version expected by the caller
+   * @return the opened database, or {@code null} when {@code dbFile} is {@code null}
+   * @throws IOException if the database cannot be created, or its stored major version differs
+   *     from {@code version} (the database is closed before throwing)
+   */
+  public static org.rocksdb.RocksDB initRockDB(File dbFile, StoreVersion version)
+      throws IOException {
+    if (dbFile == null) {
+      return null;
+    }
+    BloomFilter fullFilter = new BloomFilter(10.0D /* BloomFilter.DEFAULT_BITS_PER_KEY */, false);
+    BlockBasedTableConfig tableFormatConfig =
+        new BlockBasedTableConfig()
+            .setFilterPolicy(fullFilter)
+            .setEnableIndexCompression(false)
+            .setIndexBlockRestartInterval(8)
+            .setFormatVersion(5);
+
+    Options dbOptions = new Options();
+    RocksDBLogger rocksDBLogger = new RocksDBLogger(dbOptions);
+
+    dbOptions.setCreateIfMissing(false);
+    dbOptions.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
+    dbOptions.setCompressionType(CompressionType.LZ4_COMPRESSION);
+    dbOptions.setTableFormatConfig(tableFormatConfig);
+    dbOptions.setLogger(rocksDBLogger);
+
+    org.rocksdb.RocksDB tmpDb;
+    try {
+      tmpDb = org.rocksdb.RocksDB.open(dbOptions, dbFile.toString());
+    } catch (RocksDBException e) {
+      // getStatus() may return null when the exception carries only a message.
+      Status status = e.getStatus();
+      if (status != null && status.getCode() == Status.Code.NotFound) {
+        logger.info("Creating state database at {}", dbFile);
+      } else {
+        // the RocksDB file seems to be corrupt somehow. Let's just blow it away and create
+        // a new one, so we can keep processing new apps
+        logger.error(
+            "error opening rocksdb file {}. Creating new file, will not be able to "
+                + "recover state for existing applications",
+            dbFile,
+            e);
+        deleteDbFiles(dbFile);
+      }
+      tmpDb = createDB(dbOptions, dbFile);
+    }
+
+    boolean verified = false;
+    try {
+      // if there is a version mismatch, we throw an exception, which means the service
+      // is unusable
+      checkVersion(tmpDb, version);
+      verified = true;
+    } catch (RocksDBException e) {
+      throw new IOException(e.getMessage(), e);
+    } finally {
+      if (!verified) {
+        // Release the handle (and the directory lock RocksDB holds while open) instead of leaking.
+        tmpDb.close();
+      }
+    }
+    return tmpDb;
+  }
+
+  /** Opens {@code dbFile} with creation enabled, wrapping failures in an {@link IOException}. */
+  private static org.rocksdb.RocksDB createDB(Options dbOptions, File dbFile) throws IOException {
+    dbOptions.setCreateIfMissing(true);
+    try {
+      return org.rocksdb.RocksDB.open(dbOptions, dbFile.toString());
+    } catch (RocksDBException dbExc) {
+      throw new IOException("Unable to create state store", dbExc);
+    }
+  }
+
+  /**
+   * Deletes the direct children of {@code dbFile} (when it is a directory) and then {@code dbFile}
+   * itself, logging a warning for every entry that could not be deleted.
+   */
+  private static void deleteDbFiles(File dbFile) {
+    if (dbFile.isDirectory()) {
+      for (File f : Objects.requireNonNull(dbFile.listFiles())) {
+        if (!f.delete()) {
+          logger.warn("error deleting {}", f.getPath());
+        }
+      }
+    }
+    if (!dbFile.delete()) {
+      logger.warn("error deleting {}", dbFile.getPath());
+    }
+  }
+
+  /** Forwards RocksDB's internal log messages to SLF4J at the corresponding severity. */
+  private static class RocksDBLogger extends org.rocksdb.Logger {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBLogger.class);
+
+    RocksDBLogger(Options options) {
+      super(options);
+    }
+
+    @Override
+    protected void log(InfoLogLevel infoLogLevel, String message) {
+      // Forward every severity so RocksDB warnings and errors are not silently dropped.
+      switch (infoLogLevel) {
+        case FATAL_LEVEL:
+        case ERROR_LEVEL:
+          LOG.error(message);
+          break;
+        case WARN_LEVEL:
+          LOG.warn(message);
+          break;
+        case INFO_LEVEL:
+          LOG.info(message);
+          break;
+        default:
+          // DEBUG and HEADER messages are kept at debug level.
+          LOG.debug(message);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Simple major.minor versioning scheme. Any incompatible changes should be across major versions.
+   * Minor version differences are allowed -- meaning we should be able to read dbs that are either
+   * earlier *or* later on the minor version.
+   *
+   * <p>When no version is stored yet, or the stored one is compatible, {@code newversion} is
+   * (re)written to the database.
+   *
+   * @throws IOException if the stored major version differs from {@code newversion}'s
+   */
+  public static void checkVersion(org.rocksdb.RocksDB db, StoreVersion newversion)
+      throws IOException, RocksDBException {
+    byte[] bytes = db.get(StoreVersion.KEY);
+    if (bytes != null) {
+      ArrayList<Integer> versions = PbSerDeUtils.fromPbStoreVersion(bytes);
+      StoreVersion version = new StoreVersion(versions.get(0), versions.get(1));
+      if (version.major != newversion.major) {
+        throw new IOException(
+            "cannot read state DB with version "
+                + formatVersion(version)
+                + ", incompatible "
+                + "with current version "
+                + formatVersion(newversion));
+      }
+    }
+    storeVersion(db, newversion);
+  }
+
+  /** Renders a version as {@code major.minor} for error messages. */
+  private static String formatVersion(StoreVersion v) {
+    return v.major + "." + v.minor;
+  }
+
+  /** Writes {@code version} under {@link StoreVersion#KEY} using the protobuf encoding. */
+  public static void storeVersion(org.rocksdb.RocksDB db, StoreVersion version)
+      throws IOException, RocksDBException {
+    db.put(StoreVersion.KEY, PbSerDeUtils.toPbStoreVersion(version.major, version.minor));
+  }
+}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/StoreVersion.java
similarity index 51%
copy from worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
copy to worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/StoreVersion.java
index df1147993..e77cff970 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/ShuffleRecoverHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/shuffledb/StoreVersion.java
@@ -15,22 +15,47 @@
* limitations under the License.
*/
-package org.apache.celeborn.service.deploy.worker;
+package org.apache.celeborn.service.deploy.worker.shuffledb;
import java.nio.charset.StandardCharsets;
-public abstract class ShuffleRecoverHelper {
- protected String SHUFFLE_KEY_PREFIX = "SHUFFLE-KEY";
- protected LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+/**
+ * Identifies the (major, minor) version of the data stored in the local shuffle state DB.
+ *
+ * <p>Note: code copied from Apache Spark.
+ */
+public class StoreVersion {
+
+ public static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
+
+ public final int major;
+ public final int minor;
+
+ public StoreVersion(int major, int minor) {
+ this.major = major;
+ this.minor = minor;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ StoreVersion that = (StoreVersion) o;
- protected byte[] dbShuffleKey(String shuffleKey) {
- return (SHUFFLE_KEY_PREFIX + ";" + shuffleKey).getBytes(StandardCharsets.UTF_8);
+ return major == that.major && minor == that.minor;
}
- protected String parseDbShuffleKey(String s) {
- if (!s.startsWith(SHUFFLE_KEY_PREFIX)) {
- throw new IllegalArgumentException("Expected a string starting with " + SHUFFLE_KEY_PREFIX);
- }
- return s.substring(SHUFFLE_KEY_PREFIX.length() + 1);
+ @Override
+ public int hashCode() {
+ int result = major;
+ result = 31 * result + minor;
+ return result;
}
+
+ /** Formats the version as {@code major.minor} (e.g. {@code 1.0}) for log and error messages. */
+ @Override
+ public String toString() {
+ return major + "." + minor;
+ }
}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index ed1f3f130..c729c8402 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -41,8 +41,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.DBIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,17 +51,20 @@ import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.*;
import org.apache.celeborn.common.util.ShuffleBlockInfoUtils.ShuffleBlockInfo;
-import org.apache.celeborn.service.deploy.worker.LevelDBProvider;
import org.apache.celeborn.service.deploy.worker.ShuffleRecoverHelper;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
+import org.apache.celeborn.service.deploy.worker.shuffledb.DB;
+import org.apache.celeborn.service.deploy.worker.shuffledb.DBBackend;
+import org.apache.celeborn.service.deploy.worker.shuffledb.DBIterator;
+import org.apache.celeborn.service.deploy.worker.shuffledb.DBProvider;
+import org.apache.celeborn.service.deploy.worker.shuffledb.StoreVersion;
public class PartitionFilesSorter extends ShuffleRecoverHelper {
private static final Logger logger = LoggerFactory.getLogger(PartitionFilesSorter.class);
- private static final LevelDBProvider.StoreVersion CURRENT_VERSION =
- new LevelDBProvider.StoreVersion(1, 0);
- private static final String RECOVERY_SORTED_FILES_FILE_NAME = "sortedFiles.ldb";
+ private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);
+ private static final String RECOVERY_SORTED_FILES_FILE_NAME_PREFIX = "sortedFiles";
private File recoverFile;
private volatile boolean shutdown = false;
private final ConcurrentHashMap<String, Set<String>> sortedShuffleFiles =
@@ -104,11 +105,14 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
if (gracefulShutdown) {
try {
String recoverPath = conf.workerGracefulShutdownRecoverPath();
- this.recoverFile = new File(recoverPath, RECOVERY_SORTED_FILES_FILE_NAME);
- this.sortedFilesDb = LevelDBProvider.initLevelDB(recoverFile, CURRENT_VERSION);
+ DBBackend dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend());
+ String recoverySortedFilesFileName =
+ dbBackend.fileName(RECOVERY_SORTED_FILES_FILE_NAME_PREFIX);
+ this.recoverFile = new File(recoverPath, recoverySortedFilesFileName);
+ this.sortedFilesDb = DBProvider.initDB(dbBackend, recoverFile, CURRENT_VERSION);
reloadAndCleanSortedShuffleFiles(this.sortedFilesDb);
} catch (Exception e) {
- logger.error("Failed to reload LevelDB for sorted shuffle files from: " + recoverFile, e);
+ logger.error("Failed to reload DB for sorted shuffle files from: " + recoverFile, e);
this.sortedFilesDb = null;
}
} else {
@@ -260,7 +264,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
updateSortedShuffleFilesInDB();
sortedFilesDb.close();
} catch (IOException e) {
- logger.error("Store recover data to LevelDB failed.", e);
+ logger.error("Store recover data to DB failed.", e);
}
}
long end = System.currentTimeMillis();
@@ -273,7 +277,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
sortedFilesDb.close();
recoverFile.delete();
} catch (IOException e) {
- logger.error("Clean LevelDB failed.", e);
+ logger.error("Clean DB failed.", e);
}
}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 64538f1fc..4aaedd2a8 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -31,7 +31,6 @@ import scala.concurrent.duration._
import io.netty.buffer.PooledByteBufAllocator
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
-import org.iq80.leveldb.{DB, WriteOptions}
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.CelebornException
@@ -45,6 +44,7 @@ import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker._
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
+import org.apache.celeborn.service.deploy.worker.shuffledb.{DB, DBBackend, DBProvider}
import org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
final private[worker] class StorageManager(conf: CelebornConf, workerSource: AbstractSource)
@@ -177,7 +177,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
// shuffleKey -> (fileName -> file info)
private val fileInfos =
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, FileInfo]]()
- private val RECOVERY_FILE_NAME = "recovery.ldb"
+ private val RECOVERY_FILE_NAME_PREFIX = "recovery"
+ private var RECOVERY_FILE_NAME = "recovery.ldb"
private var db: DB = null
private var saveCommittedFileInfosExecutor: ScheduledExecutorService = _
private val saveCommittedFileInfoBySyncMethod =
@@ -190,8 +191,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val workerGracefulShutdown = conf.workerGracefulShutdown
if (workerGracefulShutdown) {
try {
+ val dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend)
+ RECOVERY_FILE_NAME = dbBackend.fileName(RECOVERY_FILE_NAME_PREFIX)
val recoverFile = new File(conf.workerGracefulShutdownRecoverPath, RECOVERY_FILE_NAME)
- this.db = LevelDBProvider.initLevelDB(recoverFile, CURRENT_VERSION)
+ this.db = DBProvider.initDB(dbBackend, recoverFile, CURRENT_VERSION)
reloadAndCleanFileInfos(this.db)
} catch {
case e: Exception =>
@@ -212,7 +215,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
db.put(
dbShuffleKey(shuffleKey),
PbSerDeUtils.toPbFileInfoMap(files),
- new WriteOptions().sync(saveCommittedFileInfoBySyncMethod))
+ saveCommittedFileInfoBySyncMethod)
}
}
}
@@ -260,16 +263,16 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
def saveAllCommittedFileInfosToDB(): Unit = {
// save committed fileinfo to DB should be done within the time of saveCommittedFileInfoInterval
saveCommittedFileInfosExecutor.awaitTermination(saveCommittedFileInfoInterval, MILLISECONDS)
- // graceful shutdown might be timed out, persist all committed fileinfos to levelDB
+ // graceful shutdown might time out, so persist all committed fileinfos to DB
// final flush write through
committedFileInfos.asScala.foreach { case (shuffleKey, files) =>
try {
+ // Write synchronously: the K8s container might be gone afterwards
db.put(
dbShuffleKey(shuffleKey),
PbSerDeUtils.toPbFileInfoMap(files),
- // K8s container might gone
- new WriteOptions().sync(true))
- logDebug(s"Update FileInfos into DB: ${shuffleKey} -> ${files}")
+ true)
+ logDebug(s"Update FileInfos into DB: $shuffleKey -> $files")
} catch {
case exception: Exception =>
logError(s"Update FileInfos into DB: ${shuffleKey} failed.", exception)
@@ -696,7 +699,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
db.close()
} catch {
case exception: Exception =>
- logError("Store recover data to LevelDB failed.", exception)
+ logError("Store recover data to DB failed.", exception)
}
} else {
if (db != null) {