You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/08/14 00:49:34 UTC
[bookkeeper] branch master updated: Allow to set max operation numbers in a single rocksdb batch (#4044)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ad0ed213c0 Allow to set max operation numbers in a single rocksdb batch (#4044)
ad0ed213c0 is described below
commit ad0ed213c0d3abf971441c7160334af99d94159c
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Aug 14 08:49:27 2023 +0800
Allow to set max operation numbers in a single rocksdb batch (#4044)
---
## Motivation
In RocksDB, memory usage grows with the size of a write batch:
the more operations a single batch holds, the more memory it consumes.
Expose a configuration option to cap the number of operations in a single batch.
---
.../bookie/storage/ldb/KeyValueStorage.java | 4 +++
.../bookie/storage/ldb/KeyValueStorageRocksDB.java | 28 +++++++++++++++-
.../bookkeeper/conf/ServerConfiguration.java | 24 ++++++++++++++
.../bookie/storage/ldb/KeyValueStorageTest.java | 37 ++++++++++++++++++++++
4 files changed, 92 insertions(+), 1 deletion(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
index ab724d73bb..8e18148c08 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
@@ -178,5 +178,9 @@ public interface KeyValueStorage extends Closeable {
void clear();
void flush() throws IOException;
+
+ default int batchCount() {
+ return -1;
+ }
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
index fd87050693..a77a0a18f7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
@@ -84,6 +84,7 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
private final ReadOptions optionCache;
private final ReadOptions optionDontCache;
private final WriteBatch emptyBatch;
+ private final int writeBatchMaxSize;
private String dbPath;
@@ -143,6 +144,8 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
optionCache.setFillCache(true);
optionDontCache.setFillCache(false);
+
+ this.writeBatchMaxSize = conf.getMaxOperationNumbersInSingleRocksDBBatch();
}
private RocksDB initializeRocksDBWithConfFile(String basePath, String subPath, DbConfigType dbConfigType,
@@ -516,21 +519,29 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
@Override
public Batch newBatch() {
- return new RocksDBBatch();
+ return new RocksDBBatch(writeBatchMaxSize);
}
private class RocksDBBatch implements Batch {
private final WriteBatch writeBatch = new WriteBatch();
+ private final int batchSize;
+ private int batchCount = 0;
+
+ RocksDBBatch(int batchSize) {
+ this.batchSize = batchSize;
+ }
@Override
public void close() {
writeBatch.close();
+ batchCount = 0;
}
@Override
public void put(byte[] key, byte[] value) throws IOException {
try {
writeBatch.put(key, value);
+ countBatchAndFlushIfNeeded();
} catch (RocksDBException e) {
throw new IOException("Failed to flush RocksDB batch", e);
}
@@ -540,6 +551,7 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
public void remove(byte[] key) throws IOException {
try {
writeBatch.delete(key);
+ countBatchAndFlushIfNeeded();
} catch (RocksDBException e) {
throw new IOException("Failed to flush RocksDB batch", e);
}
@@ -548,17 +560,31 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
@Override
public void clear() {
writeBatch.clear();
+ batchCount = 0;
}
@Override
public void deleteRange(byte[] beginKey, byte[] endKey) throws IOException {
try {
writeBatch.deleteRange(beginKey, endKey);
+ countBatchAndFlushIfNeeded();
} catch (RocksDBException e) {
throw new IOException("Failed to flush RocksDB batch", e);
}
}
+ private void countBatchAndFlushIfNeeded() throws IOException {
+ if (++batchCount >= batchSize) {
+ flush();
+ clear();
+ }
+ }
+
+ @Override
+ public int batchCount() {
+ return batchCount;
+ }
+
@Override
public void flush() throws IOException {
try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 14d98ce43c..454069a04d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -339,6 +339,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
// Used for location index, lots of writes and much bigger dataset
protected static final String LEDGER_METADATA_ROCKSDB_CONF = "ledgerMetadataRocksdbConf";
+ protected static final String MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH =
+ "maxOperationNumbersInSingleRocksdbWriteBatch";
+
protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord";
/**
@@ -4102,4 +4105,25 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
this.setProperty(LEDGER_METADATA_ROCKSDB_CONF, ledgerMetadataRocksdbConf);
return this;
}
+
+ /**
+ * Set the max operation numbers in a single rocksdb write batch.
+ * The rocksdb write batch is related to the memory usage. If the batch is too large, it will cause the OOM.
+ *
+ * @param maxNumbersInSingleRocksDBBatch
+ * @return
+ */
+ public ServerConfiguration setOperationMaxNumbersInSingleRocksDBWriteBatch(int maxNumbersInSingleRocksDBBatch) {
+ this.setProperty(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, maxNumbersInSingleRocksDBBatch);
+ return this;
+ }
+
+ /**
+ * Get the max operation numbers in a single rocksdb write batch.
+ *
+ * @return
+ */
+ public int getMaxOperationNumbersInSingleRocksDBBatch() {
+ return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, 100000);
+ }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
index 458f3c69f9..d52f19305e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
@@ -170,4 +170,41 @@ public class KeyValueStorageTest {
db.close();
FileUtils.deleteDirectory(tmpDir);
}
+
+ @Test
+ public void testBatch() throws Exception {
+
+ configuration.setOperationMaxNumbersInSingleRocksDBWriteBatch(5);
+
+ File tmpDir = Files.createTempDirectory("junitTemporaryFolder").toFile();
+ Files.createDirectory(Paths.get(tmpDir.toString(), "subDir"));
+
+ KeyValueStorage db = storageFactory.newKeyValueStorage(tmpDir.toString(), "subDir", DbConfigType.Default,
+ configuration);
+
+ assertEquals(null, db.getFloor(toArray(3)));
+ assertEquals(0, db.count());
+
+ Batch batch = db.newBatch();
+ assertEquals(0, batch.batchCount());
+
+ batch.put(toArray(1), toArray(1));
+ batch.put(toArray(2), toArray(2));
+ assertEquals(2, batch.batchCount());
+
+ batch.put(toArray(3), toArray(3));
+ batch.put(toArray(4), toArray(4));
+ batch.put(toArray(5), toArray(5));
+ assertEquals(0, batch.batchCount());
+ batch.put(toArray(6), toArray(6));
+ assertEquals(1, batch.batchCount());
+
+ batch.flush();
+ assertEquals(1, batch.batchCount());
+ batch.close();
+ assertEquals(0, batch.batchCount());
+
+ db.close();
+ FileUtils.deleteDirectory(tmpDir);
+ }
}