You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2023/08/14 00:49:34 UTC

[bookkeeper] branch master updated: Allow to set max operation numbers in a single rocksdb batch (#4044)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new ad0ed213c0 Allow to set max operation numbers in a single rocksdb batch (#4044)
ad0ed213c0 is described below

commit ad0ed213c0d3abf971441c7160334af99d94159c
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Aug 14 08:49:27 2023 +0800

    Allow to set max operation numbers in a single rocksdb batch (#4044)
    
    ---
    
    ## Motivation
    
    In RocksDB, memory usage grows with the size of a write batch:
    the more operations a single batch holds, the more memory it consumes.
    Expose a configuration option that lets operators limit the batch size.
---
 .../bookie/storage/ldb/KeyValueStorage.java        |  4 +++
 .../bookie/storage/ldb/KeyValueStorageRocksDB.java | 28 +++++++++++++++-
 .../bookkeeper/conf/ServerConfiguration.java       | 24 ++++++++++++++
 .../bookie/storage/ldb/KeyValueStorageTest.java    | 37 ++++++++++++++++++++++
 4 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
index ab724d73bb..8e18148c08 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
@@ -178,5 +178,14 @@ public interface KeyValueStorage extends Closeable {
         void clear();
 
         void flush() throws IOException;
+
+        /**
+         * Returns how many operations this batch has recorded since it was created or last cleared.
+         *
+         * @return the recorded operation count, or {@code -1} if the implementation does not track it
+         */
+        default int batchCount() {
+            return -1;
+        }
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
index fd87050693..a77a0a18f7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
@@ -84,6 +84,7 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
     private final ReadOptions optionCache;
     private final ReadOptions optionDontCache;
     private final WriteBatch emptyBatch;
+    private final int writeBatchMaxSize;
 
     private String dbPath;
 
@@ -143,6 +144,8 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
 
         optionCache.setFillCache(true);
         optionDontCache.setFillCache(false);
+
+        this.writeBatchMaxSize = conf.getMaxOperationNumbersInSingleRocksDBBatch();
     }
 
     private RocksDB initializeRocksDBWithConfFile(String basePath, String subPath, DbConfigType dbConfigType,
@@ -516,21 +519,31 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
 
     @Override
     public Batch newBatch() {
-        return new RocksDBBatch();
+        return new RocksDBBatch(writeBatchMaxSize);
     }
 
     private class RocksDBBatch implements Batch {
         private final WriteBatch writeBatch = new WriteBatch();
+        /** Number of operations after which the batch is flushed automatically; {@code <= 0} disables it. */
+        private final int batchSize;
+        /** Operations recorded since this batch was created, last cleared, or closed. */
+        private int batchCount = 0;
+
+        RocksDBBatch(int batchSize) {
+            this.batchSize = batchSize;
+        }
 
         @Override
         public void close() {
             writeBatch.close();
+            batchCount = 0;
         }
 
         @Override
         public void put(byte[] key, byte[] value) throws IOException {
             try {
                 writeBatch.put(key, value);
+                countBatchAndFlushIfNeeded();
             } catch (RocksDBException e) {
                 throw new IOException("Failed to flush RocksDB batch", e);
             }
@@ -540,6 +553,7 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
         public void remove(byte[] key) throws IOException {
             try {
                 writeBatch.delete(key);
+                countBatchAndFlushIfNeeded();
             } catch (RocksDBException e) {
                 throw new IOException("Failed to flush RocksDB batch", e);
             }
@@ -548,17 +562,40 @@ public class KeyValueStorageRocksDB implements KeyValueStorage {
         @Override
         public void clear() {
             writeBatch.clear();
+            batchCount = 0;
         }
 
         @Override
         public void deleteRange(byte[] beginKey, byte[] endKey) throws IOException {
             try {
                 writeBatch.deleteRange(beginKey, endKey);
+                countBatchAndFlushIfNeeded();
             } catch (RocksDBException e) {
                 throw new IOException("Failed to flush RocksDB batch", e);
             }
         }
 
+        /**
+         * Records one more operation and, once {@code batchSize} operations have accumulated,
+         * writes them out via {@link #flush()} and starts over with an empty batch.
+         *
+         * <p>A logical batch with more than {@code batchSize} operations is therefore split across
+         * several flushes instead of being written as one atomic RocksDB write. A non-positive
+         * {@code batchSize} disables auto-flushing (the operation count is still tracked).
+         */
+        private void countBatchAndFlushIfNeeded() throws IOException {
+            batchCount++;
+            if (batchSize > 0 && batchCount >= batchSize) {
+                flush();
+                clear();
+            }
+        }
+
+        @Override
+        public int batchCount() {
+            return batchCount;
+        }
+
         @Override
         public void flush() throws IOException {
             try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 14d98ce43c..454069a04d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -339,6 +339,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     // Used for location index, lots of writes and much bigger dataset
     protected static final String LEDGER_METADATA_ROCKSDB_CONF = "ledgerMetadataRocksdbConf";
 
+    protected static final String MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH =
+        "maxOperationNumbersInSingleRocksdbWriteBatch";
+
     protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord";
 
     /**
@@ -4102,4 +4105,25 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
         this.setProperty(LEDGER_METADATA_ROCKSDB_CONF, ledgerMetadataRocksdbConf);
         return this;
     }
+
+    /**
+     * Set the maximum number of operations buffered in a single RocksDB write batch before it is flushed.
+     * Write batch memory grows with the number of operations, so an overly large batch can cause an OOM.
+     *
+     * @param maxNumbersInSingleRocksDBBatch the maximum number of operations per RocksDB write batch
+     * @return this server configuration
+     */
+    public ServerConfiguration setOperationMaxNumbersInSingleRocksDBWriteBatch(int maxNumbersInSingleRocksDBBatch) {
+        this.setProperty(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, maxNumbersInSingleRocksDBBatch);
+        return this;
+    }
+
+    /**
+     * Get the maximum number of operations in a single RocksDB write batch (default: 100000).
+     *
+     * @return the maximum number of operations per RocksDB write batch
+     */
+    public int getMaxOperationNumbersInSingleRocksDBBatch() {
+        return getInt(MAX_OPERATION_NUMBERS_IN_SINGLE_ROCKSDB_WRITE_BATCH, 100000);
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
index 458f3c69f9..d52f19305e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
@@ -170,4 +170,49 @@ public class KeyValueStorageTest {
         db.close();
         FileUtils.deleteDirectory(tmpDir);
     }
+
+    @Test
+    public void testBatch() throws Exception {
+
+        configuration.setOperationMaxNumbersInSingleRocksDBWriteBatch(5);
+
+        File tmpDir = Files.createTempDirectory("junitTemporaryFolder").toFile();
+        Files.createDirectory(Paths.get(tmpDir.toString(), "subDir"));
+
+        KeyValueStorage db = storageFactory.newKeyValueStorage(tmpDir.toString(), "subDir", DbConfigType.Default,
+                configuration);
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(0, db.count());
+
+        Batch batch = db.newBatch();
+        assertEquals(0, batch.batchCount());
+
+        batch.put(toArray(1), toArray(1));
+        batch.put(toArray(2), toArray(2));
+        assertEquals(2, batch.batchCount());
+        // Still below the limit of 5: nothing has been written to the database yet.
+        assertEquals(null, db.get(toArray(1)));
+
+        batch.put(toArray(3), toArray(3));
+        batch.put(toArray(4), toArray(4));
+        batch.put(toArray(5), toArray(5));
+        // The fifth operation hits the limit: the batch is flushed and its counter is reset.
+        assertEquals(0, batch.batchCount());
+        assertEquals(true, java.util.Arrays.equals(toArray(5), db.get(toArray(5))));
+
+        batch.put(toArray(6), toArray(6));
+        assertEquals(1, batch.batchCount());
+        assertEquals(null, db.get(toArray(6)));
+
+        // An explicit flush writes the pending operation but does not reset the counter.
+        batch.flush();
+        assertEquals(1, batch.batchCount());
+        assertEquals(true, java.util.Arrays.equals(toArray(6), db.get(toArray(6))));
+        batch.close();
+        assertEquals(0, batch.batchCount());
+
+        db.close();
+        FileUtils.deleteDirectory(tmpDir);
+    }
 }