Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/12/13 08:56:47 UTC

[GitHub] jiazhai closed pull request #829: DbLedgerStorage -- KeyValue storage interface with RocksDB implementation

jiazhai closed pull request #829: DbLedgerStorage -- KeyValue storage interface with RocksDB implementation
URL: https://github.com/apache/bookkeeper/pull/829
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the original diff
once such a pull request is merged, the full diff is reproduced below for the sake of
provenance:

diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 28d90ba0e..bb456fca5 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -39,6 +39,11 @@
       <artifactId>bookkeeper-proto</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.rocksdb</groupId>
+      <artifactId>rocksdbjni</artifactId>
+      <version>${rocksdb.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java
new file mode 100644
index 000000000..67e98f265
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+// CHECKSTYLE.OFF: IllegalImport
+import io.netty.util.internal.PlatformDependent;
+// CHECKSTYLE.ON: IllegalImport
+
+import java.nio.ByteOrder;
+
+/**
+ * Utility to serialize/deserialize longs into byte arrays.
+ */
+class ArrayUtil {
+
+    private static final boolean UNALIGNED = PlatformDependent.isUnaligned();
+    private static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
+    private static final boolean BIG_ENDIAN_NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
+
+    public static long getLong(byte[] array, int index) {
+        if (HAS_UNSAFE && UNALIGNED) {
+            long v = PlatformDependent.getLong(array, index);
+            return BIG_ENDIAN_NATIVE_ORDER ? v : Long.reverseBytes(v);
+        }
+
+        return ((long) array[index] & 0xff) << 56 | //
+                ((long) array[index + 1] & 0xff) << 48 | //
+                ((long) array[index + 2] & 0xff) << 40 | //
+                ((long) array[index + 3] & 0xff) << 32 | //
+                ((long) array[index + 4] & 0xff) << 24 | //
+                ((long) array[index + 5] & 0xff) << 16 | //
+                ((long) array[index + 6] & 0xff) << 8 | //
+                (long) array[index + 7] & 0xff;
+    }
+
+    public static void setLong(byte[] array, int index, long value) {
+        if (HAS_UNSAFE && UNALIGNED) {
+            PlatformDependent.putLong(array, index, BIG_ENDIAN_NATIVE_ORDER ? value : Long.reverseBytes(value));
+        } else {
+            array[index] = (byte) (value >>> 56);
+            array[index + 1] = (byte) (value >>> 48);
+            array[index + 2] = (byte) (value >>> 40);
+            array[index + 3] = (byte) (value >>> 32);
+            array[index + 4] = (byte) (value >>> 24);
+            array[index + 5] = (byte) (value >>> 16);
+            array[index + 6] = (byte) (value >>> 8);
+            array[index + 7] = (byte) value;
+        }
+    }
+
+    public static final boolean isArrayAllZeros(final byte[] array) {
+        return PlatformDependent.isZero(array, 0, array.length);
+    }
+}
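
For a quick sanity check of the helper above, here is a minimal, self-contained sketch
that round-trips a long through ArrayUtil (it assumes same-package access, since the
class is package-private; the class name ArrayUtilSketch is a hypothetical placeholder):

    package org.apache.bookkeeper.bookie.storage.ldb;

    // Round-trip check of the big-endian long encoding provided by ArrayUtil.
    public class ArrayUtilSketch {
        public static void main(String[] args) {
            byte[] buf = new byte[8];
            ArrayUtil.setLong(buf, 0, 0x0123456789ABCDEFL);
            // The most significant byte is written first, regardless of native byte order
            System.out.printf("%02x %02x%n", buf[0], buf[7]);                     // 01 ef
            System.out.println(ArrayUtil.getLong(buf, 0) == 0x0123456789ABCDEFL); // true
        }
    }
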
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
new file mode 100644
index 000000000..b9bbb2a6a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+/**
+ * Abstraction of a generic key-value local database.
+ */
+public interface KeyValueStorage extends Closeable {
+
+    void put(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Get the value associated with the given key.
+     *
+     * @param key
+     *            the key to lookup
+     * @return the value or null if the key was not found
+     */
+    byte[] get(byte[] key) throws IOException;
+
+    /**
+     * Get the value associated with the given key.
+     *
+     * <p>This method will use the provided array to store the value.
+     *
+     * @param key
+     *            the key to lookup
+     * @param value
+     *            an array where to store the result
+     * @return the length of the value, or -1 if the entry was not found
+     * @throws IOException
+     *             if the value array could not hold the result
+     */
+    int get(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Get the entry whose key is the greatest key strictly less than the supplied key.
+     *
+     * <p>For example if the db contains :
+     *
+     * <pre>
+     * {
+     *      1 : 'a',
+     *      2 : 'b',
+     *      3 : 'c'
+     * }
+     * </pre>
+     *
+     * <p>Then:
+     *
+     * <pre>
+     * getFloor(3) --> (2, 'b')
+     * </pre>
+     *
+     * @param key
+     *            the non-inclusive upper limit key
+     * @return the entry with the greatest key strictly less than the supplied key, or null if no such entry exists
+     */
+    Entry<byte[], byte[]> getFloor(byte[] key) throws IOException;
+
+    /**
+     * Get the entry whose key is the smallest key greater than or equal to the supplied key.
+     *
+     * @param key
+     *            the inclusive lower limit key
+     * @return the first entry at or after the supplied key, or null if there is no such entry
+     * @throws IOException
+     */
+    Entry<byte[], byte[]> getCeil(byte[] key) throws IOException;
+
+    /**
+     * Delete the entry associated with the given key, if present.
+     *
+     * @param key
+     *            the key to delete
+     * @throws IOException
+     */
+    void delete(byte[] key) throws IOException;
+
+    /**
+     * Get an iterator to scan sequentially through all the keys in the
+     * database.
+     *
+     * @return a closeable iterator over all the keys
+     */
+    CloseableIterator<byte[]> keys();
+
+    /**
+     * Get an iterator to scan sequentially through all the keys within a
+     * specified range.
+     *
+     * @param firstKey
+     *            the first key in the range (included)
+     * @param lastKey
+     *            the last key in the range (excluded)
+     *
+     */
+    CloseableIterator<byte[]> keys(byte[] firstKey, byte[] lastKey);
+
+    /**
+     * Return an iterator object that can be used to sequentially scan through all
+     * the entries in the database.
+     */
+    CloseableIterator<Entry<byte[], byte[]>> iterator();
+
+    /**
+     * Commit all pending writes to durable storage.
+     */
+    void sync() throws IOException;
+
+    /**
+     * @return the number of keys.
+     */
+    long count() throws IOException;
+
+    /**
+     * Iterator interface.
+     *
+     * @param <T> the type of elements returned by this iterator
+     */
+    interface CloseableIterator<T> extends Closeable {
+        boolean hasNext() throws IOException;
+
+        T next() throws IOException;
+    }
+
+    Batch newBatch();
+
+    /**
+     * Interface for a batch to be written in the storage.
+     */
+    public interface Batch extends Closeable {
+        void put(byte[] key, byte[] value);
+
+        void remove(byte[] key);
+
+        void deleteRange(byte[] beginKey, byte[] endKey);
+
+        void clear();
+
+        void flush() throws IOException;
+    }
+}
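
As a rough usage sketch of this interface (it relies on the RocksDB-backed factory added
later in this PR; the /tmp path and the class name UsageSketch are placeholders, and the
class sits in the same package because the factory field is package-private):

    package org.apache.bookkeeper.bookie.storage.ldb;

    import java.util.Map.Entry;

    import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
    import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class UsageSketch {
        public static void main(String[] args) throws Exception {
            try (KeyValueStorage db = KeyValueStorageRocksDB.factory.newKeyValueStorage(
                    "/tmp/kv-sketch", DbConfigType.Small, new ServerConfiguration())) {

                db.put(new byte[] { 1 }, new byte[] { 10 });
                db.put(new byte[] { 3 }, new byte[] { 30 });

                // getFloor: greatest key strictly less than {3} is {1}
                Entry<byte[], byte[]> floor = db.getFloor(new byte[] { 3 });
                System.out.println(floor.getKey()[0]); // 1

                // Operations queued in a batch are written together on flush()
                try (Batch batch = db.newBatch()) {
                    batch.put(new byte[] { 5 }, new byte[] { 50 });
                    batch.remove(new byte[] { 1 });
                    batch.flush();
                }

                db.sync(); // commit any remaining unsynced writes to durable storage
            }
        }
    }
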
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java
new file mode 100644
index 000000000..c35628d77
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Factory class to create instances of the key-value storage implementation.
+ */
+public interface KeyValueStorageFactory {
+
+    /**
+     * Enum used to specify different config profiles in the underlying storage.
+     */
+    enum DbConfigType {
+        Small, // Used for ledgers db, doesn't need particular configuration
+        Huge // Used for location index, lots of writes and much bigger dataset
+    }
+
+    KeyValueStorage newKeyValueStorage(String path, DbConfigType dbConfigType, ServerConfiguration conf)
+            throws IOException;
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
new file mode 100644
index 000000000..0978fc0b2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
@@ -0,0 +1,447 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.primitives.UnsignedBytes;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.ChecksumType;
+import org.rocksdb.CompressionType;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RocksDB based implementation of the KeyValueStorage.
+ */
+public class KeyValueStorageRocksDB implements KeyValueStorage {
+
+    static KeyValueStorageFactory factory = (path, dbConfigType, conf) -> new KeyValueStorageRocksDB(path, dbConfigType,
+            conf);
+
+    private final RocksDB db;
+
+    private final WriteOptions optionSync = new WriteOptions();
+    private final WriteOptions optionDontSync = new WriteOptions();
+
+    private final ReadOptions optionCache = new ReadOptions();
+    private final ReadOptions optionDontCache = new ReadOptions();
+
+    private final WriteBatch emptyBatch = new WriteBatch();
+
+    private static final String ROCKSDB_LOG_LEVEL = "dbStorage_rocksDB_logLevel";
+    private static final String ROCKSDB_WRITE_BUFFER_SIZE_MB = "dbStorage_rocksDB_writeBufferSizeMB";
+    private static final String ROCKSDB_SST_SIZE_MB = "dbStorage_rocksDB_sstSizeInMB";
+    private static final String ROCKSDB_BLOCK_SIZE = "dbStorage_rocksDB_blockSize";
+    private static final String ROCKSDB_BLOOM_FILTERS_BITS_PER_KEY = "dbStorage_rocksDB_bloomFilterBitsPerKey";
+    private static final String ROCKSDB_BLOCK_CACHE_SIZE = "dbStorage_rocksDB_blockCacheSize";
+    private static final String ROCKSDB_NUM_LEVELS = "dbStorage_rocksDB_numLevels";
+    private static final String ROCKSDB_NUM_FILES_IN_LEVEL0 = "dbStorage_rocksDB_numFilesInLevel0";
+    private static final String ROCKSDB_MAX_SIZE_IN_LEVEL1_MB = "dbStorage_rocksDB_maxSizeInLevel1MB";
+
+    public KeyValueStorageRocksDB(String path, DbConfigType dbConfigType, ServerConfiguration conf) throws IOException {
+        this(path, dbConfigType, conf, false);
+    }
+
+    public KeyValueStorageRocksDB(String path, DbConfigType dbConfigType, ServerConfiguration conf, boolean readOnly)
+            throws IOException {
+        try {
+            RocksDB.loadLibrary();
+        } catch (Throwable t) {
+            throw new IOException("Failed to load RocksDB JNI library", t);
+        }
+
+        try (Options options = new Options()) {
+            options.setCreateIfMissing(true);
+
+            if (dbConfigType == DbConfigType.Huge) {
+                long writeBufferSizeMB = conf.getInt(ROCKSDB_WRITE_BUFFER_SIZE_MB, 64);
+                long sstSizeMB = conf.getInt(ROCKSDB_SST_SIZE_MB, 64);
+                int numLevels = conf.getInt(ROCKSDB_NUM_LEVELS, -1);
+                int numFilesInLevel0 = conf.getInt(ROCKSDB_NUM_FILES_IN_LEVEL0, 4);
+                long maxSizeInLevel1MB = conf.getLong(ROCKSDB_MAX_SIZE_IN_LEVEL1_MB, 256);
+                int blockSize = conf.getInt(ROCKSDB_BLOCK_SIZE, 64 * 1024);
+                long blockCacheSize = conf.getLong(ROCKSDB_BLOCK_CACHE_SIZE, 256 * 1024 * 1024);
+                int bloomFilterBitsPerKey = conf.getInt(ROCKSDB_BLOOM_FILTERS_BITS_PER_KEY, 10);
+
+                options.setCompressionType(CompressionType.LZ4_COMPRESSION);
+                options.setWriteBufferSize(writeBufferSizeMB * 1024 * 1024);
+                options.setMaxWriteBufferNumber(4);
+                if (numLevels > 0) {
+                    options.setNumLevels(numLevels);
+                }
+                options.setLevelZeroFileNumCompactionTrigger(numFilesInLevel0);
+                options.setMaxBytesForLevelBase(maxSizeInLevel1MB * 1024 * 1024);
+                options.setMaxBackgroundCompactions(16);
+                options.setMaxBackgroundFlushes(16);
+                options.setIncreaseParallelism(32);
+                options.setMaxTotalWalSize(512 * 1024 * 1024);
+                options.setMaxOpenFiles(-1);
+                options.setTargetFileSizeBase(sstSizeMB * 1024 * 1024);
+                options.setDeleteObsoleteFilesPeriodMicros(TimeUnit.HOURS.toMicros(1));
+
+                BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
+                tableOptions.setBlockSize(blockSize);
+                tableOptions.setBlockCacheSize(blockCacheSize);
+                tableOptions.setFormatVersion(2);
+                tableOptions.setChecksumType(ChecksumType.kxxHash);
+                if (bloomFilterBitsPerKey > 0) {
+                    tableOptions.setFilter(new BloomFilter(bloomFilterBitsPerKey, false));
+                }
+
+                // Options best suited for HDDs
+                tableOptions.setCacheIndexAndFilterBlocks(true);
+                options.setLevelCompactionDynamicLevelBytes(true);
+
+                options.setTableFormatConfig(tableOptions);
+            }
+
+            // Configure log level
+            String logLevel = conf.getString(ROCKSDB_LOG_LEVEL, "info");
+            switch (logLevel) {
+            case "debug":
+                options.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL);
+                break;
+            case "info":
+                options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
+                break;
+            case "warn":
+                options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
+                break;
+            case "error":
+                options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
+                break;
+            default:
+                log.warn("Unrecognized RockDB log level: {}", logLevel);
+            }
+
+            // Keep log files for 1 month (30 files, rolled daily)
+            options.setKeepLogFileNum(30);
+            options.setLogFileTimeToRoll(TimeUnit.DAYS.toSeconds(1));
+
+            try {
+                if (readOnly) {
+                    db = RocksDB.openReadOnly(options, path);
+                } else {
+                    db = RocksDB.open(options, path);
+                }
+            } catch (RocksDBException e) {
+                throw new IOException("Error open RocksDB database", e);
+            }
+        }
+
+        optionSync.setSync(true);
+        optionDontSync.setSync(false);
+
+        optionCache.setFillCache(true);
+        optionDontCache.setFillCache(false);
+    }
+
+    @Override
+    public void close() throws IOException {
+        db.close();
+        optionSync.close();
+        optionDontSync.close();
+        optionCache.close();
+        optionDontCache.close();
+        emptyBatch.close();
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) throws IOException {
+        try {
+            db.put(optionDontSync, key, value);
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB put", e);
+        }
+    }
+
+    @Override
+    public byte[] get(byte[] key) throws IOException {
+        try {
+            return db.get(key);
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB get", e);
+        }
+    }
+
+    @Override
+    public int get(byte[] key, byte[] value) throws IOException {
+        try {
+            int res = db.get(key, value);
+            if (res == RocksDB.NOT_FOUND) {
+                return -1;
+            } else if (res > value.length) {
+                throw new IOException("Value array is too small to fit the result");
+            } else {
+                return res;
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB get", e);
+        }
+    }
+
+    @Override
+    public Entry<byte[], byte[]> getFloor(byte[] key) throws IOException {
+        try (RocksIterator iterator = db.newIterator(optionCache)) {
+            // Position the iterator on the first record whose key is >= the supplied key
+            iterator.seek(key);
+
+            if (!iterator.isValid()) {
+                // There are no entries >= key
+                iterator.seekToLast();
+                if (iterator.isValid()) {
+                    return new EntryWrapper(iterator.key(), iterator.value());
+                } else {
+                    // Db is empty
+                    return null;
+                }
+            }
+
+            iterator.prev();
+
+            if (!iterator.isValid()) {
+                // The iterator is on the first entry of the db and that entry's key is >= the
+                // target key
+                return null;
+            } else {
+                return new EntryWrapper(iterator.key(), iterator.value());
+            }
+        }
+    }
+
+    @Override
+    public Entry<byte[], byte[]> getCeil(byte[] key) throws IOException {
+        try (RocksIterator iterator = db.newIterator(optionCache)) {
+            // Position the iterator on the first record whose key is >= the supplied key
+            iterator.seek(key);
+
+            if (iterator.isValid()) {
+                return new EntryWrapper(iterator.key(), iterator.value());
+            } else {
+                return null;
+            }
+        }
+    }
+
+    @Override
+    public void delete(byte[] key) throws IOException {
+        try {
+            db.delete(optionDontSync, key);
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB delete", e);
+        }
+    }
+
+    @Override
+    public void sync() throws IOException {
+        try {
+            db.write(optionSync, emptyBatch);
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public CloseableIterator<byte[]> keys() {
+        final RocksIterator iterator = db.newIterator(optionCache);
+        iterator.seekToFirst();
+
+        return new CloseableIterator<byte[]>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.isValid();
+            }
+
+            @Override
+            public byte[] next() {
+                checkState(iterator.isValid());
+                byte[] key = iterator.key();
+                iterator.next();
+                return key;
+            }
+
+            @Override
+            public void close() {
+                iterator.close();
+            }
+        };
+    }
+
+    @Override
+    public CloseableIterator<byte[]> keys(byte[] firstKey, byte[] lastKey) {
+        final RocksIterator iterator = db.newIterator(optionCache);
+        iterator.seek(firstKey);
+
+        return new CloseableIterator<byte[]>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.isValid() && ByteComparator.compare(iterator.key(), lastKey) < 0;
+            }
+
+            @Override
+            public byte[] next() {
+                checkState(iterator.isValid());
+                byte[] key = iterator.key();
+                iterator.next();
+                return key;
+            }
+
+            @Override
+            public void close() {
+                iterator.close();
+            }
+        };
+    }
+
+    @Override
+    public CloseableIterator<Entry<byte[], byte[]>> iterator() {
+        final RocksIterator iterator = db.newIterator(optionDontCache);
+        iterator.seekToFirst();
+        final EntryWrapper entryWrapper = new EntryWrapper();
+
+        return new CloseableIterator<Entry<byte[], byte[]>>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.isValid();
+            }
+
+            @Override
+            public Entry<byte[], byte[]> next() {
+                checkState(iterator.isValid());
+                entryWrapper.key = iterator.key();
+                entryWrapper.value = iterator.value();
+                iterator.next();
+                return entryWrapper;
+            }
+
+            @Override
+            public void close() {
+                iterator.close();
+            }
+        };
+    }
+
+    @Override
+    public long count() throws IOException {
+        try {
+            return db.getLongProperty("rocksdb.estimate-num-keys");
+        } catch (RocksDBException e) {
+            throw new IOException("Error in getting records count", e);
+        }
+    }
+
+    @Override
+    public Batch newBatch() {
+        return new RocksDBBatch();
+    }
+
+    private class RocksDBBatch implements Batch {
+        private final WriteBatch writeBatch = new WriteBatch();
+
+        @Override
+        public void close() {
+            writeBatch.close();
+        }
+
+        @Override
+        public void put(byte[] key, byte[] value) {
+            writeBatch.put(key, value);
+        }
+
+        @Override
+        public void remove(byte[] key) {
+            writeBatch.remove(key);
+        }
+
+        @Override
+        public void clear() {
+            writeBatch.clear();
+        }
+
+        @Override
+        public void deleteRange(byte[] beginKey, byte[] endKey) {
+            writeBatch.deleteRange(beginKey, endKey);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            try {
+                db.write(optionSync, writeBatch);
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to flush RocksDB batch", e);
+            }
+        }
+    }
+
+    private static final class EntryWrapper implements Entry<byte[], byte[]> {
+        // This is not final since the iterator will reuse the same EntryWrapper
+        // instance at each step
+        private byte[] key;
+        private byte[] value;
+
+        public EntryWrapper() {
+            this.key = null;
+            this.value = null;
+        }
+
+        public EntryWrapper(byte[] key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public byte[] setValue(byte[] value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public byte[] getValue() {
+            return value;
+        }
+
+        @Override
+        public byte[] getKey() {
+            return key;
+        }
+    }
+
+    private static final Comparator<byte[]> ByteComparator = UnsignedBytes.lexicographicalComparator();
+
+    private static final Logger log = LoggerFactory.getLogger(KeyValueStorageRocksDB.class);
+}
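
The dbStorage_rocksDB_* settings read above are plain ServerConfiguration properties, so
the "Huge" profile can be tuned without code changes. A minimal sketch (the path and the
values shown are illustrative placeholders, not recommendations):

    package org.apache.bookkeeper.bookie.storage.ldb;

    import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class RocksDBTuningSketch {
        public static void main(String[] args) throws Exception {
            ServerConfiguration conf = new ServerConfiguration();
            conf.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 128);            // default: 64
            conf.setProperty("dbStorage_rocksDB_blockCacheSize", 512 * 1024 * 1024); // default: 256 MB
            conf.setProperty("dbStorage_rocksDB_bloomFilterBitsPerKey", 10);         // default: 10
            conf.setProperty("dbStorage_rocksDB_logLevel", "warn");                  // default: "info"

            try (KeyValueStorage db = new KeyValueStorageRocksDB(
                    "/data/locations-index", DbConfigType.Huge, conf)) {
                db.put(new byte[] { 1 }, new byte[] { 1 });
                db.sync();
            }
        }
    }
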
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
new file mode 100644
index 000000000..6c6cd8c92
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
@@ -0,0 +1,25 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Classes related to DB based ledger storage.
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
new file mode 100644
index 000000000..d1c366fc3
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
@@ -0,0 +1,175 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Unit test for {@link KeyValueStorage}.
+ */
+@RunWith(Parameterized.class)
+public class KeyValueStorageTest {
+
+    private final KeyValueStorageFactory storageFactory;
+    private final ServerConfiguration configuration;
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { { KeyValueStorageRocksDB.factory } });
+    }
+
+    public KeyValueStorageTest(KeyValueStorageFactory storageFactory) {
+        this.storageFactory = storageFactory;
+        this.configuration = new ServerConfiguration();
+    }
+
+    private static long fromArray(byte[] array) {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    private static byte[] toArray(long n) {
+        byte[] b = new byte[8];
+        ArrayUtil.setLong(b, 0, n);
+        return b;
+    }
+
+    @Test
+    public void simple() throws Exception {
+        File tmpDir = File.createTempFile("bookie", "test");
+        tmpDir.delete();
+
+        KeyValueStorage db = storageFactory.newKeyValueStorage(tmpDir.getAbsolutePath(), DbConfigType.Small,
+                configuration);
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(0, db.count());
+
+        db.put(toArray(5), toArray(5));
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(1, db.count());
+
+        assertEquals(null, db.getFloor(toArray(5)));
+        assertEquals(5, fromArray(db.getFloor(toArray(6)).getKey()));
+
+        db.put(toArray(3), toArray(3));
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(2, db.count());
+
+        // //
+
+        db.put(toArray(5), toArray(5));
+        // Count can be imprecise
+        assertTrue(db.count() > 0);
+
+        assertEquals(null, db.getFloor(toArray(1)));
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(3, fromArray(db.getFloor(toArray(5)).getKey()));
+        assertEquals(5, fromArray(db.getFloor(toArray(6)).getKey()));
+        assertEquals(5, fromArray(db.getFloor(toArray(10)).getKey()));
+
+        // Iterate
+        List<Long> foundKeys = Lists.newArrayList();
+        CloseableIterator<Entry<byte[], byte[]>> iter = db.iterator();
+        try {
+            while (iter.hasNext()) {
+                foundKeys.add(fromArray(iter.next().getKey()));
+            }
+        } finally {
+            iter.close();
+        }
+
+        assertEquals(Lists.newArrayList(3L, 5L), foundKeys);
+
+        // Iterate over keys
+        foundKeys = Lists.newArrayList();
+        CloseableIterator<byte[]> iter2 = db.keys();
+        try {
+            while (iter2.hasNext()) {
+                foundKeys.add(fromArray(iter2.next()));
+            }
+        } finally {
+            iter2.close();
+        }
+
+        assertEquals(Lists.newArrayList(3L, 5L), foundKeys);
+
+        // Scan with limits
+        foundKeys = Lists.newArrayList();
+        iter2 = db.keys(toArray(1), toArray(4));
+        try {
+            while (iter2.hasNext()) {
+                foundKeys.add(fromArray(iter2.next()));
+            }
+        } finally {
+            iter2.close();
+        }
+
+        assertEquals(Lists.newArrayList(3L), foundKeys);
+
+        // Test deletion
+        db.put(toArray(10), toArray(10));
+        db.put(toArray(11), toArray(11));
+        db.put(toArray(12), toArray(12));
+        db.put(toArray(14), toArray(14));
+
+        // Count can be imprecise
+        assertTrue(db.count() > 0);
+
+        assertEquals(10L, fromArray(db.get(toArray(10))));
+        db.delete(toArray(10));
+        assertEquals(null, db.get(toArray(10)));
+        assertTrue(db.count() > 0);
+
+        Batch batch = db.newBatch();
+        batch.remove(toArray(11));
+        batch.remove(toArray(12));
+        batch.remove(toArray(13));
+        batch.flush();
+        assertEquals(null, db.get(toArray(11)));
+        assertEquals(null, db.get(toArray(12)));
+        assertEquals(null, db.get(toArray(13)));
+        assertEquals(14L, fromArray(db.get(toArray(14))));
+        batch.close();
+
+        db.close();
+        tmpDir.delete();
+    }
+}
diff --git a/pom.xml b/pom.xml
index ccb5a9272..9c7a00270 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
     <netty-boringssl.version>2.0.3.Final</netty-boringssl.version>
     <slf4j.version>1.7.25</slf4j.version>
     <zookeeper.version>3.5.3-beta</zookeeper.version>
+    <rocksdb.version>5.8.6</rocksdb.version>
     <!-- plugin dependencies -->
     <findbugs-maven-plugin.version>3.0.5</findbugs-maven-plugin.version>
     <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services