Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/12/11 19:23:04 UTC

[GitHub] merlimat closed pull request #826: DbLedgerStorage implementation
URL: https://github.com/apache/bookkeeper/pull/826

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/bookkeeper-proto/pom.xml b/bookkeeper-proto/pom.xml
index cc9169da7..388bdcb7b 100644
--- a/bookkeeper-proto/pom.xml
+++ b/bookkeeper-proto/pom.xml
@@ -45,6 +45,7 @@
             <!-- exclude generated file //-->
             <exclude>**/DataFormats.java</exclude>
             <exclude>**/BookkeeperProtocol.java</exclude>
+            <exclude>**/DbLedgerStorageDataFormats.java</exclude>
           </excludes>
         </configuration>
       </plugin>
diff --git a/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto b/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto
new file mode 100644
index 000000000..e68b2d0d5
--- /dev/null
+++ b/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+syntax = "proto2";
+
+option java_package = "org.apache.bookkeeper.bookie.storage.ldb";
+option optimize_for = SPEED;
+
+/**
+ * Ledger metadata stored in the bookie
+ */
+message LedgerData {
+    required bool exists = 1;
+    required bool fenced = 2;
+    required bytes masterKey = 3;
+}
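
For illustration, a minimal sketch of how the generated LedgerData class can be
built, serialized, and parsed from Java (the class and package names follow the
proto options above; the helper class itself is illustrative, not part of the
patch):

    import com.google.protobuf.ByteString;
    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;

    class LedgerDataExample {
        // Build and serialize a LedgerData record (all three fields are required)
        static byte[] encode(boolean fenced, byte[] masterKey) {
            return LedgerData.newBuilder()
                    .setExists(true)
                    .setFenced(fenced)
                    .setMasterKey(ByteString.copyFrom(masterKey))
                    .build()
                    .toByteArray();
        }

        // Parse the serialized form back into a LedgerData message
        static LedgerData decode(byte[] bytes) throws InvalidProtocolBufferException {
            return LedgerData.parseFrom(bytes);
        }
    }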
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 28d90ba0e..8d7022ef3 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -39,6 +39,11 @@
       <artifactId>bookkeeper-proto</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.rocksdb</groupId>
+      <artifactId>rocksdbjni</artifactId>
+      <version>5.8.6</version>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 79cd2f930..6ae499e98 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1437,6 +1437,11 @@ public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC,
         return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer);
     }
 
+    @VisibleForTesting
+    public LedgerStorage getLedgerStorage() {
+        return ledgerStorage;
+    }
+
     // The rest of the code is test stuff
     static class CounterCallback implements WriteCallback {
         int count;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 1d19a3024..3e334edb7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -22,6 +22,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractFuture;
 
 import io.netty.buffer.ByteBuf;
@@ -33,6 +34,7 @@
 import java.io.Serializable;
 import java.math.RoundingMode;
 import java.nio.ByteBuffer;
+import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -59,8 +61,12 @@
 
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
+import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -142,6 +148,9 @@
     static final String CMD_DECOMMISSIONBOOKIE = "decommissionbookie";
     static final String CMD_LOSTBOOKIERECOVERYDELAY = "lostbookierecoverydelay";
     static final String CMD_TRIGGERAUDIT = "triggeraudit";
+    static final String CMD_CONVERT_TO_DB_STORAGE = "convert-to-db-storage";
+    static final String CMD_CONVERT_TO_INTERLEAVED_STORAGE = "convert-to-interleaved-storage";
+    static final String CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX = "rebuild-db-ledger-locations-index";
     static final String CMD_HELP = "help";
 
     final ServerConfiguration bkConf = new ServerConfiguration();
@@ -1886,7 +1895,6 @@ public void progress(long updated, long issued) {
             }
             return 0;
         }
-
     }
 
     /**
@@ -2120,6 +2128,241 @@ public int runCmd(CommandLine cmdLine) throws Exception {
         void progress(long updated, long issued);
     }
 
+
+    /**
+     * Convert bookie indexes from InterleavedStorage to DbLedgerStorage format.
+     */
+    class ConvertToDbStorageCmd extends MyCommand {
+        Options opts = new Options();
+
+        public ConvertToDbStorageCmd() {
+            super(CMD_CONVERT_TO_DB_STORAGE);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Convert bookie indexes from InterleavedStorage to DbLedgerStorage format";
+        }
+
+        String getUsage() {
+            return CMD_CONVERT_TO_DB_STORAGE;
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            LOG.info("=== Converting to DbLedgerStorage ===");
+            ServerConfiguration conf = new ServerConfiguration(bkConf);
+            LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+                    new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
+            LedgerDirsManager ledgerIndexManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+                    new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
+
+            InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
+            DbLedgerStorage dbStorage = new DbLedgerStorage();
+
+            CheckpointSource checkpointSource = new CheckpointSource() {
+                    @Override
+                    public Checkpoint newCheckpoint() {
+                        return Checkpoint.MAX;
+                    }
+
+                    @Override
+                    public void checkpointComplete(Checkpoint checkpoint, boolean compact)
+                            throws IOException {
+                    }
+                };
+            Checkpointer checkpointer = new Checkpointer() {
+                @Override
+                public void startCheckpoint(Checkpoint checkpoint) {
+                    // No-op
+                }
+            };
+
+            interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
+                    checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+            dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
+                    checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+
+            int convertedLedgers = 0;
+            for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Converting ledger {}", ledgerId);
+                }
+
+                FileInfo fi = getFileInfo(ledgerId);
+
+                Iterable<SortedMap<Long, Long>> entries = getLedgerIndexEntries(ledgerId);
+
+                long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId, fi.isFenced(), fi.getMasterKey(), entries);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("   -- done. fenced={} entries={}", fi.isFenced(), numberOfEntries);
+                }
+
+                // Remove index from old storage
+                interleavedStorage.deleteLedger(ledgerId);
+
+                if (++convertedLedgers % 1000 == 0) {
+                    LOG.info("Converted {} ledgers", convertedLedgers);
+                }
+            }
+
+            dbStorage.shutdown();
+            interleavedStorage.shutdown();
+
+            LOG.info("---- Done Converting ----");
+            return 0;
+        }
+    }
+
+    /**
+     * Convert bookie indexes from DbLedgerStorage to InterleavedStorage format.
+     */
+    class ConvertToInterleavedStorageCmd extends MyCommand {
+        Options opts = new Options();
+
+        public ConvertToInterleavedStorageCmd() {
+            super(CMD_CONVERT_TO_INTERLEAVED_STORAGE);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Convert bookie indexes from DbLedgerStorage to InterleavedStorage format";
+        }
+
+        String getUsage() {
+            return CMD_CONVERT_TO_INTERLEAVED_STORAGE;
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            LOG.info("=== Converting DbLedgerStorage ===");
+            ServerConfiguration conf = new ServerConfiguration(bkConf);
+            LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+                    new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
+            LedgerDirsManager ledgerIndexManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+                    new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
+
+            DbLedgerStorage dbStorage = new DbLedgerStorage();
+            InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
+
+            CheckpointSource checkpointSource = new CheckpointSource() {
+                    @Override
+                    public Checkpoint newCheckpoint() {
+                        return Checkpoint.MAX;
+                    }
+
+                    @Override
+                    public void checkpointComplete(Checkpoint checkpoint, boolean compact)
+                            throws IOException {
+                    }
+                };
+            Checkpointer checkpointer = new Checkpointer() {
+                @Override
+                public void startCheckpoint(Checkpoint checkpoint) {
+                    // No-op
+                }
+            };
+
+            dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
+                        checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+            interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
+                    checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+            LedgerCache interleavedLedgerCache = interleavedStorage.ledgerCache;
+
+            EntryLocationIndex dbEntryLocationIndex = dbStorage.getEntryLocationIndex();
+
+            int convertedLedgers = 0;
+            for (long ledgerId : dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Converting ledger {}", ledgerId);
+                }
+
+                interleavedStorage.setMasterKey(ledgerId, dbStorage.readMasterKey(ledgerId));
+                if (dbStorage.isFenced(ledgerId)) {
+                    interleavedStorage.setFenced(ledgerId);
+                }
+
+                long lastEntryInLedger = dbEntryLocationIndex.getLastEntryInLedger(ledgerId);
+                for (long entryId = 0; entryId <= lastEntryInLedger; entryId++) {
+                    try {
+                        long location = dbEntryLocationIndex.getLocation(ledgerId, entryId);
+                        if (location != 0L) {
+                            interleavedLedgerCache.putEntryOffset(ledgerId, entryId, location);
+                        }
+                    } catch (Bookie.NoEntryException e) {
+                        // Ignore entry
+                    }
+                }
+
+                if (++convertedLedgers % 1000 == 0) {
+                    LOG.info("Converted {} ledgers", convertedLedgers);
+                }
+            }
+
+            dbStorage.shutdown();
+
+            interleavedLedgerCache.flushLedger(true);
+            interleavedStorage.flush();
+            interleavedStorage.shutdown();
+
+            String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+
+            // Rename databases and keep backup
+            Files.move(FileSystems.getDefault().getPath(baseDir, "ledgers"),
+                    FileSystems.getDefault().getPath(baseDir, "ledgers.backup"));
+
+            Files.move(FileSystems.getDefault().getPath(baseDir, "locations"),
+                    FileSystems.getDefault().getPath(baseDir, "locations.backup"));
+
+            LOG.info("---- Done Converting {} ledgers ----", convertedLedgers);
+            return 0;
+        }
+    }
+
+    /**
+     * Rebuild DbLedgerStorage locations index.
+     */
+    class RebuildDbLedgerLocationsIndexCmd extends MyCommand {
+        Options opts = new Options();
+
+        public RebuildDbLedgerLocationsIndexCmd() {
+            super(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Rebuild DbLedgerStorage locations index by scanning the entry logs";
+        }
+
+        String getUsage() {
+            return CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX;
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            LOG.info("=== Rebuilding bookie index ===");
+            ServerConfiguration conf = new ServerConfiguration(bkConf);
+            new LocationsIndexRebuildOp(conf).initiate();
+            LOG.info("-- Done rebuilding bookie index --");
+            return 0;
+        }
+    }
+
     final Map<String, MyCommand> commands = new HashMap<String, MyCommand>();
     {
         commands.put(CMD_METAFORMAT, new MetaFormatCmd());
@@ -2145,6 +2388,9 @@ public int runCmd(CommandLine cmdLine) throws Exception {
         commands.put(CMD_DELETELEDGER, new DeleteLedgerCmd());
         commands.put(CMD_BOOKIEINFO, new BookieInfoCmd());
         commands.put(CMD_DECOMMISSIONBOOKIE, new DecommissionBookieCmd());
+        commands.put(CMD_CONVERT_TO_DB_STORAGE, new ConvertToDbStorageCmd());
+        commands.put(CMD_CONVERT_TO_INTERLEAVED_STORAGE, new ConvertToInterleavedStorageCmd());
+        commands.put(CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX, new RebuildDbLedgerLocationsIndexCmd());
         commands.put(CMD_HELP, new HelpCmd());
         commands.put(CMD_LOSTBOOKIERECOVERYDELAY, new LostBookieRecoveryDelayCmd());
         commands.put(CMD_TRIGGERAUDIT, new TriggerAuditCmd());
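
Once registered, the new commands run through the standard bookie shell entry
point. A hedged sketch of a programmatic invocation (assuming BookieShell.main
as the entry point and a bookie configuration discoverable in the usual way):

    public class ConvertToDbStorageExample {
        public static void main(String[] args) throws Exception {
            // Equivalent to running "convert-to-db-storage" from the bookie shell CLI
            org.apache.bookkeeper.bookie.BookieShell.main(new String[] { "convert-to-db-storage" });
        }
    }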
@@ -2439,6 +2685,66 @@ protected void readLedgerIndexEntries(long ledgerId) throws IOException {
         }
     }
 
+    /**
+     * Get an iterable over pages of entries and locations for a ledger.
+     *
+     * @param ledgerId the id of the ledger whose index pages to read
+     * @return an iterable over per-page maps from entry id to entry location
+     * @throws IOException if the ledger index files cannot be read
+     */
+    protected Iterable<SortedMap<Long, Long>> getLedgerIndexEntries(final long ledgerId) throws IOException {
+        final FileInfo fi = getFileInfo(ledgerId);
+        final long size = fi.size();
+
+        final LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
+        lep.usePage();
+
+        final Iterator<SortedMap<Long, Long>> iterator = new Iterator<SortedMap<Long, Long>>() {
+            long curSize = 0;
+            long curEntry = 0;
+
+            @Override
+            public boolean hasNext() {
+                return curSize < size;
+            }
+
+            @Override
+            public SortedMap<Long, Long> next() {
+                SortedMap<Long, Long> entries = Maps.newTreeMap();
+                lep.setLedgerAndFirstEntry(ledgerId, curEntry);
+                try {
+                    lep.readPage(fi);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+
+                // process a page
+                for (int i = 0; i < entriesPerPage; i++) {
+                    long offset = lep.getOffset(i * 8);
+                    if (offset != 0) {
+                        entries.put(curEntry, offset);
+                    }
+                    ++curEntry;
+                }
+
+                curSize += pageSize;
+                return entries;
+            }
+
+            @Override
+            public void remove() {
+                throw new RuntimeException("Cannot remove");
+            }
+
+        };
+
+        return new Iterable<SortedMap<Long, Long>>() {
+            public Iterator<SortedMap<Long, Long>> iterator() {
+                return iterator;
+            }
+        };
+    }
+
     /**
      * Scan over an entry log file.
      *
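
To make the page-iteration contract above concrete, a small sketch of how a
caller (such as ConvertToDbStorageCmd) can consume the per-page maps; the
class and method names here are illustrative only:

    import java.util.SortedMap;

    class IndexPageConsumer {
        // Each SortedMap is one index page mapping entryId -> location,
        // delivered in entry-id order, so the stream can be fed straight
        // into DbLedgerStorage.addLedgerToIndex(...).
        static long countEntries(Iterable<SortedMap<Long, Long>> pages) {
            long count = 0;
            for (SortedMap<Long, Long> page : pages) {
                count += page.size(); // one location per non-empty entry slot
            }
            return count;
        }
    }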
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index d2291513d..cdb11a600 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -26,6 +26,7 @@
 import static org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
 
 import com.google.common.collect.MapMaker;
+import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -40,6 +41,7 @@
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
@@ -52,6 +54,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -203,7 +206,7 @@ public ConcurrentLongLongHashMap getLedgersMap() {
     /**
      * Scan entries in an entry log file.
      */
-    interface EntryLogScanner {
+    public interface EntryLogScanner {
         /**
          * Tests whether or not the entries belonging to the specified ledger
          * should be processed.
@@ -837,7 +840,7 @@ void flushRotatedLogs() throws IOException {
         leastUnflushedLogId = flushedLogId + 1;
     }
 
-    void flush() throws IOException {
+    public void flush() throws IOException {
         flushRotatedLogs();
         flushCurrentLog();
     }
@@ -867,7 +870,7 @@ protected ByteBuf initialValue() throws Exception {
         }
     };
 
-    synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+    public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
         int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
         boolean reachEntryLogLimit =
             rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize);
@@ -963,7 +966,6 @@ void removeCurCompactionLog() {
         }
     }
 
-
     private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws IOException {
         if (!doRegularFlushes) {
             return;
@@ -986,7 +988,8 @@ synchronized boolean readEntryLogHardLimit(long size) {
         return logChannel.position() + size > Integer.MAX_VALUE;
     }
 
-    ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
+    public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
+            throws IOException, Bookie.NoEntryException {
         long entryLogId = logIdForOffset(location);
         long pos = location & 0xffffffffL;
         ByteBuf sizeBuff = sizeBuffer.get();
@@ -1035,6 +1038,15 @@ ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException
                                               + pos + "(" + rc + "!=" + entrySize + ")", ledgerId, entryId);
         }
         data.writerIndex(entrySize);
+
+        return data;
+    }
+
+    public ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
+        long entryLogId = logIdForOffset(location);
+        long pos = location & 0xffffffffL;
+
+        ByteBuf data = internalReadEntry(ledgerId, entryId, location);
         long thisLedgerId = data.getLong(0);
         if (thisLedgerId != ledgerId) {
             data.release();
@@ -1112,6 +1124,35 @@ boolean logExists(long logId) {
         return false;
     }
 
+    /**
+     * Returns a set with the ids of all the entry log files.
+     *
+     * @throws IOException if a ledger directory cannot be listed
+     */
+    public Set<Long> getEntryLogsSet() throws IOException {
+        Set<Long> entryLogs = Sets.newTreeSet();
+
+        final FilenameFilter logFileFilter = new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return name.endsWith(".log");
+            }
+        };
+
+        for (File d : ledgerDirsManager.getAllLedgerDirs()) {
+            File[] files = d.listFiles(logFileFilter);
+            if (files == null) {
+                throw new IOException("Failed to get list of files in directory " + d);
+            }
+
+            for (File f : files) {
+                Long entryLogId = Long.parseLong(f.getName().split("\\.log")[0], 16);
+                entryLogs.add(entryLogId);
+            }
+        }
+        return entryLogs;
+    }
+
     private File findFile(long logId) throws FileNotFoundException {
         for (File d : ledgerDirsManager.getAllLedgerDirs()) {
             File f = new File(d, Long.toHexString(logId) + ".log");
@@ -1129,7 +1170,7 @@ private File findFile(long logId) throws FileNotFoundException {
      * @param scanner Entry Log Scanner
      * @throws IOException
      */
-    protected void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
+    public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
        // Buffer for reading the entrySize (4 bytes) and the ledgerId (8 bytes)
         ByteBuf headerBuffer = Unpooled.buffer(4 + 8);
         BufferedReadChannel bc;
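
The readEntry/internalReadEntry split above leans on the 64-bit entry location
encoding: the upper 32 bits hold the entry log id and the lower 32 bits the
offset inside that log. A self-contained sketch (the helper names are
illustrative):

    class EntryLocationCodec {
        // Pack an entry log id and a byte offset within that log into one long
        static long toLocation(long logId, long offsetInLog) {
            return (logId << 32) | (offsetInLog & 0xffffffffL);
        }

        // Upper 32 bits: the entry log file id (as used by logIdForOffset)
        static long logId(long location) {
            return location >> 32;
        }

        // Lower 32 bits: byte position inside that entry log file
        static long position(long location) {
            return location & 0xffffffffL;
        }
    }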
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 2b357eee4..b5c951372 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -25,6 +25,7 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 
 import io.netty.buffer.ByteBuf;
@@ -83,7 +84,8 @@
     private OpStatsLogger getOffsetStats;
     private OpStatsLogger getEntryStats;
 
-    InterleavedLedgerStorage() {
+    @VisibleForTesting
+    public InterleavedLedgerStorage() {
         activeLedgers = new SnapshotMap<Long, Boolean>();
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java
new file mode 100644
index 000000000..719b33dbc
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayGroupSort.java
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Sort an array of longs, grouping the items in tuples.
+ *
+ * <p>Group size decides how many longs are included in the tuples and key size controls how many items to use for
+ * comparison.
+ */
+public class ArrayGroupSort {
+
+    private final int keySize;
+    private final int groupSize;
+
+    public ArrayGroupSort(int keySize, int groupSize) {
+        checkArgument(keySize > 0);
+        checkArgument(groupSize > 0);
+        checkArgument(keySize <= groupSize, "keySize needs to be less than or equal to groupSize");
+        this.keySize = keySize;
+        this.groupSize = groupSize;
+    }
+
+    public void sort(long[] array) {
+        sort(array, 0, array.length);
+    }
+
+    public void sort(long[] array, int offset, int length) {
+        checkArgument(length % groupSize == 0, "Array length must be a multiple of groupSize");
+        quickSort(array, offset, (length + offset - groupSize));
+    }
+
+    ////// Private
+
+    private void quickSort(long array[], int low, int high) {
+        if (low < high) {
+            int pivotIdx = partition(array, low, high);
+            quickSort(array, low, pivotIdx - groupSize);
+            quickSort(array, pivotIdx + groupSize, high);
+        }
+    }
+
+    private int partition(long array[], int low, int high) {
+        int pivotIdx = high;
+        int i = low;
+
+        for (int j = low; j < high; j += groupSize) {
+            if (isLess(array, j, pivotIdx)) {
+                swap(array, j, i);
+                i += groupSize;
+            }
+        }
+
+        swap(array, i, high);
+        return i;
+    }
+
+    private void swap(long array[], int a, int b) {
+        long tmp;
+        for (int k = 0; k < groupSize; k++) {
+            tmp = array[a + k];
+            array[a + k] = array[b + k];
+            array[b + k] = tmp;
+        }
+    }
+
+    private boolean isLess(long array[], int idx1, int idx2) {
+        for (int i = 0; i < keySize; i++) {
+            long k1 = array[idx1 + i];
+            long k2 = array[idx2 + i];
+            if (k1 < k2) {
+                return true;
+            } else if (k1 > k2) {
+                return false;
+            }
+        }
+
+        return false;
+    }
+}
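
A hedged usage sketch of ArrayGroupSort: with keySize=2 and groupSize=4 it
orders 4-long tuples, e.g. (ledgerId, entryId, logId, offset), by their first
two longs; the data below is made up for illustration:

    public class ArrayGroupSortExample {
        public static void main(String[] args) {
            // Tuples of (ledgerId, entryId, logId, offset), flattened
            long[] index = {
                2, 0, 7, 100,
                1, 1, 7, 200,
                1, 0, 7, 300,
            };
            new ArrayGroupSort(2, 4).sort(index);
            // index is now ordered by (ledgerId, entryId):
            // (1,0,7,300), (1,1,7,200), (2,0,7,100)
        }
    }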
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java
new file mode 100644
index 000000000..67e98f265
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+// CHECKSTYLE.OFF: IllegalImport
+import io.netty.util.internal.PlatformDependent;
+// CHECKSTYLE.ON: IllegalImport
+
+import java.nio.ByteOrder;
+
+/**
+ * Utility to serialize/deserialize longs into byte arrays.
+ */
+class ArrayUtil {
+
+    private static final boolean UNALIGNED = PlatformDependent.isUnaligned();
+    private static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
+    private static final boolean BIG_ENDIAN_NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
+
+    public static long getLong(byte[] array, int index) {
+        if (HAS_UNSAFE && UNALIGNED) {
+            long v = PlatformDependent.getLong(array, index);
+            return BIG_ENDIAN_NATIVE_ORDER ? v : Long.reverseBytes(v);
+        }
+
+        return ((long) array[index] & 0xff) << 56 | //
+                ((long) array[index + 1] & 0xff) << 48 | //
+                ((long) array[index + 2] & 0xff) << 40 | //
+                ((long) array[index + 3] & 0xff) << 32 | //
+                ((long) array[index + 4] & 0xff) << 24 | //
+                ((long) array[index + 5] & 0xff) << 16 | //
+                ((long) array[index + 6] & 0xff) << 8 | //
+                (long) array[index + 7] & 0xff;
+    }
+
+    public static void setLong(byte[] array, int index, long value) {
+        if (HAS_UNSAFE && UNALIGNED) {
+            PlatformDependent.putLong(array, index, BIG_ENDIAN_NATIVE_ORDER ? value : Long.reverseBytes(value));
+        } else {
+            array[index] = (byte) (value >>> 56);
+            array[index + 1] = (byte) (value >>> 48);
+            array[index + 2] = (byte) (value >>> 40);
+            array[index + 3] = (byte) (value >>> 32);
+            array[index + 4] = (byte) (value >>> 24);
+            array[index + 5] = (byte) (value >>> 16);
+            array[index + 6] = (byte) (value >>> 8);
+            array[index + 7] = (byte) value;
+        }
+    }
+
+    public static final boolean isArrayAllZeros(final byte[] array) {
+        return PlatformDependent.isZero(array, 0, array.length);
+    }
+}
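
The fallback branch above is a plain big-endian encoding; a self-contained
sketch of the same round trip without the Netty fast path:

    public class BigEndianLongCodec {
        public static void setLong(byte[] array, int index, long value) {
            for (int i = 0; i < 8; i++) {
                // Most significant byte first (big-endian)
                array[index + i] = (byte) (value >>> (56 - 8 * i));
            }
        }

        public static long getLong(byte[] array, int index) {
            long v = 0;
            for (int i = 0; i < 8; i++) {
                v = (v << 8) | (array[index + i] & 0xffL);
            }
            return v;
        }

        public static void main(String[] args) {
            byte[] buf = new byte[8];
            setLong(buf, 0, 0x0123456789abcdefL);
            System.out.println(getLong(buf, 0) == 0x0123456789abcdefL); // true
        }
    }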
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
new file mode 100644
index 000000000..dd1461909
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -0,0 +1,792 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.SortedMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.GarbageCollectorThread;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of LedgerStorage that uses RocksDB to keep the indexes for
+ * entries stored in EntryLogs.
+ */
+public class DbLedgerStorage implements CompactableLedgerStorage {
+
+    private EntryLogger entryLogger;
+
+    private LedgerMetadataIndex ledgerIndex;
+    private EntryLocationIndex entryLocationIndex;
+
+    private GarbageCollectorThread gcThread;
+
+    // Write cache where all new entries are inserted
+    protected WriteCache writeCache;
+
+    // Write cache that is used to swap with writeCache during flushes
+    protected WriteCache writeCacheBeingFlushed;
+
+    // Cache where we insert entries for speculative reading
+    private ReadCache readCache;
+
+    private final ReentrantReadWriteLock writeCacheMutex = new ReentrantReadWriteLock();
+    private final Condition flushWriteCacheCondition = writeCacheMutex.writeLock().newCondition();
+
+    private final ReentrantLock flushMutex = new ReentrantLock();
+
+    protected final AtomicBoolean hasFlushBeenTriggered = new AtomicBoolean(false);
+    private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
+
+    private final ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
+
+    // Executor used for db index cleanup
+    private final ExecutorService cleanupExecutor = Executors
+            .newSingleThreadExecutor(new DefaultThreadFactory("db-storage-cleanup"));
+
+    static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
+    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
+    static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
+
+    private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
+    private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
+    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
+
+    private static final int MB = 1024 * 1024;
+
+    private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists
+            .newCopyOnWriteArrayList();
+
+    private long writeCacheMaxSize;
+
+    private CheckpointSource checkpointSource = null;
+    private Checkpoint lastCheckpoint = Checkpoint.MIN;
+
+    private long readCacheMaxSize;
+    private int readAheadCacheBatchSize;
+
+    private StatsLogger stats;
+
+    private OpStatsLogger addEntryStats;
+    private OpStatsLogger readEntryStats;
+    private OpStatsLogger readCacheHitStats;
+    private OpStatsLogger readCacheMissStats;
+    private OpStatsLogger readAheadBatchCountStats;
+    private OpStatsLogger readAheadBatchSizeStats;
+    private OpStatsLogger flushStats;
+    private OpStatsLogger flushSizeStats;
+
+    @Override
+    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
+            LedgerDirsManager indexDirsManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
+            StatsLogger statsLogger) throws IOException {
+        checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
+                "Db implementation only allows for one storage dir");
+
+        String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+
+        writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
+
+        writeCache = new WriteCache(writeCacheMaxSize / 2);
+        writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
+
+        this.checkpointSource = checkpointSource;
+
+        readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
+        readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
+
+        readCache = new ReadCache(readCacheMaxSize);
+
+        this.stats = statsLogger;
+
+        log.info("Started Db Ledger Storage");
+        log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
+        log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
+        log.info(" - Read Ahead Batch size: : {}", readAheadCacheBatchSize);
+
+        ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
+        entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
+
+        entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        gcThread = new GarbageCollectorThread(conf, ledgerManager, this);
+
+        registerStats();
+    }
+
+    public void registerStats() {
+        stats.registerGauge("write-cache-size", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return writeCache.size() + writeCacheBeingFlushed.size();
+            }
+        });
+        stats.registerGauge("write-cache-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return writeCache.count() + writeCacheBeingFlushed.count();
+            }
+        });
+        stats.registerGauge("read-cache-size", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return readCache.size();
+            }
+        });
+        stats.registerGauge("read-cache-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return readCache.count();
+            }
+        });
+
+        addEntryStats = stats.getOpStatsLogger("add-entry");
+        readEntryStats = stats.getOpStatsLogger("read-entry");
+        readCacheHitStats = stats.getOpStatsLogger("read-cache-hits");
+        readCacheMissStats = stats.getOpStatsLogger("read-cache-misses");
+        readAheadBatchCountStats = stats.getOpStatsLogger("readahead-batch-count");
+        readAheadBatchSizeStats = stats.getOpStatsLogger("readahead-batch-size");
+        flushStats = stats.getOpStatsLogger("flush");
+        flushSizeStats = stats.getOpStatsLogger("flush-size");
+    }
+
+    @Override
+    public void start() {
+        gcThread.start();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            flush();
+
+            gcThread.shutdown();
+            entryLogger.shutdown();
+
+            cleanupExecutor.shutdown();
+            cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS);
+
+            ledgerIndex.close();
+            entryLocationIndex.close();
+
+            writeCache.close();
+            writeCacheBeingFlushed.close();
+            readCache.close();
+            executor.shutdown();
+
+        } catch (IOException e) {
+            log.error("Error closing db storage", e);
+        }
+    }
+
+    @Override
+    public boolean ledgerExists(long ledgerId) throws IOException {
+        try {
+            LedgerData ledgerData = ledgerIndex.get(ledgerId);
+            if (log.isDebugEnabled()) {
+                log.debug("Ledger exists. ledger: {} : {}", ledgerId, ledgerData.getExists());
+            }
+            return ledgerData.getExists();
+        } catch (Bookie.NoLedgerException nle) {
+            // ledger does not exist
+            return false;
+        }
+    }
+
+    @Override
+    public boolean isFenced(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("isFenced. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getFenced();
+    }
+
+    @Override
+    public boolean setFenced(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Set fenced. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.setFenced(ledgerId);
+    }
+
+    @Override
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Set master key. ledger: {}", ledgerId);
+        }
+        ledgerIndex.setMasterKey(ledgerId, masterKey);
+    }
+
+    @Override
+    public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+        if (log.isDebugEnabled()) {
+            log.debug("Read master key. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getMasterKey().toByteArray();
+    }
+
+    @Override
+    public long addEntry(ByteBuf entry) throws IOException {
+        long startTime = MathUtils.nowInNano();
+
+        long ledgerId = entry.readLong();
+        long entryId = entry.readLong();
+        entry.resetReaderIndex();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Add entry. {}@{}", ledgerId, entryId);
+        }
+
+        // Waits if the write cache is being switched for a flush
+        writeCacheMutex.readLock().lock();
+        boolean inserted;
+        try {
+            inserted = writeCache.put(ledgerId, entryId, entry);
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        if (!inserted) {
+            triggerFlushAndAddEntry(ledgerId, entryId, entry);
+        }
+
+        recordSuccessfulEvent(addEntryStats, startTime);
+        return entryId;
+    }
+
+    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException {
+        // Write cache is full, we need to trigger a flush so that it gets rotated
+        writeCacheMutex.writeLock().lock();
+
+        try {
+            // If the flush has already been triggered or has already switched
+            // the cache, we don't need to trigger another flush
+            if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
+                // Trigger an early flush in background
+                log.info("Write cache is full, triggering flush");
+                executor.execute(() -> {
+                    try {
+                        flush();
+                    } catch (IOException e) {
+                        log.error("Error during flush", e);
+                    }
+                });
+            }
+
+            long timeoutNs = TimeUnit.MILLISECONDS.toNanos(100);
+            while (hasFlushBeenTriggered.get()) {
+                if (timeoutNs <= 0L) {
+                    throw new IOException("Write cache was not trigger within the timeout, cannot add entry " + ledgerId
+                            + "@" + entryId);
+                }
+                timeoutNs = flushWriteCacheCondition.awaitNanos(timeoutNs);
+            }
+
+            if (!writeCache.put(ledgerId, entryId, entry)) {
+                // Still wasn't able to cache entry
+                throw new IOException("Error while inserting entry in write cache" + ledgerId + "@" + entryId);
+            }
+
+        } catch (InterruptedException e) {
+            throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
+        } finally {
+            writeCacheMutex.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
+        long startTime = MathUtils.nowInNano();
+        if (log.isDebugEnabled()) {
+            log.debug("Get Entry: {}@{}", ledgerId, entryId);
+        }
+
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            return getLastEntry(ledgerId);
+        }
+
+        writeCacheMutex.readLock().lock();
+        try {
+            // First try to read from the write cache of recent entries
+            ByteBuf entry = writeCache.get(ledgerId, entryId);
+            if (entry != null) {
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+
+            // If there's a flush going on, the entry might be in the flush buffer
+            entry = writeCacheBeingFlushed.get(ledgerId, entryId);
+            if (entry != null) {
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        // Try reading from read-ahead cache
+        ByteBuf entry = readCache.get(ledgerId, entryId);
+        if (entry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return entry;
+        }
+
+        // Read from main storage
+        long entryLocation;
+        try {
+            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
+            if (entryLocation == 0) {
+                throw new NoEntryException(ledgerId, entryId);
+            }
+            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
+        } catch (NoEntryException e) {
+            recordFailedEvent(readEntryStats, startTime);
+            throw e;
+        }
+
+        readCache.put(ledgerId, entryId, entry);
+
+        // Try to read more entries
+        long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
+        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
+
+        recordSuccessfulEvent(readCacheMissStats, startTime);
+        recordSuccessfulEvent(readEntryStats, startTime);
+        return entry;
+    }
+
+    private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long firstEntryLocation) {
+        try {
+            long firstEntryLogId = (firstEntryLocation >> 32);
+            long currentEntryLogId = firstEntryLogId;
+            long currentEntryLocation = firstEntryLocation;
+            int count = 0;
+            long size = 0;
+
+            while (count < readAheadCacheBatchSize && currentEntryLogId == firstEntryLogId) {
+                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, -1, currentEntryLocation);
+
+                try {
+                    long currentEntryLedgerId = entry.getLong(0);
+                    long currentEntryId = entry.getLong(8);
+
+                    if (currentEntryLedgerId != orginalLedgerId) {
+                        // Found an entry belonging to a different ledger, stopping read-ahead
+                        entry.release();
+                        return;
+                    }
+
+                    // Insert entry in read cache
+                    readCache.put(orginalLedgerId, currentEntryId, entry);
+
+                    count++;
+                    size += entry.readableBytes();
+
+                    currentEntryLocation += 4 + entry.readableBytes();
+                    currentEntryLogId = currentEntryLocation >> 32;
+                } finally {
+                    entry.release();
+                }
+            }
+
+            readAheadBatchCountStats.registerSuccessfulValue(count);
+            readAheadBatchSizeStats.registerSuccessfulValue(size);
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e);
+            }
+        }
+    }
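
For clarity, the pointer arithmetic the loop above relies on: each entry in
the log is a 4-byte size header followed by the payload, so the next entry's
location is the current one plus 4 plus the payload size, and read-ahead stops
once the upper 32 bits (the log id) change. A minimal sketch (the names are
illustrative):

    class ReadAheadMath {
        // Next entry starts after the 4-byte size header and the payload
        static long nextLocation(long currentLocation, int payloadBytes) {
            return currentLocation + 4 + payloadBytes;
        }

        // Read-ahead stays within a single entry log file
        static boolean sameEntryLog(long locationA, long locationB) {
            return (locationA >> 32) == (locationB >> 32);
        }
    }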
+
+    public ByteBuf getLastEntry(long ledgerId) throws IOException {
+        long startTime = MathUtils.nowInNano();
+
+        writeCacheMutex.readLock().lock();
+        try {
+            // First try to read from the write cache of recent entries
+            ByteBuf entry = writeCache.getLastEntry(ledgerId);
+            if (entry != null) {
+                if (log.isDebugEnabled()) {
+                    long foundLedgerId = entry.readLong(); // ledgerId
+                    long entryId = entry.readLong();
+                    entry.resetReaderIndex();
+                    log.debug("Found last entry for ledger {} in write cache: {}@{}", ledgerId, foundLedgerId,
+                            entryId);
+                }
+
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+
+            // If there's a flush going on, the entry might be in the flush buffer
+            entry = writeCacheBeingFlushed.getLastEntry(ledgerId);
+            if (entry != null) {
+                if (log.isDebugEnabled()) {
+                    entry.readLong(); // ledgerId
+                    long entryId = entry.readLong();
+                    entry.resetReaderIndex();
+                    log.debug("Found last entry for ledger {} in write cache being flushed: {}", ledgerId, entryId);
+                }
+
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        // Search the last entry in storage
+        long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
+        if (log.isDebugEnabled()) {
+            log.debug("Found last entry for ledger {} in db: {}", ledgerId, lastEntryId);
+        }
+
+        long entryLocation = entryLocationIndex.getLocation(ledgerId, lastEntryId);
+        ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, entryLocation);
+
+        recordSuccessfulEvent(readCacheMissStats, startTime);
+        recordSuccessfulEvent(readEntryStats, startTime);
+        return content;
+    }
+
+    @VisibleForTesting
+    boolean isFlushRequired() {
+        writeCacheMutex.readLock().lock();
+        try {
+            return !writeCache.isEmpty();
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void checkpoint(Checkpoint checkpoint) throws IOException {
+        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
+        if (lastCheckpoint.compareTo(checkpoint) > 0) {
+            return;
+        }
+
+        long startTime = MathUtils.nowInNano();
+
+        // Only a single flush operation can happen at a time
+        flushMutex.lock();
+
+        try {
+            // Swap the write cache so that writes can continue to happen while the flush is
+            // ongoing
+            swapWriteCache();
+
+            long sizeToFlush = writeCacheBeingFlushed.size();
+            if (log.isDebugEnabled()) {
+                log.debug("Flushing entries. count: {} -- size {} Mb", writeCacheBeingFlushed.count(),
+                        sizeToFlush / 1024.0 / 1024);
+            }
+
+            // Write all the pending entries into the entry logger and collect the offset
+            // position for each entry
+
+            Batch batch = entryLocationIndex.newBatch();
+            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
+                try {
+                    long location = entryLogger.addEntry(ledgerId, entry, true);
+                    entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            entryLogger.flush();
+
+            long batchFlushStartTime = System.nanoTime();
+            batch.flush();
+            batch.close();
+            if (log.isDebugEnabled()) {
+                log.debug("DB batch flushed time : {} s",
+                        MathUtils.elapsedNanos(batchFlushStartTime) / (double) TimeUnit.SECONDS.toNanos(1));
+            }
+
+            ledgerIndex.flush();
+
+            cleanupExecutor.execute(() -> {
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Removing deleted ledgers from db indexes");
+                    }
+
+                    entryLocationIndex.removeOffsetFromDeletedLedgers();
+                    ledgerIndex.removeDeletedLedgers();
+                } catch (Throwable t) {
+                    log.warn("Failed to cleanup db indexes", t);
+                }
+            });
+
+            lastCheckpoint = thisCheckpoint;
+
+            // Discard all the entries from the write cache, since they're now persisted
+            writeCacheBeingFlushed.clear();
+
+            double flushTimeSeconds = MathUtils.elapsedNanos(startTime) / (double) TimeUnit.SECONDS.toNanos(1);
+            double flushThroughput = sizeToFlush / 1024.0 / 1024.0 / flushTimeSeconds;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Flushing done time {} s -- Written {} MB/s", flushTimeSeconds, flushThroughput);
+            }
+
+            recordSuccessfulEvent(flushStats, startTime);
+            flushSizeStats.registerSuccessfulValue(sizeToFlush);
+        } catch (IOException e) {
+            // Leave IOException as it is
+            throw e;
+        } catch (RuntimeException e) {
+            // Wrap unchecked exceptions
+            throw new IOException(e);
+        } finally {
+            try {
+                isFlushOngoing.set(false);
+            } finally {
+                flushMutex.unlock();
+            }
+        }
+    }
+
+    /**
+     * Swap the current write cache with the replacement cache.
+     */
+    private void swapWriteCache() {
+        writeCacheMutex.writeLock().lock();
+        try {
+            // First, swap the current write-cache map with an empty one so that writes will
+            // go on unaffected. Only a single flush is happening at the same time
+            WriteCache tmp = writeCacheBeingFlushed;
+            writeCacheBeingFlushed = writeCache;
+            writeCache = tmp;
+
+            // since the cache is switched, we can allow flush to be triggered
+            hasFlushBeenTriggered.set(false);
+            flushWriteCacheCondition.signalAll();
+        } finally {
+            try {
+                isFlushOngoing.set(true);
+            } finally {
+                writeCacheMutex.writeLock().unlock();
+            }
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        checkpoint(Checkpoint.MAX);
+    }
+
+    @Override
+    public void deleteLedger(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Deleting ledger {}", ledgerId);
+        }
+
+        // Delete entries from this ledger that are still in the write cache
+        writeCacheMutex.readLock().lock();
+        try {
+            writeCache.deleteLedger(ledgerId);
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        entryLocationIndex.delete(ledgerId);
+        ledgerIndex.delete(ledgerId);
+
+        for (int i = 0, size = ledgerDeletionListeners.size(); i < size; i++) {
+            LedgerDeletionListener listener = ledgerDeletionListeners.get(i);
+            listener.ledgerDeleted(ledgerId);
+        }
+    }
+
+    @Override
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
+        return ledgerIndex.getActiveLedgersInRange(firstLedgerId, lastLedgerId);
+    }
+
+    @Override
+    public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
+        // Trigger a flush so that all the entries being compacted are persisted to the db storage
+        flush();
+
+        entryLocationIndex.updateLocations(locations);
+    }
+
+    @Override
+    public EntryLogger getEntryLogger() {
+        return entryLogger;
+    }
+
+    @Override
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC, Observer observer)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuf getExplicitLac(long ledgerId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void flushEntriesLocationsIndex() throws IOException {
+        // No-op. Location index is already flushed in updateEntriesLocations() call
+    }
+
+    /**
+     * Add an already existing ledger to the index.
+     *
+     * <p>This method is only used as a tool to help the migration from
+     * InterleavedLedgerStorage to DbLedgerStorage.
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @param isFenced
+     *            whether the ledger is fenced
+     * @param masterKey
+     *            the ledger master key
+     * @param entries
+     *            a map of entryId -> location
+     * @return the number of entries added to the index
+     */
+    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
+            Iterable<SortedMap<Long, Long>> entries) throws Exception {
+        LedgerData ledgerData = LedgerData.newBuilder().setExists(true).setFenced(isFenced)
+                .setMasterKey(ByteString.copyFrom(masterKey)).build();
+        ledgerIndex.set(ledgerId, ledgerData);
+        AtomicLong numberOfEntries = new AtomicLong();
+
+        // Iterate over all the entries pages
+        Batch batch = entryLocationIndex.newBatch();
+        entries.forEach(map -> {
+            map.forEach((entryId, location) -> {
+                try {
+                    entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+
+                numberOfEntries.incrementAndGet();
+            });
+        });
+
+        batch.flush();
+        batch.close();
+
+        return numberOfEntries.get();
+    }
+
+    @Override
+    public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
+        ledgerDeletionListeners.add(listener);
+    }
+
+    public EntryLocationIndex getEntryLocationIndex() {
+        return entryLocationIndex;
+    }
+
+    private void recordSuccessfulEvent(OpStatsLogger logger, long startTimeNanos) {
+        logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+    }
+
+    private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
+        logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);
+}
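
For readers following the checkpoint path above, here is a minimal, self-contained sketch of the swap-and-flush pattern it implements. The class and member names are illustrative only, and the real code uses a specialized WriteCache rather than a ConcurrentHashMap:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Writers fill `active` under the read lock; the flusher swaps the two
    // maps under the write lock, then drains the inactive one without
    // blocking new writes.
    class DoubleBufferedCacheSketch {
        private ConcurrentHashMap<Long, byte[]> active = new ConcurrentHashMap<>();
        private ConcurrentHashMap<Long, byte[]> beingFlushed = new ConcurrentHashMap<>();
        private final ReentrantReadWriteLock swapLock = new ReentrantReadWriteLock();
        private final ReentrantLock flushLock = new ReentrantLock();

        void put(long entryId, byte[] data) {
            swapLock.readLock().lock();
            try {
                active.put(entryId, data); // writes proceed while a flush runs
            } finally {
                swapLock.readLock().unlock();
            }
        }

        void flush() {
            flushLock.lock(); // only a single flush at a time
            try {
                swapLock.writeLock().lock();
                try {
                    ConcurrentHashMap<Long, byte[]> tmp = beingFlushed;
                    beingFlushed = active;
                    active = tmp;
                } finally {
                    swapLock.writeLock().unlock();
                }
                beingFlushed.forEach((id, data) -> {
                    // persist to the entry log and record the location
                });
                beingFlushed.clear(); // entries are now durable
            } finally {
                flushLock.unlock();
            }
        }
    }
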
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
new file mode 100644
index 000000000..e17221d75
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -0,0 +1,338 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Iterables;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains an index of the entry locations in the EntryLogger.
+ *
+ * <p>Each entry is stored as a single record, mapping the (ledgerId, entryId)
+ * pair to its location in the entry log.
+ */
+public class EntryLocationIndex implements Closeable {
+
+    private final KeyValueStorage locationsDb;
+    private final ConcurrentLongHashSet deletedLedgers = new ConcurrentLongHashSet();
+
+    private StatsLogger stats;
+
+    public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
+            StatsLogger stats) throws IOException {
+        String locationsDbPath = FileSystems.getDefault().getPath(basePath, "locations").toFile().toString();
+        convertIfNeeded(locationsDbPath, conf);
+        locationsDb = storageFactory.newKeyValueStorage(locationsDbPath, DbConfigType.Huge, conf);
+        doCreateMarkerFile(locationsDbPath);
+
+        this.stats = stats;
+        registerStats();
+    }
+
+    public void registerStats() {
+        stats.registerGauge("entries-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                try {
+                    return locationsDb.count();
+                } catch (IOException e) {
+                    return -1L;
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() throws IOException {
+        locationsDb.close();
+    }
+
+    public long getLocation(long ledgerId, long entryId) throws IOException {
+        LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
+        LongWrapper value = LongWrapper.get();
+
+        try {
+            if (locationsDb.get(key.array, value.array) < 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Entry not found {}@{} in db index", ledgerId, entryId);
+                }
+                return 0;
+            }
+
+            return value.getValue();
+        } finally {
+            key.recycle();
+            value.recycle();
+        }
+    }
+
+    public long getLastEntryInLedger(long ledgerId) throws IOException {
+        if (deletedLedgers.contains(ledgerId)) {
+            // Ledger already deleted
+            return -1;
+        }
+
+        return getLastEntryInLedgerInternal(ledgerId);
+    }
+
+    private long getLastEntryInLedgerInternal(long ledgerId) throws IOException {
+        LongPairWrapper maxEntryId = LongPairWrapper.get(ledgerId, Long.MAX_VALUE);
+
+        // Search the last entry in storage
+        Entry<byte[], byte[]> entry = locationsDb.getFloor(maxEntryId.array);
+        maxEntryId.recycle();
+
+        if (entry == null) {
+            throw new Bookie.NoEntryException(ledgerId, -1);
+        } else {
+            long foundLedgerId = ArrayUtil.getLong(entry.getKey(), 0);
+            long lastEntryId = ArrayUtil.getLong(entry.getKey(), 8);
+
+            if (foundLedgerId == ledgerId) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Found last page in storage db for ledger {} - last entry: {}", ledgerId, lastEntryId);
+                }
+                return lastEntryId;
+            } else {
+                throw new Bookie.NoEntryException(ledgerId, -1);
+            }
+        }
+    }
+
+    public void addLocation(long ledgerId, long entryId, long location) throws IOException {
+        Batch batch = locationsDb.newBatch();
+        addLocation(batch, ledgerId, entryId, location);
+        batch.flush();
+        batch.close();
+    }
+
+    public Batch newBatch() {
+        return locationsDb.newBatch();
+    }
+
+    public void addLocation(Batch batch, long ledgerId, long entryId, long location) throws IOException {
+        LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
+        LongWrapper value = LongWrapper.get(location);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Add location - ledger: {} -- entry: {} -- location: {}", ledgerId, entryId, location);
+        }
+
+        try {
+            batch.put(key.array, value.array);
+        } finally {
+            key.recycle();
+            value.recycle();
+        }
+    }
+
+    public void updateLocations(Iterable<EntryLocation> newLocations) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Update locations -- {}", Iterables.size(newLocations));
+        }
+
+        Batch batch = newBatch();
+        // Update all the ledger index pages with the new locations
+        for (EntryLocation e : newLocations) {
+            if (log.isDebugEnabled()) {
+                log.debug("Update location - ledger: {} -- entry: {}", e.ledger, e.entry);
+            }
+
+            addLocation(batch, e.ledger, e.entry, e.location);
+        }
+
+        batch.flush();
+        batch.close();
+    }
+
+    public void delete(long ledgerId) throws IOException {
+        // Mark the ledger as deleted; its location records are removed lazily
+        // in removeOffsetFromDeletedLedgers()
+        deletedLedgers.add(ledgerId);
+    }
+
+    public void removeOffsetFromDeletedLedgers() throws IOException {
+        LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1);
+        LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1);
+
+        Set<Long> ledgersToDelete = deletedLedgers.items();
+
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        log.info("Deleting indexes for ledgers: {}", ledgersToDelete);
+        Batch batch = locationsDb.newBatch();
+
+        try {
+            for (long ledgerId : ledgersToDelete) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Deleting indexes from ledger {}", ledgerId);
+                }
+
+                firstKeyWrapper.set(ledgerId, 0);
+                lastKeyWrapper.set(ledgerId, Long.MAX_VALUE);
+
+                batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array);
+            }
+
+            batch.flush();
+
+            // Remove from the pending set
+            for (long ledgerId : ledgersToDelete) {
+                deletedLedgers.remove(ledgerId);
+            }
+        } finally {
+            firstKeyWrapper.recycle();
+            lastKeyWrapper.recycle();
+            batch.close();
+        }
+    }
+
+    private static final String NEW_FORMAT_MARKER = "single-location-record";
+
+    private void convertIfNeeded(String path, ServerConfiguration conf) throws IOException {
+        FileSystem fileSystem = FileSystems.getDefault();
+        final Path newFormatMarkerFile = fileSystem.getPath(path, NEW_FORMAT_MARKER);
+
+        if (!Files.exists(fileSystem.getPath(path))) {
+            // Database not existing, no need to convert
+            return;
+        } else if (Files.exists(newFormatMarkerFile)) {
+            // Database was already created with new format, no conversion needed
+            return;
+        }
+
+        // Do conversion from pages of offsets to single record
+        log.info("Converting index format to single location records: {}", path);
+        long startTime = System.nanoTime();
+
+        KeyValueStorage source = new KeyValueStorageRocksDB(path, DbConfigType.Huge, conf, true /* read-only */);
+
+        long recordsToConvert = source.count();
+        log.info("Opened existing db, starting conversion of {} records", recordsToConvert);
+
+        String targetDbPath = path + ".updated";
+        KeyValueStorage target = new KeyValueStorageRocksDB(targetDbPath, DbConfigType.Huge, conf);
+
+        double convertedRecords = 0;
+        long targetDbRecords = 0;
+
+        LongPairWrapper key = LongPairWrapper.get(0, 0);
+        LongWrapper value = LongWrapper.get();
+
+        final int progressIntervalPercent = 10; // update progress at every 10%
+        double nextUpdateAtPercent = progressIntervalPercent; // start updating at 10% completion
+
+        // Copy into new database. Write in batches to speed up the insertion
+        CloseableIterator<Entry<byte[], byte[]>> iterator = source.iterator();
+        try {
+            while (iterator.hasNext()) {
+                Entry<byte[], byte[]> entry = iterator.next();
+
+                // Add all the entries on the page
+                long ledgerId = ArrayUtil.getLong(entry.getKey(), 0);
+                long firstEntryId = ArrayUtil.getLong(entry.getKey(), 8);
+
+                byte[] page = entry.getValue();
+                for (int i = 0; i < page.length; i += 8) {
+                    long location = ArrayUtil.getLong(page, i);
+
+                    if (location != 0) {
+                        key.set(ledgerId, firstEntryId + (i / 8));
+                        value.set(location);
+                        target.put(key.array, value.array);
+                        ++targetDbRecords;
+                    }
+                }
+
+                ++convertedRecords;
+                if (recordsToConvert > 0 && convertedRecords / recordsToConvert >= nextUpdateAtPercent / 100) {
+                    // Report progress at 10 percent intervals
+                    log.info("Updated records {}/{}   {} %", convertedRecords, recordsToConvert,
+                            100.0 * convertedRecords / recordsToConvert);
+                    nextUpdateAtPercent += progressIntervalPercent;
+                }
+            }
+
+        } finally {
+            iterator.close();
+            source.close();
+
+            target.sync();
+            target.close();
+            key.recycle();
+            value.recycle();
+        }
+
+        FileUtils.deleteDirectory(new File(path));
+        Files.move(fileSystem.getPath(targetDbPath), fileSystem.getPath(path));
+
+        // Create the marker to avoid conversion next time
+        Files.createFile(newFormatMarkerFile);
+
+        log.info("Database update done. Total time: {} -- Target db records: {}",
+                DurationFormatUtils.formatDurationHMS(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)),
+                targetDbRecords);
+    }
+
+    static void doCreateMarkerFile(String path) throws IOException {
+        try {
+            Files.createFile(FileSystems.getDefault().getPath(path, NEW_FORMAT_MARKER));
+        } catch (FileAlreadyExistsException e) {
+            // Ignore
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(EntryLocationIndex.class);
+}
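
The getFloor()-based lookup above relies on the 16-byte key layout produced by LongPairWrapper. A small sketch of the assumed encoding (using a plain ByteBuffer instead of the recyclable wrappers, and assuming big-endian order as the reads at offsets 0 and 8 suggest) shows why the db's lexicographic byte order matches the numeric order:

    import java.nio.ByteBuffer;

    class KeyLayoutSketch {
        // Big-endian (ledgerId, entryId) pairs sort byte-wise in the same
        // order as their numeric values, so seeking to
        // (ledgerId, Long.MAX_VALUE) and stepping back one record lands on
        // the last entry of that ledger.
        static byte[] encodeKey(long ledgerId, long entryId) {
            return ByteBuffer.allocate(16).putLong(ledgerId).putLong(entryId).array();
        }

        static long[] decodeKey(byte[] key) {
            ByteBuffer buf = ByteBuffer.wrap(key);
            return new long[] { buf.getLong(), buf.getLong() };
        }
    }
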
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
new file mode 100644
index 000000000..b9bbb2a6a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorage.java
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+/**
+ * Abstraction of a generic key-value local database.
+ */
+public interface KeyValueStorage extends Closeable {
+
+    void put(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Get the value associated with the given key.
+     *
+     * @param key
+     *            the key to lookup
+     * @return the value or null if the key was not found
+     */
+    byte[] get(byte[] key) throws IOException;
+
+    /**
+     * Get the value associated with the given key.
+     *
+     * <p>This method will use the provided array to store the value.
+     *
+     * @param key
+     *            the key to lookup
+     * @param value
+     *            an array where to store the result
+     * @return the length of the value, or -1 if the entry was not found
+     * @throws IOException
+     *             if the value array could not hold the result
+     */
+    int get(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Get the entry whose key is the greatest key strictly less than the supplied key.
+     *
+     * <p>For example, if the db contains:
+     *
+     * <pre>
+     * {
+     *      1 : 'a',
+     *      2 : 'b',
+     *      3 : 'c'
+     * }
+     * </pre>
+     *
+     * <p>Then:
+     *
+     * <pre>
+     * getFloor(3) --> (2, 'b')
+     * </pre>
+     *
+     * @param key
+     *            the non-inclusive upper limit key
+     * @return the entry with the greatest key strictly below the given key, or null if there is none
+     */
+    Entry<byte[], byte[]> getFloor(byte[] key) throws IOException;
+
+    /**
+     * Get the entry whose key is greater than or equal to the supplied key.
+     *
+     * @param key
+     *            the inclusive lower bound for the lookup
+     * @return the first entry at or after the given key, or null if there is none
+     * @throws IOException
+     */
+    Entry<byte[], byte[]> getCeil(byte[] key) throws IOException;
+
+    /**
+     * Remove the entry associated with the given key, if present.
+     *
+     * @param key
+     *            the key to remove
+     * @throws IOException
+     */
+    void delete(byte[] key) throws IOException;
+
+    /**
+     * Get an iterator to scan sequentially through all the keys in the
+     * database.
+     *
+     * @return a closeable iterator over all the keys
+     */
+    CloseableIterator<byte[]> keys();
+
+    /**
+     * Get an iterator to scan sequentially through all the keys within a
+     * specified range.
+     *
+     * @param firstKey
+     *            the first key in the range (included)
+     * @param lastKey
+     *            the lastKey in the range (not included)
+     *
+     */
+    CloseableIterator<byte[]> keys(byte[] firstKey, byte[] lastKey);
+
+    /**
+     * Return an iterator object that can be used to sequentially scan through all
+     * the entries in the database.
+     */
+    CloseableIterator<Entry<byte[], byte[]>> iterator();
+
+    /**
+     * Commit all pending write to durable storage.
+     */
+    void sync() throws IOException;
+
+    /**
+     * @return the number of keys.
+     */
+    long count() throws IOException;
+
+    /**
+     * Iterator interface.
+     *
+     * @param <T> the type of the iterated items
+     */
+    interface CloseableIterator<T> extends Closeable {
+        boolean hasNext() throws IOException;
+
+        T next() throws IOException;
+    }
+
+    Batch newBatch();
+
+    /**
+     * Interface for a batch to be written in the storage.
+     */
+    interface Batch extends Closeable {
+        void put(byte[] key, byte[] value);
+
+        void remove(byte[] key);
+
+        void deleteRange(byte[] beginKey, byte[] endKey);
+
+        void clear();
+
+        void flush() throws IOException;
+    }
+}
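
To make the intended call pattern concrete, here is a hypothetical caller of the interface showing batched durable writes plus a floor lookup; the keys and values are arbitrary placeholders:

    import java.io.IOException;
    import java.util.Map.Entry;

    class KeyValueStorageUsageSketch {
        static void example(KeyValueStorage storage) throws IOException {
            KeyValueStorage.Batch batch = storage.newBatch();
            try {
                batch.put(new byte[] { 0, 1 }, new byte[] { 42 });
                batch.put(new byte[] { 0, 2 }, new byte[] { 43 });
                batch.flush(); // commits the whole batch durably
            } finally {
                batch.close();
            }

            // Greatest key strictly less than {0, 3} -- here {0, 2}
            Entry<byte[], byte[]> floor = storage.getFloor(new byte[] { 0, 3 });
            if (floor == null) {
                // no key exists below the bound
            }
        }
    }
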
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java
new file mode 100644
index 000000000..c35628d77
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageFactory.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+
+/**
+ * Factory class to create instances of the key-value storage implementation.
+ */
+public interface KeyValueStorageFactory {
+
+    /**
+     * Enum used to specify different config profiles in the underlying storage.
+     */
+    enum DbConfigType {
+        Small, // Used for ledgers db, doesn't need particular configuration
+        Huge // Used for location index, lots of writes and much bigger dataset
+    }
+
+    KeyValueStorage newKeyValueStorage(String path, DbConfigType dbConfigType, ServerConfiguration conf)
+            throws IOException;
+}
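
For illustration, a caller would obtain stores through this factory rather than instantiating the RocksDB-specific class directly; the paths below are made up:

    import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
    import org.apache.bookkeeper.conf.ServerConfiguration;

    class FactoryUsageSketch {
        static void open(KeyValueStorageFactory factory, ServerConfiguration conf) throws java.io.IOException {
            // Small profile for the compact ledger metadata index...
            KeyValueStorage ledgersDb =
                    factory.newKeyValueStorage("/data/bookie/ledgers", DbConfigType.Small, conf);
            // ...Huge profile for the write-heavy entry locations index
            KeyValueStorage locationsDb =
                    factory.newKeyValueStorage("/data/bookie/locations", DbConfigType.Huge, conf);
        }
    }
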
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
new file mode 100644
index 000000000..1ab8172e1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageRocksDB.java
@@ -0,0 +1,539 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.primitives.UnsignedBytes;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.ChecksumType;
+import org.rocksdb.CompressionType;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RocksDB based implementation of the KeyValueStorage.
+ */
+public class KeyValueStorageRocksDB implements KeyValueStorage {
+
+    static KeyValueStorageFactory factory = new KeyValueStorageFactory() {
+        @Override
+        public KeyValueStorage newKeyValueStorage(String path, DbConfigType dbConfigType, ServerConfiguration conf)
+                throws IOException {
+            doUpgradeIfNeeded(path, dbConfigType, conf);
+            KeyValueStorageRocksDB db = new KeyValueStorageRocksDB(path, dbConfigType, conf);
+            doCreateRocksDbMarker(path);
+            return db;
+        }
+    };
+
+    private final RocksDB db;
+
+    private final WriteOptions optionSync = new WriteOptions();
+    private final WriteOptions optionDontSync = new WriteOptions();
+
+    private final ReadOptions optionCache = new ReadOptions();
+    private final ReadOptions optionDontCache = new ReadOptions();
+
+    private final WriteBatch emptyBatch = new WriteBatch();
+
+    private static final String ROCKSDB_MARKER = "rocksdb-enabled";
+
+    private static final String ROCKSDB_LOG_LEVEL = "dbStorage_rocksDB_logLevel";
+    private static final String ROCKSDB_WRITE_BUFFER_SIZE_MB = "dbStorage_rocksDB_writeBufferSizeMB";
+    private static final String ROCKSDB_SST_SIZE_MB = "dbStorage_rocksDB_sstSizeInMB";
+    private static final String ROCKSDB_BLOCK_SIZE = "dbStorage_rocksDB_blockSize";
+    private static final String ROCKSDB_BLOOM_FILTERS_BITS_PER_KEY = "dbStorage_rocksDB_bloomFilterBitsPerKey";
+    private static final String ROCKSDB_BLOCK_CACHE_SIZE = "dbStorage_rocksDB_blockCacheSize";
+    private static final String ROCKSDB_NUM_LEVELS = "dbStorage_rocksDB_numLevels";
+    private static final String ROCKSDB_NUM_FILES_IN_LEVEL0 = "dbStorage_rocksDB_numFilesInLevel0";
+    private static final String ROCKSDB_MAX_SIZE_IN_LEVEL1_MB = "dbStorage_rocksDB_maxSizeInLevel1MB";
+
+    public KeyValueStorageRocksDB(String path, DbConfigType dbConfigType, ServerConfiguration conf) throws IOException {
+        this(path, dbConfigType, conf, false);
+    }
+
+    public KeyValueStorageRocksDB(String path, DbConfigType dbConfigType, ServerConfiguration conf, boolean readOnly)
+            throws IOException {
+        try {
+            RocksDB.loadLibrary();
+        } catch (Throwable t) {
+            throw new IOException("Failed to load RocksDB JNI library", t);
+        }
+
+        try (Options options = new Options()) {
+            options.setCreateIfMissing(true);
+
+            if (dbConfigType == DbConfigType.Huge) {
+                long writeBufferSizeMB = conf.getInt(ROCKSDB_WRITE_BUFFER_SIZE_MB, 64);
+                long sstSizeMB = conf.getInt(ROCKSDB_SST_SIZE_MB, 64);
+                int numLevels = conf.getInt(ROCKSDB_NUM_LEVELS, -1);
+                int numFilesInLevel0 = conf.getInt(ROCKSDB_NUM_FILES_IN_LEVEL0, 4);
+                long maxSizeInLevel1MB = conf.getLong(ROCKSDB_MAX_SIZE_IN_LEVEL1_MB, 256);
+                int blockSize = conf.getInt(ROCKSDB_BLOCK_SIZE, 64 * 1024);
+                long blockCacheSize = conf.getLong(ROCKSDB_BLOCK_CACHE_SIZE, 256 * 1024 * 1024);
+                int bloomFilterBitsPerKey = conf.getInt(ROCKSDB_BLOOM_FILTERS_BITS_PER_KEY, 10);
+
+                options.setCompressionType(CompressionType.LZ4_COMPRESSION);
+                options.setWriteBufferSize(writeBufferSizeMB * 1024 * 1024);
+                options.setMaxWriteBufferNumber(4);
+                if (numLevels > 0) {
+                    options.setNumLevels(numLevels);
+                }
+                options.setLevelZeroFileNumCompactionTrigger(numFilesInLevel0);
+                options.setMaxBytesForLevelBase(maxSizeInLevel1MB * 1024 * 1024);
+                options.setMaxBackgroundCompactions(16);
+                options.setMaxBackgroundFlushes(16);
+                options.setIncreaseParallelism(32);
+                options.setMaxTotalWalSize(512 * 1024 * 1024);
+                options.setMaxOpenFiles(-1);
+                options.setTargetFileSizeBase(sstSizeMB * 1024 * 1024);
+                options.setDeleteObsoleteFilesPeriodMicros(TimeUnit.HOURS.toMicros(1));
+
+                BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
+                tableOptions.setBlockSize(blockSize);
+                tableOptions.setBlockCacheSize(blockCacheSize);
+                tableOptions.setFormatVersion(2);
+                tableOptions.setChecksumType(ChecksumType.kxxHash);
+                if (bloomFilterBitsPerKey > 0) {
+                    tableOptions.setFilter(new BloomFilter(bloomFilterBitsPerKey, false));
+                }
+
+                // Options best suited for HDDs
+                tableOptions.setCacheIndexAndFilterBlocks(true);
+                options.setLevelCompactionDynamicLevelBytes(true);
+
+                options.setTableFormatConfig(tableOptions);
+            }
+
+            // Configure log level
+            String logLevel = conf.getString(ROCKSDB_LOG_LEVEL, "info");
+            switch (logLevel) {
+            case "debug":
+                options.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL);
+                break;
+            case "info":
+                options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
+                break;
+            case "warn":
+                options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
+                break;
+            case "error":
+                options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
+                break;
+            default:
+                log.warn("Unrecognized RockDB log level: {}", logLevel);
+            }
+
+            // Keep log files for 1 month
+            options.setKeepLogFileNum(30);
+            options.setLogFileTimeToRoll(TimeUnit.DAYS.toSeconds(1));
+
+            try {
+                if (readOnly) {
+                    db = RocksDB.openReadOnly(options, path);
+                } else {
+                    db = RocksDB.open(options, path);
+                }
+            } catch (RocksDBException e) {
+                throw new IOException("Error open RocksDB database", e);
+            }
+        }
+
+        optionSync.setSync(true);
+        optionDontSync.setSync(false);
+
+        optionCache.setFillCache(true);
+        optionDontCache.setFillCache(false);
+    }
+
+    @Override
+    public void close() throws IOException {
+        db.close();
+        optionSync.close();
+        optionDontSync.close();
+        optionCache.close();
+        optionDontCache.close();
+        emptyBatch.close();
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) throws IOException {
+        try {
+            db.put(optionDontSync, key, value);
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB put", e);
+        }
+    }
+
+    @Override
+    public byte[] get(byte[] key) throws IOException {
+        try {
+            return db.get(key);
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB get", e);
+        }
+    }
+
+    @Override
+    public int get(byte[] key, byte[] value) throws IOException {
+        try {
+            int res = db.get(key, value);
+            if (res == RocksDB.NOT_FOUND) {
+                return -1;
+            } else if (res > value.length) {
+                throw new IOException("Value array is too small to fit the result");
+            } else {
+                return res;
+            }
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB get", e);
+        }
+    }
+
+    @Override
+    public Entry<byte[], byte[]> getFloor(byte[] key) throws IOException {
+        try (RocksIterator iterator = db.newIterator(optionCache)) {
+            // Position the iterator on the record whose key is >= to the supplied key
+            iterator.seek(key);
+
+            if (!iterator.isValid()) {
+                // There are no entries >= key
+                iterator.seekToLast();
+                if (iterator.isValid()) {
+                    return new EntryWrapper(iterator.key(), iterator.value());
+                } else {
+                    // Db is empty
+                    return null;
+                }
+            }
+
+            iterator.prev();
+
+            if (!iterator.isValid()) {
+                // Iterator is on the first entry of the db and that entry's key is
+                // already >= the target key
+                return null;
+            } else {
+                return new EntryWrapper(iterator.key(), iterator.value());
+            }
+        }
+    }
+
+    @Override
+    public Entry<byte[], byte[]> getCeil(byte[] key) throws IOException {
+        try (RocksIterator iterator = db.newIterator(optionCache)) {
+            // Position the iterator on the record whose key is >= to the supplied key
+            iterator.seek(key);
+
+            if (iterator.isValid()) {
+                return new EntryWrapper(iterator.key(), iterator.value());
+            } else {
+                return null;
+            }
+        }
+    }
+
+    @Override
+    public void delete(byte[] key) throws IOException {
+        try {
+            db.delete(optionDontSync, key);
+        } catch (RocksDBException e) {
+            throw new IOException("Error in RocksDB delete", e);
+        }
+    }
+
+    @Override
+    public void sync() throws IOException {
+        try {
+            db.write(optionSync, emptyBatch);
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public CloseableIterator<byte[]> keys() {
+        final RocksIterator iterator = db.newIterator(optionCache);
+        iterator.seekToFirst();
+
+        return new CloseableIterator<byte[]>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.isValid();
+            }
+
+            @Override
+            public byte[] next() {
+                checkArgument(iterator.isValid());
+                byte[] key = iterator.key();
+                iterator.next();
+                return key;
+            }
+
+            @Override
+            public void close() {
+                iterator.close();
+            }
+        };
+    }
+
+    @Override
+    public CloseableIterator<byte[]> keys(byte[] firstKey, byte[] lastKey) {
+        final RocksIterator iterator = db.newIterator(optionCache);
+        iterator.seek(firstKey);
+
+        return new CloseableIterator<byte[]>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.isValid() && ByteComparator.compare(iterator.key(), lastKey) < 0;
+            }
+
+            @Override
+            public byte[] next() {
+                checkArgument(iterator.isValid());
+                byte[] key = iterator.key();
+                iterator.next();
+                return key;
+            }
+
+            @Override
+            public void close() {
+                iterator.close();
+            }
+        };
+    }
+
+    @Override
+    public CloseableIterator<Entry<byte[], byte[]>> iterator() {
+        final RocksIterator iterator = db.newIterator(optionDontCache);
+        iterator.seekToFirst();
+        final EntryWrapper entryWrapper = new EntryWrapper();
+
+        return new CloseableIterator<Entry<byte[], byte[]>>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.isValid();
+            }
+
+            @Override
+            public Entry<byte[], byte[]> next() {
+                checkArgument(iterator.isValid());
+                entryWrapper.key = iterator.key();
+                entryWrapper.value = iterator.value();
+                iterator.next();
+                return entryWrapper;
+            }
+
+            @Override
+            public void close() {
+                iterator.close();
+            }
+        };
+    }
+
+    @Override
+    public long count() throws IOException {
+        try {
+            return db.getLongProperty("rocksdb.estimate-num-keys");
+        } catch (RocksDBException e) {
+            throw new IOException("Error in getting records count", e);
+        }
+    }
+
+    @Override
+    public Batch newBatch() {
+        return new RocksDBBatch();
+    }
+
+    private class RocksDBBatch implements Batch {
+        private final WriteBatch writeBatch = new WriteBatch();
+
+        @Override
+        public void close() {
+            writeBatch.close();
+        }
+
+        @Override
+        public void put(byte[] key, byte[] value) {
+            writeBatch.put(key, value);
+        }
+
+        @Override
+        public void remove(byte[] key) {
+            writeBatch.remove(key);
+        }
+
+        @Override
+        public void clear() {
+            writeBatch.clear();
+        }
+
+        @Override
+        public void deleteRange(byte[] beginKey, byte[] endKey) {
+            writeBatch.deleteRange(beginKey, endKey);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            try {
+                db.write(optionSync, writeBatch);
+            } catch (RocksDBException e) {
+                throw new IOException("Failed to flush RocksDB batch", e);
+            }
+        }
+    }
+
+    private static final class EntryWrapper implements Entry<byte[], byte[]> {
+        // Not final: iterator() reuses a single EntryWrapper instance and
+        // mutates these fields on each step
+        private byte[] key;
+        private byte[] value;
+
+        public EntryWrapper() {
+            this.key = null;
+            this.value = null;
+        }
+
+        public EntryWrapper(byte[] key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public byte[] setValue(byte[] value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public byte[] getValue() {
+            return value;
+        }
+
+        @Override
+        public byte[] getKey() {
+            return key;
+        }
+    }
+
+    /**
+     * Checks whether the DB was already created with RocksDB, otherwise copy all
+     * data from one database into a fresh database.
+     *
+     * <p>Useful when switching from LevelDB to RocksDB since it lets us restart
+     * fresh and build a DB with the new settings (SSTs sizes, levels, etc..)
+     */
+    private static void doUpgradeIfNeeded(String path, DbConfigType dbConfigType, ServerConfiguration conf)
+            throws IOException {
+        FileSystem fileSystem = FileSystems.getDefault();
+        final Path rocksDbMarkerFile = fileSystem.getPath(path, ROCKSDB_MARKER);
+
+        if (Files.exists(fileSystem.getPath(path))) {
+            // Database already existing
+            if (Files.exists(rocksDbMarkerFile)) {
+                // Database was already created with RocksDB
+                return;
+            }
+        } else {
+            // Database not existing, no need to convert
+            return;
+        }
+
+        log.info("Converting existing database to RocksDB: {}", path);
+        long startTime = System.nanoTime();
+
+        String rocksDbPath = path + ".rocksdb";
+        KeyValueStorage source = new KeyValueStorageRocksDB(path, dbConfigType, conf, true /* read-only */);
+
+        log.info("Opened existing db, starting copy");
+        KeyValueStorage target = new KeyValueStorageRocksDB(rocksDbPath, dbConfigType, conf);
+
+        // Copy into new database. Write in batches to speed up the insertion
+        CloseableIterator<Entry<byte[], byte[]>> iterator = source.iterator();
+        Batch batch = target.newBatch();
+        try {
+            final int maxBatchSize = 10000;
+            int currentBatchSize = 0;
+
+            while (iterator.hasNext()) {
+                Entry<byte[], byte[]> entry = iterator.next();
+
+                batch.put(entry.getKey(), entry.getValue());
+                if (++currentBatchSize == maxBatchSize) {
+                    batch.flush();
+                    batch.clear();
+                    currentBatchSize = 0;
+                }
+            }
+
+            batch.flush();
+        } finally {
+            batch.close();
+            iterator.close();
+            source.close();
+            target.close();
+        }
+
+        FileUtils.deleteDirectory(new File(path));
+        Files.move(fileSystem.getPath(rocksDbPath), fileSystem.getPath(path));
+
+        // Create the marker to avoid conversion next time
+        Files.createFile(rocksDbMarkerFile);
+
+        log.info("Database conversion done. Total time: {}",
+                DurationFormatUtils.formatDurationHMS(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)));
+    }
+
+    private static void doCreateRocksDbMarker(String path) throws IOException {
+        try {
+            Files.createFile(FileSystems.getDefault().getPath(path, ROCKSDB_MARKER));
+        } catch (FileAlreadyExistsException e) {
+            // Ignore
+        }
+    }
+
+    private static final Comparator<byte[]> ByteComparator = UnsignedBytes.lexicographicalComparator();
+
+    private static final Logger log = LoggerFactory.getLogger(KeyValueStorageRocksDB.class);
+}
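
One detail worth calling out from the implementation above is sync(): rather than a standalone fsync call, it writes an empty WriteBatch with sync=true, which acts as a durability barrier for all earlier unsynced writes. A standalone sketch of that trick (the database path is illustrative):

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    public class RocksDbSyncSketch {
        public static void main(String[] args) throws Exception {
            RocksDB.loadLibrary();
            try (Options opts = new Options().setCreateIfMissing(true);
                    RocksDB db = RocksDB.open(opts, "/tmp/rocksdb-sync-sketch");
                    WriteOptions noSync = new WriteOptions().setSync(false);
                    WriteOptions sync = new WriteOptions().setSync(true);
                    WriteBatch emptyBatch = new WriteBatch()) {
                db.put(noSync, "key".getBytes(), "value".getBytes()); // fast, WAL not yet synced
                db.write(sync, emptyBatch); // forces the WAL to disk
            }
        }
    }
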
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
new file mode 100644
index 000000000..04bf32dba
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
@@ -0,0 +1,262 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Arrays;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains an index for the ledgers metadata.
+ *
+ * <p>The key is the ledgerId and the value is the {@link LedgerData} content.
+ */
+public class LedgerMetadataIndex implements Closeable {
+    // Contains all ledgers stored in the bookie
+    private final ConcurrentLongHashMap<LedgerData> ledgers;
+    private final AtomicInteger ledgersCount;
+
+    private final KeyValueStorage ledgersDb;
+    private StatsLogger stats;
+
+    // Holds ledger modifications applied to the in-memory map and pending to be flushed to the db
+    private final ConcurrentLinkedQueue<Entry<Long, LedgerData>> pendingLedgersUpdates;
+
+    // Holds ledger ids that were deleted from the in-memory map and pending to be flushed to the db
+    private final ConcurrentLinkedQueue<Long> pendingDeletedLedgers;
+
+    public LedgerMetadataIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
+            StatsLogger stats) throws IOException {
+        String ledgersPath = FileSystems.getDefault().getPath(basePath, "ledgers").toFile().toString();
+        ledgersDb = storageFactory.newKeyValueStorage(ledgersPath, DbConfigType.Small, conf);
+
+        ledgers = new ConcurrentLongHashMap<>();
+        ledgersCount = new AtomicInteger();
+
+        // Read all ledgers from db
+        CloseableIterator<Entry<byte[], byte[]>> iterator = ledgersDb.iterator();
+        try {
+            while (iterator.hasNext()) {
+                Entry<byte[], byte[]> entry = iterator.next();
+                long ledgerId = ArrayUtil.getLong(entry.getKey(), 0);
+                LedgerData ledgerData = LedgerData.parseFrom(entry.getValue());
+                ledgers.put(ledgerId, ledgerData);
+                ledgersCount.incrementAndGet();
+            }
+        } finally {
+            iterator.close();
+        }
+
+        this.pendingLedgersUpdates = new ConcurrentLinkedQueue<Entry<Long, LedgerData>>();
+        this.pendingDeletedLedgers = new ConcurrentLinkedQueue<Long>();
+
+        this.stats = stats;
+        registerStats();
+    }
+
+    public void registerStats() {
+        stats.registerGauge("ledgers-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return (long) ledgersCount.get();
+            }
+        });
+    }
+
+    @Override
+    public void close() throws IOException {
+        ledgersDb.close();
+    }
+
+    public LedgerData get(long ledgerId) throws IOException {
+        LedgerData ledgerData = ledgers.get(ledgerId);
+        if (ledgerData == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Ledger not found {}", ledgerId);
+            }
+            throw new Bookie.NoLedgerException(ledgerId);
+        }
+
+        return ledgerData;
+    }
+
+    public void set(long ledgerId, LedgerData ledgerData) throws IOException {
+        ledgerData = LedgerData.newBuilder(ledgerData).setExists(true).build();
+
+        if (ledgers.put(ledgerId, ledgerData) == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Added new ledger {}", ledgerId);
+            }
+            ledgersCount.incrementAndGet();
+        }
+
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, ledgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+    }
+
+    public void delete(long ledgerId) throws IOException {
+        if (ledgers.remove(ledgerId) != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Removed ledger {}", ledgerId);
+            }
+            ledgersCount.decrementAndGet();
+        }
+
+        pendingDeletedLedgers.add(ledgerId);
+        pendingLedgersUpdates.removeIf(e -> e.getKey() == ledgerId);
+    }
+
+    public Iterable<Long> getActiveLedgersInRange(final long firstLedgerId, final long lastLedgerId)
+            throws IOException {
+        return Iterables.filter(ledgers.keys(), new Predicate<Long>() {
+            @Override
+            public boolean apply(Long ledgerId) {
+                return ledgerId >= firstLedgerId && ledgerId < lastLedgerId;
+            }
+        });
+    }
+
+    public boolean setFenced(long ledgerId) throws IOException {
+        LedgerData ledgerData = get(ledgerId);
+        if (ledgerData.getFenced()) {
+            return false;
+        }
+
+        LedgerData newLedgerData = LedgerData.newBuilder(ledgerData).setFenced(true).build();
+
+        if (ledgers.put(ledgerId, newLedgerData) == null) {
+            // Ledger had been deleted
+            if (log.isDebugEnabled()) {
+                log.debug("Re-inserted fenced ledger {}", ledgerId);
+            }
+            ledgersCount.incrementAndGet();
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Set fenced ledger {}", ledgerId);
+            }
+        }
+
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, newLedgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+        return true;
+    }
+
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        LedgerData ledgerData = ledgers.get(ledgerId);
+        if (ledgerData == null) {
+            // New ledger inserted
+            ledgerData = LedgerData.newBuilder().setExists(true).setFenced(false)
+                    .setMasterKey(ByteString.copyFrom(masterKey)).build();
+            if (log.isDebugEnabled()) {
+                log.debug("Inserting new ledger {}", ledgerId);
+            }
+        } else {
+            byte[] storedMasterKey = ledgerData.getMasterKey().toByteArray();
+            if (ArrayUtil.isArrayAllZeros(storedMasterKey)) {
+                // update master key of the ledger
+                ledgerData = LedgerData.newBuilder(ledgerData).setMasterKey(ByteString.copyFrom(masterKey)).build();
+                if (log.isDebugEnabled()) {
+                    log.debug("Replace old master key {} with new master key {}", storedMasterKey, masterKey);
+                }
+            } else if (!Arrays.equals(storedMasterKey, masterKey) && !ArrayUtil.isArrayAllZeros(masterKey)) {
+                log.warn("Ledger {} masterKey in db can only be set once.", ledgerId);
+                throw new IOException(BookieException.create(BookieException.Code.IllegalOpException));
+            }
+        }
+
+        if (ledgers.put(ledgerId, ledgerData) == null) {
+            ledgersCount.incrementAndGet();
+        }
+
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, ledgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+    }
+
+    /**
+     * Flushes all pending changes.
+     */
+    public void flush() throws IOException {
+        LongWrapper key = LongWrapper.get();
+
+        int updatedLedgers = 0;
+        while (!pendingLedgersUpdates.isEmpty()) {
+            Entry<Long, LedgerData> entry = pendingLedgersUpdates.poll();
+            key.set(entry.getKey());
+            byte[] value = entry.getValue().toByteArray();
+            ledgersDb.put(key.array, value);
+            ++updatedLedgers;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Persisting updates to {} ledgers", updatedLedgers);
+        }
+
+        ledgersDb.sync();
+        key.recycle();
+    }
+
+    public void removeDeletedLedgers() throws IOException {
+        LongWrapper key = LongWrapper.get();
+
+        int deletedLedgers = 0;
+        while (!pendingDeletedLedgers.isEmpty()) {
+            long ledgerId = pendingDeletedLedgers.poll();
+            key.set(ledgerId);
+            ledgersDb.delete(key.array);
+            deletedLedgers++;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Persisting deletes of ledgers {}", deletedLedgers);
+        }
+
+        ledgersDb.sync();
+        key.recycle();
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(LedgerMetadataIndex.class);
+}
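
Since flush() persists each value as the raw protobuf encoding of LedgerData, a short round-trip sketch of that serialization may help (the master key bytes are made up):

    import com.google.protobuf.ByteString;
    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;

    class LedgerDataRoundTripSketch {
        static void roundTrip() throws InvalidProtocolBufferException {
            LedgerData data = LedgerData.newBuilder()
                    .setExists(true)
                    .setFenced(false)
                    .setMasterKey(ByteString.copyFrom(new byte[] { 1, 2, 3 }))
                    .build();

            byte[] value = data.toByteArray();               // what flush() writes to the db
            LedgerData parsed = LedgerData.parseFrom(value); // what the constructor reads back
            assert parsed.getFenced() == data.getFenced();
        }
    }
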
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
new file mode 100644
index 000000000..0326335cd
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
@@ -0,0 +1,140 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.lang.time.DurationFormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scan all entries in the entry logs and rebuild the locations index.
+ */
+public class LocationsIndexRebuildOp {
+    private final ServerConfiguration conf;
+
+    public LocationsIndexRebuildOp(ServerConfiguration conf) {
+        this.conf = conf;
+    }
+
+    public void initiate() throws IOException {
+        LOG.info("Starting index rebuilding");
+
+        // Move locations index to a backup directory
+        String basePath = Bookie.getCurrentDirectory(conf.getLedgerDirs()[0]).toString();
+        Path currentPath = FileSystems.getDefault().getPath(basePath, "locations");
+        String timestamp = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date());
+        Path backupPath = FileSystems.getDefault().getPath(basePath, "locations.BACKUP-" + timestamp);
+        Files.move(currentPath, backupPath);
+
+        LOG.info("Created locations index backup at {}", backupPath);
+
+        long startTime = System.nanoTime();
+
+        EntryLogger entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
+        Set<Long> entryLogs = entryLogger.getEntryLogsSet();
+
+        String locationsDbPath = FileSystems.getDefault().getPath(basePath, "locations").toFile().toString();
+
+        Set<Long> activeLedgers = getActiveLedgers(conf, KeyValueStorageRocksDB.factory, basePath);
+        LOG.info("Found {} active ledgers in ledger manager", activeLedgers.size());
+
+        KeyValueStorage newIndex = KeyValueStorageRocksDB.factory.newKeyValueStorage(locationsDbPath, DbConfigType.Huge,
+                conf);
+
+        int totalEntryLogs = entryLogs.size();
+        int completedEntryLogs = 0;
+        LOG.info("Scanning {} entry logs", totalEntryLogs);
+
+        for (long entryLogId : entryLogs) {
+            entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
+                @Override
+                public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
+                    long entryId = entry.getLong(8);
+
+                    // Actual location indexed is pointing past the entry size
+                    long location = (entryLogId << 32L) | (offset + 4);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Rebuilding {}:{} at location {} / {}", ledgerId, entryId, location >> 32,
+                                location & (Integer.MAX_VALUE - 1));
+                    }
+
+                    // Update the ledger index page
+                    LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
+                    LongWrapper value = LongWrapper.get(location);
+                    newIndex.put(key.array, value.array);
+                }
+
+                @Override
+                public boolean accept(long ledgerId) {
+                    return activeLedgers.contains(ledgerId);
+                }
+            });
+
+            ++completedEntryLogs;
+            LOG.info("Completed scanning of log {}.log -- {} / {}", Long.toHexString(entryLogId), completedEntryLogs,
+                    totalEntryLogs);
+        }
+
+        newIndex.sync();
+        newIndex.close();
+        EntryLocationIndex.doCreateMarkerFile(locationsDbPath);
+
+        LOG.info("Rebuilding index is done. Total time: {}",
+                DurationFormatUtils.formatDurationHMS(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)));
+    }
+
+    private Set<Long> getActiveLedgers(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath)
+            throws IOException {
+        LedgerMetadataIndex ledgers = new LedgerMetadataIndex(conf, storageFactory, basePath, NullStatsLogger.INSTANCE);
+        Set<Long> activeLedgers = Sets.newHashSet();
+        for (Long ledger : ledgers.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
+            activeLedgers.add(ledger);
+        }
+
+        ledgers.close();
+        return activeLedgers;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocationsIndexRebuildOp.class);
+}
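A note on the location encoding used in the scanner above: the high 32 bits of the long hold the entry log id and the low 32 bits hold the file offset just past the 4-byte entry size header. A minimal round-trip sketch (illustrative only, not code from this PR):

    // Pack: entry log id in the high 32 bits, payload offset in the low 32 bits.
    long location = (entryLogId << 32L) | (offset + 4);

    // Unpack:
    long logId = location >>> 32;                       // entry log id
    int payloadOffset = (int) (location & 0x7FFFFFFF);  // offset of the entry payload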
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java
new file mode 100644
index 000000000..993297c2b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+/**
+ * Recyclable wrapper that holds a pair of longs.
+ */
+class LongPairWrapper {
+
+    final byte[] array = new byte[16];
+
+    public void set(long first, long second) {
+        ArrayUtil.setLong(array, 0, first);
+        ArrayUtil.setLong(array, 8, second);
+    }
+
+    public long getFirst() {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    public long getSecond() {
+        return ArrayUtil.getLong(array, 8);
+    }
+
+    public static LongPairWrapper get(long first, long second) {
+        LongPairWrapper lp = RECYCLER.get();
+        ArrayUtil.setLong(lp.array, 0, first);
+        ArrayUtil.setLong(lp.array, 8, second);
+        return lp;
+    }
+
+    public void recycle() {
+        handle.recycle(this);
+    }
+
+    private static final Recycler<LongPairWrapper> RECYCLER = new Recycler<LongPairWrapper>() {
+        @Override
+        protected LongPairWrapper newObject(Handle<LongPairWrapper> handle) {
+            return new LongPairWrapper(handle);
+        }
+    };
+
+    private final Handle<LongPairWrapper> handle;
+
+    private LongPairWrapper(Handle<LongPairWrapper> handle) {
+        this.handle = handle;
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java
new file mode 100644
index 000000000..b0525f7c1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+class LongWrapper {
+
+    final byte[] array = new byte[8];
+
+    public void set(long value) {
+        ArrayUtil.setLong(array, 0, value);
+    }
+
+    public long getValue() {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    public static LongWrapper get() {
+        return RECYCLER.get();
+    }
+
+    public static LongWrapper get(long value) {
+        LongWrapper lp = RECYCLER.get();
+        ArrayUtil.setLong(lp.array, 0, value);
+        return lp;
+    }
+
+    public void recycle() {
+        handle.recycle(this);
+    }
+
+    private static final Recycler<LongWrapper> RECYCLER = new Recycler<LongWrapper>() {
+        @Override
+        protected LongWrapper newObject(Handle<LongWrapper> handle) {
+            return new LongWrapper(handle);
+        }
+    };
+
+    private final Handle<LongWrapper> handle;
+
+    private LongWrapper(Handle<LongWrapper> handle) {
+        this.handle = handle;
+    }
+}
\ No newline at end of file
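Both wrappers exist so that hot-path key/value serialization does not allocate: callers borrow a pooled instance from the Netty Recycler, use its backing byte[] directly as the storage key or value, and return it when done. A minimal usage sketch (illustrative only; `index` stands in for any KeyValueStorage instance):

    LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId); // pooled, no allocation
    LongWrapper value = LongWrapper.get(location);
    try {
        index.put(key.array, value.array); // byte[] views over the packed longs
    } finally {
        key.recycle();   // return both instances to the pool for reuse
        value.recycle();
    }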
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
new file mode 100644
index 000000000..e12df41d3
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
@@ -0,0 +1,186 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.apache.bookkeeper.bookie.storage.ldb.WriteCache.align64;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+
+/**
+ * Read cache implementation.
+ *
+ * <p>Uses the specified amount of memory and pairs it with a hashmap.
+ */
+public class ReadCache implements Closeable {
+
+    private static final int MaxSegmentSize = 1 * 1024 * 1024 * 1024;
+
+    private final List<ByteBuf> cacheSegments;
+    private final List<ConcurrentLongLongPairHashMap> cacheIndexes;
+
+    private int currentSegmentIdx;
+    private AtomicInteger currentSegmentOffset = new AtomicInteger(0);
+
+    private final int segmentSize;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public ReadCache(long maxCacheSize) {
+        int segmentsCount = Math.max(2, (int) (maxCacheSize / MaxSegmentSize));
+        segmentSize = (int) (maxCacheSize / segmentsCount);
+
+        cacheSegments = new ArrayList<>();
+        cacheIndexes = new ArrayList<>();
+
+        for (int i = 0; i < segmentsCount; i++) {
+            cacheSegments.add(Unpooled.directBuffer(segmentSize, segmentSize));
+            cacheIndexes.add(new ConcurrentLongLongPairHashMap(4096, 2 * Runtime.getRuntime().availableProcessors()));
+        }
+    }
+
+    @Override
+    public void close() {
+        cacheSegments.forEach(ByteBuf::release);
+    }
+
+    public void put(long ledgerId, long entryId, ByteBuf entry) {
+        int entrySize = entry.readableBytes();
+        int alignedSize = align64(entrySize);
+
+        lock.readLock().lock();
+
+        try {
+            int offset = currentSegmentOffset.getAndAdd(alignedSize);
+            if (offset + entrySize > segmentSize) {
+                // Not enough space in the current segment: fall through and roll over outside the read lock
+            } else {
+                // Copy entry into read cache segment
+                cacheSegments.get(currentSegmentIdx).setBytes(offset, entry, entry.readerIndex(),
+                        entry.readableBytes());
+                cacheIndexes.get(currentSegmentIdx).put(ledgerId, entryId, offset, entrySize);
+                return;
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        // We could not insert in the current segment, so take the write lock and roll over to the next segment
+        lock.writeLock().lock();
+
+        try {
+            int offset = currentSegmentOffset.getAndAdd(alignedSize);
+            if (offset + entrySize > segmentSize) {
+                // Rollover to next segment
+                currentSegmentIdx = (currentSegmentIdx + 1) % cacheSegments.size();
+                currentSegmentOffset.set(alignedSize);
+                cacheIndexes.get(currentSegmentIdx).clear();
+                offset = 0;
+            }
+
+            // Copy entry into read cache segment
+            cacheSegments.get(currentSegmentIdx).setBytes(offset, entry, entry.readerIndex(), entry.readableBytes());
+            cacheIndexes.get(currentSegmentIdx).put(ledgerId, entryId, offset, entrySize);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public ByteBuf get(long ledgerId, long entryId) {
+        lock.readLock().lock();
+
+        try {
+            // Check all the segments, starting from the current one and looking backward, so that
+            // recently inserted entries are found with the fewest probes
+            int size = cacheSegments.size();
+            for (int i = 0; i < size; i++) {
+                int segmentIdx = (currentSegmentIdx + (size - i)) % size;
+
+                LongPair res = cacheIndexes.get(segmentIdx).get(ledgerId, entryId);
+                if (res != null) {
+                    int entryOffset = (int) res.first;
+                    int entryLen = (int) res.second;
+
+                    ByteBuf entry = ByteBufAllocator.DEFAULT.directBuffer(entryLen, entryLen);
+                    entry.writeBytes(cacheSegments.get(segmentIdx), entryOffset, entryLen);
+                    return entry;
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        // Entry not found in any segment
+        return null;
+    }
+
+    /**
+     * @return the total size of cached entries
+     */
+    public long size() {
+        lock.readLock().lock();
+
+        try {
+            long size = 0;
+            for (int i = 0; i < cacheIndexes.size(); i++) {
+                if (i == currentSegmentIdx) {
+                    size += currentSegmentOffset.get();
+                } else if (!cacheIndexes.get(i).isEmpty()) {
+                    size += segmentSize;
+                } else {
+                    // the segment is empty
+                }
+            }
+
+            return size;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @return the total number of cached entries
+     */
+    public long count() {
+        lock.readLock().lock();
+
+        try {
+            long count = 0;
+            for (int i = 0; i < cacheIndexes.size(); i++) {
+                count += cacheIndexes.get(i).size();
+            }
+
+            return count;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+}
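A minimal sketch of how the read cache above is exercised (illustrative only): entries are copied in on put(), get() returns a fresh copy that the caller must release, and on roll-over an entire old segment is evicted at once:

    ReadCache cache = new ReadCache(64 * 1024 * 1024); // 64 MB, split across >= 2 segments

    ByteBuf entry = Unpooled.wrappedBuffer("hello".getBytes());
    cache.put(1L /* ledgerId */, 0L /* entryId */, entry); // copies the bytes in

    ByteBuf cached = cache.get(1L, 0L); // null on miss, a new buffer on hit
    if (cached != null) {
        try {
            // ... consume the entry ...
        } finally {
            cached.release(); // caller owns the returned buffer
        }
    }

    cache.close(); // releases the direct-memory segments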
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
new file mode 100644
index 000000000..b70acb5e9
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCache.java
@@ -0,0 +1,269 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Write cache implementation backed by direct-memory segments. Entries are appended
+ * lock-free and later iterated in (ledgerId, entryId) order via {@link #forEach}.
+ */
+public class WriteCache implements Closeable {
+
+    /**
+     * Consumer that is used to scan the entire write cache.
+     */
+    public interface EntryConsumer {
+        void accept(long ledgerId, long entryId, ByteBuf entry);
+    }
+
+    private final ConcurrentLongLongPairHashMap index = new ConcurrentLongLongPairHashMap(4096,
+            2 * Runtime.getRuntime().availableProcessors());
+
+    private final ConcurrentLongLongHashMap lastEntryMap = new ConcurrentLongLongHashMap(4096,
+            2 * Runtime.getRuntime().availableProcessors());
+
+    private final ByteBuf[] cacheSegments;
+    private final int segmentsCount;
+
+    private static final int MaxSegmentSize = Integer.MAX_VALUE;
+    private static final long SegmentOffsetMask = (long) Integer.MAX_VALUE;
+
+    private final long maxCacheSize;
+    private final AtomicLong cacheSize = new AtomicLong(0);
+    private final LongAdder cacheCount = new LongAdder();
+
+    private final ConcurrentLongHashSet deletedLedgers = new ConcurrentLongHashSet();
+
+    public WriteCache(long maxCacheSize) {
+        this.maxCacheSize = maxCacheSize;
+        this.segmentsCount = 1 + (int) (maxCacheSize / MaxSegmentSize);
+
+        this.cacheSegments = new ByteBuf[segmentsCount];
+
+        for (int i = 0; i < segmentsCount - 1; i++) {
+            // All intermediate segments will be full-size
+            cacheSegments[i] = Unpooled.directBuffer(MaxSegmentSize, MaxSegmentSize);
+        }
+
+        int lastSegmentSize = (int) (maxCacheSize % MaxSegmentSize);
+        cacheSegments[segmentsCount - 1] = Unpooled.directBuffer(lastSegmentSize, lastSegmentSize);
+    }
+
+    public void clear() {
+        cacheSize.set(0L);
+        cacheCount.reset();
+        index.clear();
+        lastEntryMap.clear();
+        deletedLedgers.clear();
+    }
+
+    @Override
+    public void close() {
+        for (ByteBuf buf : cacheSegments) {
+            buf.release();
+        }
+    }
+
+    public boolean put(long ledgerId, long entryId, ByteBuf entry) {
+        int size = entry.readableBytes();
+
+        // Align to 64 bytes so that different threads will not contend on the same L1 cache line
+        int alignedSize = align64(size);
+
+        long offset;
+        int localOffset;
+        int segmentIdx;
+
+        while (true) {
+            offset = cacheSize.getAndAdd(alignedSize);
+            localOffset = (int) (offset & SegmentOffsetMask);
+            segmentIdx = (int) (offset / MaxSegmentSize);
+
+            if ((offset + size) > maxCacheSize) {
+                // Cache is full
+                return false;
+            } else if (MaxSegmentSize - localOffset < size) {
+                // If an entry falls at the end of a segment, get a new offset and try again in the next segment
+                continue;
+            } else {
+                // Found a good offset
+                break;
+            }
+        }
+
+        cacheSegments[segmentIdx].setBytes(localOffset, entry, entry.readerIndex(), entry.readableBytes());
+
+        // Update last entryId for the ledger. This handles writes for the same ledger arriving out of
+        // order from different threads; in practice that should not happen, so the compareAndSet below
+        // should almost always be uncontended.
+        while (true) {
+            long currentLastEntryId = lastEntryMap.get(ledgerId);
+            if (currentLastEntryId > entryId) {
+                // A newer entry is already there
+                break;
+            }
+
+            if (lastEntryMap.compareAndSet(ledgerId, currentLastEntryId, entryId)) {
+                break;
+            }
+        }
+
+        index.put(ledgerId, entryId, offset, size);
+        cacheCount.increment();
+        return true;
+    }
+
+    public ByteBuf get(long ledgerId, long entryId) {
+        LongPair result = index.get(ledgerId, entryId);
+        if (result == null) {
+            return null;
+        }
+
+        long offset = result.first;
+        int size = (int) result.second;
+        ByteBuf entry = ByteBufAllocator.DEFAULT.buffer(size, size);
+
+        int localOffset = (int) (offset & SegmentOffsetMask);
+        int segmentIdx = (int) (offset / MaxSegmentSize);
+        entry.writeBytes(cacheSegments[segmentIdx], localOffset, size);
+        return entry;
+    }
+
+    public ByteBuf getLastEntry(long ledgerId) {
+        long lastEntryId = lastEntryMap.get(ledgerId);
+        if (lastEntryId == -1) {
+            // Ledger not found in write cache
+            return null;
+        } else {
+            return get(ledgerId, lastEntryId);
+        }
+    }
+
+    public void deleteLedger(long ledgerId) {
+        deletedLedgers.add(ledgerId);
+    }
+
+    private static final ArrayGroupSort groupSorter = new ArrayGroupSort(2, 4);
+
+    public void forEach(EntryConsumer consumer) {
+        sortedEntriesLock.lock();
+
+        try {
+            int entriesToSort = (int) index.size();
+            int arrayLen = entriesToSort * 4;
+            if (sortedEntries == null || sortedEntries.length < arrayLen) {
+                sortedEntries = new long[arrayLen * 2];
+            }
+
+            long startTime = System.nanoTime();
+
+            sortedEntriesIdx = 0;
+            index.forEach((ledgerId, entryId, offset, length) -> {
+                if (deletedLedgers.contains(ledgerId)) {
+                    // Ignore deleted ledgers
+                    return;
+                }
+
+                sortedEntries[sortedEntriesIdx] = ledgerId;
+                sortedEntries[sortedEntriesIdx + 1] = entryId;
+                sortedEntries[sortedEntriesIdx + 2] = offset;
+                sortedEntries[sortedEntriesIdx + 3] = length;
+                sortedEntriesIdx += 4;
+            });
+
+            if (log.isDebugEnabled()) {
+                log.debug("iteration took {} ms", (System.nanoTime() - startTime) / 1e6);
+            }
+            startTime = System.nanoTime();
+
+            // Sort entries by (ledgerId, entryId) maintaining the 4 items groups
+            groupSorter.sort(sortedEntries, 0, sortedEntriesIdx);
+            if (log.isDebugEnabled()) {
+                log.debug("sorting {} ms", (System.nanoTime() - startTime) / 1e6);
+            }
+            startTime = System.nanoTime();
+
+            ByteBuf[] entrySegments = new ByteBuf[segmentsCount];
+            for (int i = 0; i < segmentsCount; i++) {
+                entrySegments[i] = cacheSegments[i].slice(0, cacheSegments[i].capacity());
+            }
+
+            for (int i = 0; i < sortedEntriesIdx; i += 4) {
+                long ledgerId = sortedEntries[i];
+                long entryId = sortedEntries[i + 1];
+                long offset = sortedEntries[i + 2];
+                long length = sortedEntries[i + 3];
+
+                int localOffset = (int) (offset & SegmentOffsetMask);
+                int segmentIdx = (int) (offset / MaxSegmentSize);
+                ByteBuf entry = entrySegments[segmentIdx];
+                entry.setIndex(localOffset, localOffset + (int) length);
+                consumer.accept(ledgerId, entryId, entry);
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("entry log adding {} ms", (System.nanoTime() - startTime) / 1e6);
+            }
+        } finally {
+            sortedEntriesLock.unlock();
+        }
+    }
+
+    public long size() {
+        // The internal cache size is used as the next-write offset and can grow past the max cache
+        // size (in which case the entry is rejected), so cap the reported size
+        return Math.min(maxCacheSize, cacheSize.get());
+    }
+
+    public long count() {
+        return cacheCount.sum();
+    }
+
+    public boolean isEmpty() {
+        return cacheSize.get() == 0L;
+    }
+
+    private static final int ALIGN_64_MASK = ~(64 - 1);
+
+    static int align64(int size) {
+        return (size + 64 - 1) & ALIGN_64_MASK;
+    }
+
+    private final ReentrantLock sortedEntriesLock = new ReentrantLock();
+    private long[] sortedEntries;
+    private int sortedEntriesIdx;
+
+    private static final Logger log = LoggerFactory.getLogger(WriteCache.class);
+}
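The arithmetic in the write cache above, spelled out: space is claimed from a single AtomicLong, rounded up to 64 bytes so concurrent writers land on distinct cache lines, and each long offset splits into a segment index plus an offset within that segment (segments are Integer.MAX_VALUE bytes, mirroring the put()/get() paths). A worked sketch (illustrative only):

    // align64 rounds up to the next multiple of 64:
    //   align64(1) == 64, align64(64) == 64, align64(100) == 128
    int alignedSize = (100 + 64 - 1) & ~(64 - 1); // == 128

    AtomicLong cacheSize = new AtomicLong();
    long offset = cacheSize.getAndAdd(alignedSize);        // claim space atomically
    int segmentIdx = (int) (offset / Integer.MAX_VALUE);   // which segment buffer
    int localOffset = (int) (offset & Integer.MAX_VALUE);  // offset inside the segment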
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
new file mode 100644
index 000000000..6c6cd8c92
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/package-info.java
@@ -0,0 +1,25 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Classes related to DB based ledger storage.
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 527762b35..93bf0fb27 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -46,6 +46,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for EntryLog.
@@ -351,4 +354,34 @@ public void testPreAllocateLog() throws Exception {
 
     }
 
+    /**
+     * Test the getEntryLogsSet() method.
+     */
+    @Test(timeout = 60000)
+    public void testGetEntryLogsSet() throws Exception {
+        File tmpDir = createTempDir("bkTest", ".dir");
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        // create some entries
+        EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
+
+        assertEquals(Sets.newHashSet(0L), logger.getEntryLogsSet());
+
+        logger.rollLog();
+        logger.flushRotatedLogs();
+
+        assertEquals(Sets.newHashSet(0L, 1L), logger.getEntryLogsSet());
+
+        logger.rollLog();
+        logger.flushRotatedLogs();
+
+        assertEquals(Sets.newHashSet(0L, 1L, 2L), logger.getEntryLogsSet());
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java
new file mode 100644
index 000000000..187d7c51e
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ArraySortGroupTest.java
@@ -0,0 +1,157 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+public class ArraySortGroupTest {
+
+    @Test
+    public void simple() {
+        long[] data = new long[] { //
+                1, 2, 3, 4, //
+                5, 6, 3, 1, //
+                4, 8, 1, 2, //
+                4, 5, 12, 10, //
+                3, 3, 3, 3, //
+                4, 3, 1, 2, //
+                3, 3, 3, 3, //
+        };
+
+        long[] expectedSorted = new long[] { //
+                1, 2, 3, 4, //
+                3, 3, 3, 3, //
+                3, 3, 3, 3, //
+                4, 3, 1, 2, //
+                4, 5, 12, 10, //
+                4, 8, 1, 2, //
+                5, 6, 3, 1, //
+        };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test
+    public void keySmallerThanTotalSize() {
+        try {
+            new ArrayGroupSort(3, 2);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void negativeKeySize() {
+        try {
+            new ArrayGroupSort(-1, 2);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void negativeTotalSize() {
+        try {
+            new ArrayGroupSort(1, -1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void arraySizeIsNotMultiple() {
+        ArrayGroupSort sorter = new ArrayGroupSort(1, 3);
+
+        try {
+            sorter.sort(new long[] { 1, 2, 3, 4 });
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void arraySizeIsShorterThanRequired() {
+        ArrayGroupSort sorter = new ArrayGroupSort(1, 3);
+
+        try {
+            sorter.sort(new long[] { 1, 2 });
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void empty() {
+        long[] data = new long[] {};
+
+        long[] expectedSorted = new long[] {};
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test
+    public void singleItem() {
+        long[] data = new long[] { 1, 2, 3, 4 };
+        long[] expectedSorted = new long[] { 1, 2, 3, 4 };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test
+    public void twoItems() {
+        long[] data = new long[] { 1, 2, 3, 4, 1, 1, 5, 5 };
+        long[] expectedSorted = new long[] { 1, 1, 5, 5, 1, 2, 3, 4 };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+    @Test
+    public void threeItems() {
+        long[] data = new long[] { 1, 2, 3, 4, 1, 1, 5, 5, 1, 0, 2, 1 };
+        long[] expectedSorted = new long[] { 1, 0, 2, 1, 1, 1, 5, 5, 1, 2, 3, 4 };
+
+        ArrayGroupSort sorter = new ArrayGroupSort(2, 4);
+        sorter.sort(data);
+
+        assertArrayEquals(expectedSorted, data);
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
new file mode 100644
index 000000000..73fb049a5
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionRollbackTest.java
@@ -0,0 +1,141 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ConversionRollbackTest {
+
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+        }
+    };
+
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+    };
+
+    @Test
+    public void convertFromDbStorageToInterleaved() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        log.info("Using temp directory: {}", tmpDir);
+
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        conf.setAllowLoopback(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        DbLedgerStorage dbStorage = new DbLedgerStorage();
+        dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        // Insert some ledger & entries in the dbStorage
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            dbStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+            dbStorage.setFenced(ledgerId);
+
+            for (long entryId = 0; entryId < 10000; entryId++) {
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                dbStorage.addEntry(entry);
+            }
+        }
+
+        dbStorage.flush();
+        dbStorage.shutdown();
+
+        // Run conversion tool
+        BookieShell shell = new BookieShell();
+        shell.setConf(conf);
+        int res = shell.run(new String[] { "convert-to-interleaved-storage" });
+
+        Assert.assertEquals(0, res);
+
+        // Verify that interleaved storage index has the same entries
+        InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
+        interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        Set<Long> ledgers = Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+        Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
+
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            Assert.assertEquals(true, interleavedStorage.isFenced(ledgerId));
+            Assert.assertEquals("ledger-" + ledgerId, new String(interleavedStorage.readMasterKey(ledgerId)));
+
+            for (long entryId = 0; entryId < 10000; entryId++) {
+                ByteBuf entry = Unpooled.buffer(1024);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                ByteBuf result = interleavedStorage.getEntry(ledgerId, entryId);
+                Assert.assertEquals(entry, result);
+            }
+        }
+
+        interleavedStorage.shutdown();
+        FileUtils.forceDelete(tmpDir);
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
new file mode 100644
index 000000000..5b6f7d853
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ConversionTest.java
@@ -0,0 +1,155 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class ConversionTest {
+
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+        }
+    };
+
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+    };
+
+    @Test
+    public void test() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        System.out.println(tmpDir);
+
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        conf.setAllowLoopback(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
+        interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        // Insert some ledger & entries in the interleaved storage
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            interleavedStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+            interleavedStorage.setFenced(ledgerId);
+
+            for (long entryId = 0; entryId < 10000; entryId++) {
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                interleavedStorage.addEntry(entry);
+            }
+        }
+
+        interleavedStorage.flush();
+        interleavedStorage.shutdown();
+
+        // Run conversion tool
+        BookieShell shell = new BookieShell();
+        shell.setConf(conf);
+        int res = shell.run(new String[] { "convert-to-db-storage" });
+
+        Assert.assertEquals(0, res);
+
+        // Verify that db index has the same entries
+        DbLedgerStorage dbStorage = new DbLedgerStorage();
+        dbStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        interleavedStorage = new InterleavedLedgerStorage();
+        interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        Set<Long> ledgers = Sets.newTreeSet(dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+        Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
+
+        ledgers = Sets.newTreeSet(interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+        Assert.assertEquals(Sets.newTreeSet(), ledgers);
+
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            Assert.assertEquals(true, dbStorage.isFenced(ledgerId));
+            Assert.assertEquals("ledger-" + ledgerId, new String(dbStorage.readMasterKey(ledgerId)));
+
+            for (long entryId = 0; entryId < 10000; entryId++) {
+                ByteBuf entry = Unpooled.buffer(1024);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                ByteBuf result = dbStorage.getEntry(ledgerId, entryId);
+                Assert.assertEquals(entry, result);
+                result.release();
+
+                try {
+                    interleavedStorage.getEntry(ledgerId, entryId);
+                    Assert.fail("entry should not exist");
+                } catch (NoLedgerException e) {
+                    // Ok
+                }
+            }
+        }
+
+        interleavedStorage.shutdown();
+        dbStorage.shutdown();
+        FileUtils.forceDelete(tmpDir);
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
new file mode 100644
index 000000000..7bf10d8e8
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
@@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+public class DbLedgerStorageBookieTest extends BookKeeperClusterTestCase {
+
+    public DbLedgerStorageBookieTest() {
+        super(1);
+        baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        baseConf.setFlushInterval(60000);
+        baseConf.setGcWaitTime(60000);
+    }
+
+    @Test
+    public void testRecoveryEmptyLedger() throws Exception {
+        LedgerHandle lh1 = bkc.createLedger(1, 1, DigestType.MAC, new byte[0]);
+
+        // Force ledger close & recovery
+        LedgerHandle lh2 = bkc.openLedger(lh1.getId(), DigestType.MAC, new byte[0]);
+
+        assertEquals(0, lh2.getLength());
+        assertEquals(-1, lh2.getLastAddConfirmed());
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageRocksDBUpgrade.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageRocksDBUpgrade.java
new file mode 100644
index 000000000..9af51ac7a
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageRocksDBUpgrade.java
@@ -0,0 +1,172 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class DbLedgerStorageRocksDBUpgrade {
+
+    private File tmpDir;
+
+    @Before
+    public void setup() throws Exception {
+        tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        System.err.println("Writing to " + tmpDir);
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+    }
+
+    DbLedgerStorage createStorage(boolean rocksDBEnabled) throws Exception {
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        DbLedgerStorage storage = new DbLedgerStorage();
+        storage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        return storage;
+    }
+
+    @After
+    public void teardown() throws Exception {
+        FileUtils.deleteDirectory(tmpDir);
+    }
+
+    @Test
+    public void simple() throws Exception {
+        DbLedgerStorage levelDbStorage = createStorage(false);
+
+        insertEntries(levelDbStorage, 0, 5);
+
+        levelDbStorage.flush();
+        levelDbStorage.shutdown();
+
+        // Re-open the storage to force SST file creation
+        levelDbStorage = createStorage(false);
+        insertEntries(levelDbStorage, 5, 10);
+
+        levelDbStorage.flush();
+        levelDbStorage.shutdown();
+
+        // Re-open with RocksDB storage
+        DbLedgerStorage rocksDBStorage = createStorage(true);
+        verifyEntries(rocksDBStorage, 0, 10);
+
+        // Insert more entries
+        insertEntries(rocksDBStorage, 10, 15);
+        verifyEntries(rocksDBStorage, 0, 15);
+        rocksDBStorage.flush();
+        verifyEntries(rocksDBStorage, 0, 15);
+        rocksDBStorage.shutdown();
+    }
+
+    private void insertEntries(DbLedgerStorage storage, int firstLedger, int lastLedger) throws Exception {
+        // Insert some ledger & entries in the storage
+        for (long ledgerId = firstLedger; ledgerId < lastLedger; ledgerId++) {
+            storage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+            storage.setFenced(ledgerId);
+
+            for (long entryId = 0; entryId < 10000; entryId++) {
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                storage.addEntry(entry);
+            }
+        }
+    }
+
+    private void verifyEntries(DbLedgerStorage storage, int firstLedger, int lastLedger) throws Exception {
+        // Verify that db index has the same entries
+        Set<Long> ledgers = Sets.newTreeSet(storage.getActiveLedgersInRange(firstLedger, lastLedger));
+
+        Set<Long> expectedSet = Sets.newTreeSet();
+        for (long i = firstLedger; i < lastLedger; i++) {
+            expectedSet.add(i);
+        }
+        Assert.assertEquals(expectedSet, ledgers);
+
+        for (long ledgerId = firstLedger; ledgerId < lastLedger; ledgerId++) {
+            Assert.assertEquals(true, storage.isFenced(ledgerId));
+            Assert.assertEquals("ledger-" + ledgerId, new String(storage.readMasterKey(ledgerId)));
+
+            for (long entryId = 0; entryId < 10000; entryId++) {
+                ByteBuf entry = Unpooled.buffer(1024);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                ByteBuf result = storage.getEntry(ledgerId, entryId);
+                Assert.assertEquals(entry, result);
+                result.release();
+            }
+        }
+    }
+
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+        }
+    };
+
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+    };
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
new file mode 100644
index 000000000..b9923a148
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -0,0 +1,426 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DbLedgerStorageTest {
+
+    private DbLedgerStorage storage;
+    private File tmpDir;
+
+    @Before
+    public void setup() throws Exception {
+        tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        storage = (DbLedgerStorage) bookie.getLedgerStorage();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        tmpDir.delete();
+    }
+
+    @Test
+    public void simple() throws Exception {
+        assertEquals(false, storage.ledgerExists(3));
+        try {
+            storage.isFenced(3);
+            fail("should have failed");
+        } catch (Bookie.NoLedgerException nle) {
+            // OK
+        }
+        assertEquals(false, storage.ledgerExists(3));
+        try {
+            storage.setFenced(3);
+            fail("should have failed");
+        } catch (Bookie.NoLedgerException nle) {
+            // OK
+        }
+        storage.setMasterKey(3, "key".getBytes());
+        try {
+            storage.setMasterKey(3, "other-key".getBytes());
+            fail("should have failed");
+        } catch (IOException ioe) {
+            assertTrue(ioe.getCause() instanceof BookieException.BookieIllegalOpException);
+        }
+        // Setting the same key again is a no-op
+        storage.setMasterKey(3, "key".getBytes());
+        assertEquals(true, storage.ledgerExists(3));
+        assertEquals(true, storage.setFenced(3));
+        assertEquals(true, storage.isFenced(3));
+        assertEquals(false, storage.setFenced(3));
+
+        storage.setMasterKey(4, "key".getBytes());
+        assertEquals(false, storage.isFenced(4));
+        assertEquals(true, storage.ledgerExists(4));
+
+        assertEquals("key", new String(storage.readMasterKey(4)));
+
+        assertEquals(Lists.newArrayList(4L, 3L), Lists.newArrayList(storage.getActiveLedgersInRange(0, 100)));
+        assertEquals(Lists.newArrayList(4L, 3L), Lists.newArrayList(storage.getActiveLedgersInRange(3, 100)));
+        assertEquals(Lists.newArrayList(3L), Lists.newArrayList(storage.getActiveLedgersInRange(0, 4)));
+
+        // Add / read entries
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writeLong(4); // ledger id
+        entry.writeLong(1); // entry id
+        entry.writeBytes("entry-1".getBytes());
+
+        assertEquals(false, storage.isFlushRequired());
+
+        assertEquals(1, storage.addEntry(entry));
+
+        assertEquals(true, storage.isFlushRequired());
+
+        // Read from write cache
+        ByteBuf res = storage.getEntry(4, 1);
+        assertEquals(entry, res);
+
+        storage.flush();
+
+        assertEquals(false, storage.isFlushRequired());
+
+        // Read from db
+        res = storage.getEntry(4, 1);
+        assertEquals(entry, res);
+
+        try {
+            storage.getEntry(4, 2);
+            fail("Should have thrown exception");
+        } catch (NoEntryException e) {
+            // ok
+        }
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(4); // ledger id
+        entry2.writeLong(2); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        storage.addEntry(entry2);
+
+        // Read last entry in ledger
+        res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
+        assertEquals(entry2, res);
+
+        ByteBuf entry3 = Unpooled.buffer(1024);
+        entry3.writeLong(4); // ledger id
+        entry3.writeLong(3); // entry id
+        entry3.writeBytes("entry-3".getBytes());
+        storage.addEntry(entry3);
+
+        ByteBuf entry4 = Unpooled.buffer(1024);
+        entry4.writeLong(4); // ledger id
+        entry4.writeLong(4); // entry id
+        entry4.writeBytes("entry-4".getBytes());
+        storage.addEntry(entry4);
+
+        res = storage.getEntry(4, 4);
+        assertEquals(entry4, res);
+
+        // Delete
+        assertEquals(true, storage.ledgerExists(4));
+        storage.deleteLedger(4);
+        assertEquals(false, storage.ledgerExists(4));
+
+        // Should not throw exception even if the ledger was deleted
+        storage.getEntry(4, 4);
+
+        storage.addEntry(Unpooled.wrappedBuffer(entry2));
+        res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
+        assertEquals(entry4, res);
+
+        // Get last entry from storage
+        storage.flush();
+
+        try {
+            storage.getEntry(4, 4);
+            fail("Should have thrown exception since the ledger was deleted");
+        } catch (NoEntryException e) {
+            // ok
+        }
+
+        storage.shutdown();
+    }
+
+    @Test
+    public void testBookieCompaction() throws Exception {
+        storage.setMasterKey(4, "key".getBytes());
+
+        ByteBuf entry3 = Unpooled.buffer(1024);
+        entry3.writeLong(4); // ledger id
+        entry3.writeLong(3); // entry id
+        entry3.writeBytes("entry-3".getBytes());
+        storage.addEntry(entry3);
+
+        // Simulate bookie compaction
+        EntryLogger entryLogger = storage.getEntryLogger();
+        // Rewrite entry-3
+        ByteBuf newEntry3 = Unpooled.buffer(1024);
+        newEntry3.writeLong(4); // ledger id
+        newEntry3.writeLong(3); // entry id
+        newEntry3.writeBytes("new-entry-3".getBytes());
+        long location = entryLogger.addEntry(4, newEntry3, false);
+
+        List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
+        storage.updateEntriesLocations(locations);
+
+        ByteBuf res = storage.getEntry(4, 3);
+        System.out.println("res:       " + ByteBufUtil.hexDump(res));
+        System.out.println("newEntry3: " + ByteBufUtil.hexDump(newEntry3));
+        assertEquals(newEntry3, res);
+
+        storage.shutdown();
+    }
+
+    @Test
+    public void doubleDirectoryError() throws Exception {
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { "dir1", "dir2" });
+
+        try {
+            new Bookie(conf);
+            fail("Should have failed because of the 2 directories");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+    }
+
+    @Test
+    public void testRewritingEntries() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        try {
+            storage.getEntry(1, -1);
+            fail("Should throw exception");
+        } catch (Bookie.NoEntryException e) {
+            // ok
+        }
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry1);
+        storage.flush();
+
+        ByteBuf newEntry1 = Unpooled.buffer(1024);
+        newEntry1.writeLong(1); // ledger id
+        newEntry1.writeLong(1); // entry id
+        newEntry1.writeBytes("new-entry-1".getBytes());
+
+        storage.addEntry(newEntry1);
+        storage.flush();
+
+        ByteBuf response = storage.getEntry(1, 1);
+        assertEquals(newEntry1, response);
+    }
+
+    @Test
+    public void testEntriesOutOfOrder() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(1); // ledger id
+        entry2.writeLong(2); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        storage.addEntry(entry2);
+
+        try {
+            storage.getEntry(1, 1);
+            fail("Entry doesn't exist");
+        } catch (NoEntryException e) {
+            // Ok, entry doesn't exist
+        }
+
+        ByteBuf res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry1);
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+
+        storage.flush();
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+    }
+
+    @Test
+    public void testEntriesOutOfOrderWithFlush() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(1); // ledger id
+        entry2.writeLong(2); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        storage.addEntry(entry2);
+
+        try {
+            storage.getEntry(1, 1);
+            fail("Entry doesn't exist");
+        } catch (NoEntryException e) {
+            // Ok, entry doesn't exist
+        }
+
+        ByteBuf res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+
+        storage.flush();
+
+        try {
+            storage.getEntry(1, 1);
+            fail("Entry doesn't exist");
+        } catch (NoEntryException e) {
+            // Ok, entry doesn't exist
+        }
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry1);
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+        res.release();
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+
+        storage.flush();
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+        res.release();
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+    }
+
+    @Test
+    public void testAddEntriesAfterDelete() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        ByteBuf entry0 = Unpooled.buffer(1024);
+        entry0.writeLong(1); // ledger id
+        entry0.writeLong(0); // entry id
+        entry0.writeBytes("entry-0".getBytes());
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry0);
+        storage.addEntry(entry1);
+
+        storage.flush();
+
+        storage.deleteLedger(1);
+
+        storage.setMasterKey(1, "key".getBytes());
+
+        entry0 = Unpooled.buffer(1024);
+        entry0.writeLong(1); // ledger id
+        entry0.writeLong(0); // entry id
+        entry0.writeBytes("entry-0".getBytes());
+
+        entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry0);
+        storage.addEntry(entry1);
+
+        assertEquals(entry0, storage.getEntry(1, 0));
+        assertEquals(entry1, storage.getEntry(1, 1));
+
+        storage.flush();
+    }
+}
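
Every entry in the tests above follows the same wire layout: the first 8 bytes
are the ledger id and the next 8 bytes the entry id, both big-endian longs,
followed by an opaque payload. A minimal helper capturing that convention
(makeEntry is illustrative, not part of this PR) could look like:

    import static java.nio.charset.StandardCharsets.UTF_8;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    // Illustrative only: builds an entry with the 16-byte (ledgerId, entryId)
    // header that DbLedgerStorage relies on to route the entry in addEntry().
    static ByteBuf makeEntry(long ledgerId, long entryId, String payload) {
        ByteBuf entry = Unpooled.buffer(16 + payload.length());
        entry.writeLong(ledgerId); // bytes 0-7: ledger id
        entry.writeLong(entryId);  // bytes 8-15: entry id
        entry.writeBytes(payload.getBytes(UTF_8));
        return entry;
    }
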
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
new file mode 100644
index 000000000..cfe55ac47
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class DbLedgerStorageWriteCacheTest {
+
+    private DbLedgerStorage storage;
+    private File tmpDir;
+
+    private static class MockedDbLedgerStorage extends DbLedgerStorage {
+
+        @Override
+        public synchronized void flush() throws IOException {
+            // Swap the write caches and block indefinitely to simulate a slow disk
+            WriteCache tmp = writeCacheBeingFlushed;
+            writeCacheBeingFlushed = writeCache;
+            writeCache = tmp;
+
+            // Since the caches have been swapped, a new flush can be triggered
+            hasFlushBeenTriggered.set(false);
+
+            // Block the flushing thread
+            while (true) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    return;
+                }
+            }
+        }
+
+    }
+
+    @Before
+    public void setup() throws Exception {
+        tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(MockedDbLedgerStorage.class.getName());
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 1);
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        storage = (DbLedgerStorage) bookie.getLedgerStorage();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        tmpDir.delete();
+    }
+
+    @Test
+    public void writeCacheFull() throws Exception {
+        storage.setMasterKey(4, "key".getBytes());
+        assertEquals(false, storage.isFenced(4));
+        assertEquals(true, storage.ledgerExists(4));
+
+        assertEquals("key", new String(storage.readMasterKey(4)));
+
+        // Add enough entries to fill the 1st write cache
+        for (int i = 0; i < 5; i++) {
+            ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8);
+            entry.writeLong(4); // ledger id
+            entry.writeLong(i); // entry id
+            entry.writeZero(100 * 1024);
+            storage.addEntry(entry);
+        }
+
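+        // Add enough entries to fill the 2nd write cache (the 1st is still being flushed)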
+        for (int i = 0; i < 5; i++) {
+            ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8);
+            entry.writeLong(4); // ledger id
+            entry.writeLong(5 + i); // entry id
+            entry.writeZero(100 * 1024);
+            storage.addEntry(entry);
+        }
+
+        // Next add should fail because the cache is full
+        ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8);
+        entry.writeLong(4); // ledger id
+        entry.writeLong(22); // entry id
+        entry.writeZero(100 * 1024);
+
+        try {
+            storage.addEntry(entry);
+            fail("Should have thrown exception");
+        } catch (IOException e) {
+            // Expected
+        }
+    }
+}
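
The mocked flush above never returns, which pins down the contract of the
double-buffered write path: writes keep landing in one cache while the other
drains, and an add only fails once both are full. A self-contained toy model
of that invariant (all names and the size accounting here are illustrative,
not the PR's implementation):

    import java.util.HashMap;
    import java.util.Map;

    // Toy double-buffered cache; counts entries instead of bytes for brevity.
    class TwoBufferCache {
        private final int capacityPerBuffer;
        private Map<Long, byte[]> active = new HashMap<>();   // takes new writes
        private Map<Long, byte[]> flushing = new HashMap<>(); // being drained
        private boolean flushInProgress = false;

        TwoBufferCache(int capacityPerBuffer) {
            this.capacityPerBuffer = capacityPerBuffer;
        }

        synchronized boolean add(long entryId, byte[] data) {
            if (active.size() < capacityPerBuffer) {
                active.put(entryId, data);
                return true;
            }
            if (!flushInProgress) {
                // Swap buffers: the full one drains in the background while
                // new writes immediately get the empty one.
                Map<Long, byte[]> tmp = flushing;
                flushing = active;
                active = tmp;
                flushInProgress = true;
                active.put(entryId, data);
                return true;
            }
            return false; // both buffers full and the flush is still running
        }

        synchronized void completeFlush() {
            flushing.clear();
            flushInProgress = false;
        }
    }

With a blocked flush, the first fill-up swaps buffers and succeeds; the
second fill-up has nowhere to go, which is exactly the IOException the
writeCacheFull test expects.
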
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
new file mode 100644
index 000000000..ef2bec4b2
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Test;
+
+public class EntryLocationIndexTest {
+
+    private final ServerConfiguration serverConfiguration = new ServerConfiguration();
+
+    @Test
+    public void deleteLedgerTest() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        tmpDir.deleteOnExit();
+
+        EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory,
+                tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE);
+
+        // Add some dummy indexes
+        idx.addLocation(40312, 0, 1);
+        idx.addLocation(40313, 10, 2);
+        idx.addLocation(40320, 0, 3);
+
+        // Add more indexes in a different batch
+        idx.addLocation(40313, 11, 5);
+        idx.addLocation(40313, 12, 6);
+        idx.addLocation(40320, 1, 7);
+        idx.addLocation(40312, 3, 4);
+
+        idx.delete(40313);
+
+        assertEquals(1, idx.getLocation(40312, 0));
+        assertEquals(4, idx.getLocation(40312, 3));
+        assertEquals(3, idx.getLocation(40320, 0));
+        assertEquals(7, idx.getLocation(40320, 1));
+
+        assertEquals(2, idx.getLocation(40313, 10));
+        assertEquals(5, idx.getLocation(40313, 11));
+        assertEquals(6, idx.getLocation(40313, 12));
+
+        idx.removeOffsetFromDeletedLedgers();
+
+        // After the deleted-ledger offsets are removed, lookups return 0
+        assertEquals(0, idx.getLocation(40313, 10));
+        assertEquals(0, idx.getLocation(40313, 11));
+        assertEquals(0, idx.getLocation(40313, 12));
+
+        idx.close();
+    }
+
+    // Tests adding locations to a ledger after it has been deleted: the new offsets are dropped too
+    @Test
+    public void addLedgerAfterDeleteTest() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        tmpDir.deleteOnExit();
+
+        EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory,
+                tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE);
+
+        // Add some dummy indexes
+        idx.addLocation(40312, 0, 1);
+        idx.addLocation(40313, 10, 2);
+        idx.addLocation(40320, 0, 3);
+
+        idx.delete(40313);
+
+        // Add more indexes in a different batch
+        idx.addLocation(40313, 11, 5);
+        idx.addLocation(40313, 12, 6);
+        idx.addLocation(40320, 1, 7);
+        idx.addLocation(40312, 3, 4);
+
+        idx.removeOffsetFromDeletedLedgers();
+
+        assertEquals(0, idx.getLocation(40313, 11));
+        assertEquals(0, idx.getLocation(40313, 12));
+
+        idx.close();
+    }
+}
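
Nothing in these tests spells out the on-disk key format, but the operations
they exercise (per-entry lookups plus whole-ledger deletes) work naturally if
each location is keyed by ledger id and entry id packed big-endian, so that
byte-wise order matches numeric order and one ledger occupies one contiguous
key range. A hypothetical sketch of such a key:

    import java.nio.ByteBuffer;

    // Hypothetical 16-byte index key; big-endian (ByteBuffer's default) so
    // lexicographic comparison of the bytes matches (ledgerId, entryId) order.
    static byte[] locationKey(long ledgerId, long entryId) {
        ByteBuffer key = ByteBuffer.allocate(16);
        key.putLong(ledgerId);
        key.putLong(entryId);
        return key.array();
    }

Under that layout, deleting ledger 40313 reduces to dropping the key range
from (40313, 0) to (40313, Long.MAX_VALUE), which is what
removeOffsetFromDeletedLedgers appears to batch up.
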
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
new file mode 100644
index 000000000..3092e9680
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java
@@ -0,0 +1,172 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class KeyValueStorageTest {
+
+    private final KeyValueStorageFactory storageFactory;
+    private final ServerConfiguration configuration;
+
+    @Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { { KeyValueStorageRocksDB.factory } });
+    }
+
+    public KeyValueStorageTest(KeyValueStorageFactory storageFactory) {
+        this.storageFactory = storageFactory;
+        this.configuration = new ServerConfiguration();
+    }
+
+    private static long fromArray(byte[] array) {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    private static byte[] toArray(long n) {
+        byte[] b = new byte[8];
+        ArrayUtil.setLong(b, 0, n);
+        return b;
+    }
+
+    @Test
+    public void simple() throws Exception {
+        File tmpDir = File.createTempFile("bookie", "test");
+        tmpDir.delete();
+
+        KeyValueStorage db = storageFactory.newKeyValueStorage(tmpDir.getAbsolutePath(), DbConfigType.Small,
+                configuration);
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(0, db.count());
+
+        db.put(toArray(5), toArray(5));
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(1, db.count());
+
+        assertEquals(null, db.getFloor(toArray(5)));
+        assertEquals(5, fromArray(db.getFloor(toArray(6)).getKey()));
+
+        db.put(toArray(3), toArray(3));
+
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(2, db.count());
+
+        // Overwrite an existing key
+
+        db.put(toArray(5), toArray(5));
+        // Count can be imprecise
+        assertTrue(db.count() > 0);
+
+        assertEquals(null, db.getFloor(toArray(1)));
+        assertEquals(null, db.getFloor(toArray(3)));
+        assertEquals(3, fromArray(db.getFloor(toArray(5)).getKey()));
+        assertEquals(5, fromArray(db.getFloor(toArray(6)).getKey()));
+        assertEquals(5, fromArray(db.getFloor(toArray(10)).getKey()));
+
+        // Iterate
+        List<Long> foundKeys = Lists.newArrayList();
+        CloseableIterator<Entry<byte[], byte[]>> iter = db.iterator();
+        try {
+            while (iter.hasNext()) {
+                foundKeys.add(fromArray(iter.next().getKey()));
+            }
+        } finally {
+            iter.close();
+        }
+
+        assertEquals(Lists.newArrayList(3L, 5L), foundKeys);
+
+        // Iterate over keys
+        foundKeys = Lists.newArrayList();
+        CloseableIterator<byte[]> iter2 = db.keys();
+        try {
+            while (iter2.hasNext()) {
+                foundKeys.add(fromArray(iter2.next()));
+            }
+        } finally {
+            iter2.close();
+        }
+
+        assertEquals(Lists.newArrayList(3L, 5L), foundKeys);
+
+        // Scan with limits
+        foundKeys = Lists.newArrayList();
+        iter2 = db.keys(toArray(1), toArray(4));
+        try {
+            while (iter2.hasNext()) {
+                foundKeys.add(fromArray(iter2.next()));
+            }
+        } finally {
+            iter2.close();
+        }
+
+        assertEquals(Lists.newArrayList(3L), foundKeys);
+
+        // Test deletion
+        db.put(toArray(10), toArray(10));
+        db.put(toArray(11), toArray(11));
+        db.put(toArray(12), toArray(12));
+        db.put(toArray(14), toArray(14));
+
+        // Count can be imprecise
+        assertTrue(db.count() > 0);
+
+        assertEquals(10L, fromArray(db.get(toArray(10))));
+        db.delete(toArray(10));
+        assertEquals(null, db.get(toArray(10)));
+        assertTrue(db.count() > 0);
+
+        Batch batch = db.newBatch();
+        batch.remove(toArray(11));
+        batch.remove(toArray(12));
+        batch.remove(toArray(13));
+        batch.flush();
+        assertEquals(null, db.get(toArray(11)));
+        assertEquals(null, db.get(toArray(12)));
+        assertEquals(null, db.get(toArray(13)));
+        assertEquals(14L, fromArray(db.get(toArray(14))));
+        batch.close();
+
+        db.close();
+        tmpDir.delete();
+    }
+}
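
One subtlety worth calling out: the assertions above pin getFloor() to
"greatest key strictly below the probe". After put(5), getFloor(5) is null
while getFloor(6) returns 5, which matches TreeMap.lowerEntry rather than
TreeMap.floorEntry, as this reference snippet shows:

    import java.util.TreeMap;

    public class GetFloorSemantics {
        public static void main(String[] args) {
            TreeMap<Long, Long> map = new TreeMap<>();
            map.put(5L, 5L);
            // Strictly-below semantics, like the getFloor() tested above:
            System.out.println(map.lowerEntry(5L));          // null
            System.out.println(map.lowerEntry(6L).getKey()); // 5
        }
    }
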
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
new file mode 100644
index 000000000..bfac9d6ac
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildTest.java
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieShell;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class LocationsIndexRebuildTest {
+
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+        }
+    };
+
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+    };
+
+    @Test
+    public void test() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        System.out.println(tmpDir);
+
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setAllowLoopback(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        DbLedgerStorage ledgerStorage = new DbLedgerStorage();
+        ledgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        // Insert some ledgers & entries in the storage
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            ledgerStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+            ledgerStorage.setFenced(ledgerId);
+
+            for (long entryId = 0; entryId < 100; entryId++) {
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                ledgerStorage.addEntry(entry);
+            }
+        }
+
+        ledgerStorage.flush();
+        ledgerStorage.shutdown();
+
+        // Rebuild index through the tool
+        BookieShell shell = new BookieShell();
+        shell.setConf(conf);
+        int res = shell.run(new String[] { "rebuild-db-ledger-locations-index" });
+
+        Assert.assertEquals(0, res);
+
+        // Verify that db index has the same entries
+        ledgerStorage = new DbLedgerStorage();
+        ledgerStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager, checkpointSource, checkpointer,
+                NullStatsLogger.INSTANCE);
+
+        Set<Long> ledgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
+        Assert.assertEquals(Sets.newTreeSet(Lists.newArrayList(0L, 1L, 2L, 3L, 4L)), ledgers);
+
+        for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+            Assert.assertEquals(true, ledgerStorage.isFenced(ledgerId));
+            Assert.assertEquals("ledger-" + ledgerId, new String(ledgerStorage.readMasterKey(ledgerId)));
+
+            ByteBuf lastEntry = ledgerStorage.getLastEntry(ledgerId);
+            assertEquals(ledgerId, lastEntry.readLong());
+            long lastEntryId = lastEntry.readLong();
+            assertEquals(99, lastEntryId);
+
+            for (long entryId = 0; entryId < 100; entryId++) {
+                ByteBuf entry = Unpooled.buffer(1024);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                ByteBuf result = ledgerStorage.getEntry(ledgerId, entryId);
+                Assert.assertEquals(entry, result);
+            }
+        }
+
+        ledgerStorage.shutdown();
+        FileUtils.forceDelete(tmpDir);
+    }
+}
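
The rebuild exercised here has to reconstruct every (ledgerId, entryId) to
location mapping from the entry logs alone, which works because each stored
entry carries its ledger id and entry id in its first 16 bytes. A hedged
sketch of the core loop (ScannedEntry and the scan source are hypothetical;
only EntryLocationIndex.addLocation() comes from the API exercised in the
tests above):

    import io.netty.buffer.ByteBuf;

    class IndexRebuildSketch {

        // Hypothetical carrier for one scanned entry and its entry-log offset.
        static class ScannedEntry {
            final ByteBuf data;
            final long location;
            ScannedEntry(ByteBuf data, long location) {
                this.data = data;
                this.location = location;
            }
        }

        // Sketch: re-derive each location from the entry's own 16-byte header.
        static void rebuildIndex(EntryLocationIndex idx,
                Iterable<ScannedEntry> entryLogScan) throws Exception {
            for (ScannedEntry scanned : entryLogScan) {
                long ledgerId = scanned.data.getLong(0); // bytes 0-7
                long entryId = scanned.data.getLong(8);  // bytes 8-15
                idx.addLocation(ledgerId, entryId, scanned.location);
            }
        }
    }
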
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
new file mode 100644
index 000000000..c309bc1fa
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCacheTest.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class ReadCacheTest {
+
+    @Test
+    public void simple() {
+        ReadCache cache = new ReadCache(10 * 1024);
+
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+
+        ByteBuf entry = Unpooled.wrappedBuffer(new byte[1024]);
+        cache.put(1, 0, entry);
+
+        assertEquals(1, cache.count());
+        assertEquals(1024, cache.size());
+
+        assertEquals(entry, cache.get(1, 0));
+        assertNull(cache.get(1, 1));
+
+        for (int i = 1; i < 10; i++) {
+            cache.put(1, i, entry);
+        }
+
+        assertEquals(10, cache.count());
+        assertEquals(10 * 1024, cache.size());
+
+        cache.put(1, 10, entry);
+
+        // First half of entries will have been evicted
+        for (int i = 0; i < 5; i++) {
+            assertNull(cache.get(1, i));
+        }
+
+        for (int i = 5; i < 11; i++) {
+            assertEquals(entry, cache.get(1, i));
+        }
+
+        cache.close();
+    }
+}
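
The eviction assertions encode a segment-rotation policy rather than
per-entry LRU: once the cache is full, the next insert discards the older
half wholesale, so entries 0-4 vanish together while 5-10 survive. A toy
model consistent with those assertions (illustrative only; it is keyed on
entry id alone for brevity, while the real cache also keys on ledger id):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    import io.netty.buffer.ByteBuf;

    class SegmentedReadCache {
        private final long maxSegmentSize;
        private final Deque<Map<Long, ByteBuf>> segments = new ArrayDeque<>();
        private long currentSegmentSize = 0;

        SegmentedReadCache(long maxCacheSize) {
            this.maxSegmentSize = maxCacheSize / 2; // two half-size segments
            segments.addFirst(new HashMap<>());
        }

        void put(long entryId, ByteBuf entry) {
            if (currentSegmentSize + entry.readableBytes() > maxSegmentSize) {
                if (segments.size() == 2) {
                    segments.removeLast(); // evict the older half wholesale
                }
                segments.addFirst(new HashMap<>());
                currentSegmentSize = 0;
            }
            segments.peekFirst().put(entryId, entry);
            currentSegmentSize += entry.readableBytes();
        }

        ByteBuf get(long entryId) {
            for (Map<Long, ByteBuf> segment : segments) {
                ByteBuf e = segment.get(entryId);
                if (e != null) {
                    return e;
                }
            }
            return null;
        }
    }

With a 10 KiB cache and 1 KiB entries this reproduces the test exactly: the
11th put clears the segment holding entries 0-4.
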
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
new file mode 100644
index 000000000..911518a2d
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/WriteCacheTest.java
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
+
+public class WriteCacheTest {
+
+    @Test
+    public void simple() throws Exception {
+        WriteCache cache = new WriteCache(10 * 1024);
+
+        ByteBuf entry1 = PooledByteBufAllocator.DEFAULT.buffer(1024);
+        ByteBufUtil.writeUtf8(entry1, "entry-1");
+        entry1.writerIndex(entry1.capacity());
+
+        assertTrue(cache.isEmpty());
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+
+        cache.put(1, 1, entry1);
+
+        assertFalse(cache.isEmpty());
+        assertEquals(1, cache.count());
+        assertEquals(entry1.readableBytes(), cache.size());
+
+        assertEquals(entry1, cache.get(1, 1));
+        assertNull(cache.get(1, 2));
+        assertNull(cache.get(2, 1));
+
+        assertEquals(entry1, cache.getLastEntry(1));
+        assertEquals(null, cache.getLastEntry(2));
+
+        cache.clear();
+
+        assertTrue(cache.isEmpty());
+        assertEquals(0, cache.count());
+        assertEquals(0, cache.size());
+
+        entry1.release();
+        cache.close();
+    }
+
+    @Test
+    public void cacheFull() throws Exception {
+        int cacheSize = 10 * 1024;
+        int entrySize = 1024;
+        int entriesCount = cacheSize / entrySize;
+
+        WriteCache cache = new WriteCache(cacheSize);
+
+        ByteBuf entry = PooledByteBufAllocator.DEFAULT.buffer(entrySize);
+        entry.writerIndex(entry.capacity());
+
+        for (int i = 0; i < entriesCount; i++) {
+            assertTrue(cache.put(1, i, entry));
+        }
+
+        assertFalse(cache.put(1, 11, entry));
+
+        assertFalse(cache.isEmpty());
+        assertEquals(entriesCount, cache.count());
+        assertEquals(cacheSize, cache.size());
+
+        AtomicInteger findCount = new AtomicInteger(0);
+        cache.forEach((ledgerId, entryId, data) -> {
+            findCount.incrementAndGet();
+        });
+
+        assertEquals(entriesCount, findCount.get());
+
+        cache.deleteLedger(1);
+
+        findCount.set(0);
+        cache.forEach((ledgerId, entryId, data) -> {
+            findCount.incrementAndGet();
+        });
+
+        assertEquals(0, findCount.get());
+
+        entry.release();
+        cache.close();
+    }
+}
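
Note the contrast with ReadCache: when WriteCache fills up, put() returns
false rather than evicting anything, presumably because an entry sitting in
the write cache has not reached disk yet and must not be dropped. A usage
sketch (constructor and put() signatures are taken from the test above; the
flush callback is a hypothetical stand-in):

    import io.netty.buffer.ByteBuf;

    // Sketch: put() reports back-pressure instead of evicting; the caller,
    // not the cache, decides how to react (flush, retry, or fail the add).
    static boolean tryAdd(WriteCache cache, long ledgerId, long entryId,
            ByteBuf entry, Runnable onCacheFull) {
        if (cache.put(ledgerId, entryId, entry)) {
            return true;
        }
        onCacheFull.run(); // e.g. schedule a flush; the caller may retry after
        return false;
    }
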
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 2cc5ce766..736c88b11 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -24,6 +24,10 @@
     <!-- generated code, we can't be held responsible for findbugs in it //-->
     <Class name="~org\.apache\.bookkeeper\.proto\.BookkeeperProtocol.*" />
   </Match>
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~org\.apache\.bookkeeper\.bookie\.storage\.ldb\.DbLedgerStorageDataFormats.*" />
+  </Match>
   <Match>
     <!-- generated code, we can't be held responsible for findbugs in it //-->
     <Class name="~org\.apache\.bookkeeper\.tests\.generated.*" />


 
