Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/12/15 17:29:24 UTC

[GitHub] merlimat closed pull request #855: DbLedgerStorage -- Main implementation

URL: https://github.com/apache/bookkeeper/pull/855

This is a pull request merged from a forked repository. As GitHub hides
the original diff of a foreign pull request once it is merged, the diff
is reproduced below for the sake of provenance:

diff --git a/bookkeeper-proto/pom.xml b/bookkeeper-proto/pom.xml
index cc9169da7..388bdcb7b 100644
--- a/bookkeeper-proto/pom.xml
+++ b/bookkeeper-proto/pom.xml
@@ -45,6 +45,7 @@
             <!-- exclude generated file //-->
             <exclude>**/DataFormats.java</exclude>
             <exclude>**/BookkeeperProtocol.java</exclude>
+            <exclude>**/DbLedgerStorageDataFormats.java</exclude>
           </excludes>
         </configuration>
       </plugin>
diff --git a/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto b/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto
new file mode 100644
index 000000000..e68b2d0d5
--- /dev/null
+++ b/bookkeeper-proto/src/main/proto/DbLedgerStorageDataFormats.proto
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+syntax = "proto2";
+
+option java_package = "org.apache.bookkeeper.bookie.storage.ldb";
+option optimize_for = SPEED;
+
+/**
+ * Ledger metadata stored in the bookie
+ */
+message LedgerData {
+    required bool exists = 1;
+    required bool fenced = 2;
+    required bytes masterKey = 3;
+}
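
For orientation before the Java files below: the generated LedgerData class is
used through the standard protobuf-java API. A minimal sketch of building,
serializing and parsing a record (the wrapper class and sample master key are
illustrative only; the builder and parseFrom calls mirror the usage in
LedgerMetadataIndex further down):

    import com.google.protobuf.ByteString;
    import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;

    public class LedgerDataExample {
        public static void main(String[] args) throws Exception {
            byte[] masterKey = new byte[] {1, 2, 3};

            // Build a metadata record for a new, unfenced ledger
            LedgerData data = LedgerData.newBuilder()
                    .setExists(true)
                    .setFenced(false)
                    .setMasterKey(ByteString.copyFrom(masterKey))
                    .build();

            // The serialized form is what LedgerMetadataIndex stores as the
            // RocksDB value, keyed by ledgerId
            byte[] serialized = data.toByteArray();
            LedgerData restored = LedgerData.parseFrom(serialized);
            System.out.println("fenced=" + restored.getFenced());
        }
    }
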
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
new file mode 100644
index 000000000..fa8001677
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -0,0 +1,794 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.io.IOException;
+import java.util.Observable;
+import java.util.Observer;
+import java.util.SortedMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.GarbageCollectorThread;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of LedgerStorage that uses RocksDB to keep the indexes for
+ * entries stored in EntryLogs.
+ */
+public class DbLedgerStorage implements CompactableLedgerStorage {
+
+    private EntryLogger entryLogger;
+
+    private LedgerMetadataIndex ledgerIndex;
+    private EntryLocationIndex entryLocationIndex;
+
+    private GarbageCollectorThread gcThread;
+
+    // Write cache where all new entries are inserted
+    protected WriteCache writeCache;
+
+    // Write cache that is used to swap with writeCache during flushes
+    protected WriteCache writeCacheBeingFlushed;
+
+    // Cache where we insert entries for speculative reading
+    private ReadCache readCache;
+
+    private final ReentrantReadWriteLock writeCacheMutex = new ReentrantReadWriteLock();
+    private final Condition flushWriteCacheCondition = writeCacheMutex.writeLock().newCondition();
+
+    protected final ReentrantLock flushMutex = new ReentrantLock();
+
+    protected final AtomicBoolean hasFlushBeenTriggered = new AtomicBoolean(false);
+    private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
+
+    private final ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
+
+    // Executor used for db index cleanup
+    private final ExecutorService cleanupExecutor = Executors
+            .newSingleThreadExecutor(new DefaultThreadFactory("db-storage-cleanup"));
+
+    static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
+    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
+    static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
+
+    private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
+    private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
+    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
+
+    private static final int MB = 1024 * 1024;
+
+    private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists
+            .newCopyOnWriteArrayList();
+
+    private long writeCacheMaxSize;
+
+    private CheckpointSource checkpointSource = null;
+    private Checkpoint lastCheckpoint = Checkpoint.MIN;
+
+    private long readCacheMaxSize;
+    private int readAheadCacheBatchSize;
+
+    private StatsLogger stats;
+
+    private OpStatsLogger addEntryStats;
+    private OpStatsLogger readEntryStats;
+    private OpStatsLogger readCacheHitStats;
+    private OpStatsLogger readCacheMissStats;
+    private OpStatsLogger readAheadBatchCountStats;
+    private OpStatsLogger readAheadBatchSizeStats;
+    private OpStatsLogger flushStats;
+    private OpStatsLogger flushSizeStats;
+
+    @Override
+    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
+            LedgerDirsManager indexDirsManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
+            StatsLogger statsLogger) throws IOException {
+        checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
+                "Db implementation only allows for one storage dir");
+
+        String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+
+        writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
+
+        writeCache = new WriteCache(writeCacheMaxSize / 2);
+        writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
+
+        this.checkpointSource = checkpointSource;
+
+        readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
+        readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
+
+        readCache = new ReadCache(readCacheMaxSize);
+
+        this.stats = statsLogger;
+
+        log.info("Started Db Ledger Storage");
+        log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
+        log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
+        log.info(" - Read Ahead Batch size: : {}", readAheadCacheBatchSize);
+
+        ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
+        entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
+
+        entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        gcThread = new GarbageCollectorThread(conf, ledgerManager, this);
+
+        registerStats();
+    }
+
+    public void registerStats() {
+        stats.registerGauge("write-cache-size", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return writeCache.size() + writeCacheBeingFlushed.size();
+            }
+        });
+        stats.registerGauge("write-cache-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return writeCache.count() + writeCacheBeingFlushed.count();
+            }
+        });
+        stats.registerGauge("read-cache-size", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return readCache.size();
+            }
+        });
+        stats.registerGauge("read-cache-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return readCache.count();
+            }
+        });
+
+        addEntryStats = stats.getOpStatsLogger("add-entry");
+        readEntryStats = stats.getOpStatsLogger("read-entry");
+        readCacheHitStats = stats.getOpStatsLogger("read-cache-hits");
+        readCacheMissStats = stats.getOpStatsLogger("read-cache-misses");
+        readAheadBatchCountStats = stats.getOpStatsLogger("readahead-batch-count");
+        readAheadBatchSizeStats = stats.getOpStatsLogger("readahead-batch-size");
+        flushStats = stats.getOpStatsLogger("flush");
+        flushSizeStats = stats.getOpStatsLogger("flush-size");
+    }
+
+    @Override
+    public void start() {
+        gcThread.start();
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException {
+        try {
+            flush();
+
+            gcThread.shutdown();
+            entryLogger.shutdown();
+
+            cleanupExecutor.shutdown();
+            cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS);
+
+            ledgerIndex.close();
+            entryLocationIndex.close();
+
+            writeCache.close();
+            writeCacheBeingFlushed.close();
+            readCache.close();
+            executor.shutdown();
+
+        } catch (IOException e) {
+            log.error("Error closing db storage", e);
+        }
+    }
+
+    @Override
+    public boolean ledgerExists(long ledgerId) throws IOException {
+        try {
+            LedgerData ledgerData = ledgerIndex.get(ledgerId);
+            if (log.isDebugEnabled()) {
+                log.debug("Ledger exists. ledger: {} : {}", ledgerId, ledgerData.getExists());
+            }
+            return ledgerData.getExists();
+        } catch (Bookie.NoLedgerException nle) {
+            // ledger does not exist
+            return false;
+        }
+    }
+
+    @Override
+    public boolean isFenced(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("isFenced. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getFenced();
+    }
+
+    @Override
+    public boolean setFenced(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Set fenced. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.setFenced(ledgerId);
+    }
+
+    @Override
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Set master key. ledger: {}", ledgerId);
+        }
+        ledgerIndex.setMasterKey(ledgerId, masterKey);
+    }
+
+    @Override
+    public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+        if (log.isDebugEnabled()) {
+            log.debug("Read master key. ledger: {}", ledgerId);
+        }
+        return ledgerIndex.get(ledgerId).getMasterKey().toByteArray();
+    }
+
+    @Override
+    public long addEntry(ByteBuf entry) throws IOException {
+        long startTime = MathUtils.nowInNano();
+
+        long ledgerId = entry.readLong();
+        long entryId = entry.readLong();
+        entry.resetReaderIndex();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Add entry. {}@{}", ledgerId, entryId);
+        }
+
+        // Waits if the write cache is being switched for a flush
+        writeCacheMutex.readLock().lock();
+        boolean inserted;
+        try {
+            inserted = writeCache.put(ledgerId, entryId, entry);
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        if (!inserted) {
+            triggerFlushAndAddEntry(ledgerId, entryId, entry);
+        }
+
+        recordSuccessfulEvent(addEntryStats, startTime);
+        return entryId;
+    }
+
+    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException {
+        // Write cache is full, we need to trigger a flush so that it gets rotated
+        writeCacheMutex.writeLock().lock();
+
+        try {
+            // If the flush has already been triggered or has already switched the
+            // cache, we don't need to trigger another flush
+            if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
+                // Trigger an early flush in background
+                log.info("Write cache is full, triggering flush");
+                executor.execute(() -> {
+                    try {
+                        flush();
+                    } catch (IOException e) {
+                        log.error("Error during flush", e);
+                    }
+                });
+            }
+
+            long timeoutNs = TimeUnit.MILLISECONDS.toNanos(100);
+            while (hasFlushBeenTriggered.get()) {
+                if (timeoutNs <= 0L) {
+                    throw new IOException("Write cache was not trigger within the timeout, cannot add entry " + ledgerId
+                            + "@" + entryId);
+                }
+                timeoutNs = flushWriteCacheCondition.awaitNanos(timeoutNs);
+            }
+
+            if (!writeCache.put(ledgerId, entryId, entry)) {
+                // Still wasn't able to cache entry
+                throw new IOException("Error while inserting entry in write cache" + ledgerId + "@" + entryId);
+            }
+
+        } catch (InterruptedException e) {
+            throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
+        } finally {
+            writeCacheMutex.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
+        long startTime = MathUtils.nowInNano();
+        if (log.isDebugEnabled()) {
+            log.debug("Get Entry: {}@{}", ledgerId, entryId);
+        }
+
+        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
+            return getLastEntry(ledgerId);
+        }
+
+        writeCacheMutex.readLock().lock();
+        try {
+            // First try to read from the write cache of recent entries
+            ByteBuf entry = writeCache.get(ledgerId, entryId);
+            if (entry != null) {
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+
+            // If there's a flush going on, the entry might be in the flush buffer
+            entry = writeCacheBeingFlushed.get(ledgerId, entryId);
+            if (entry != null) {
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        // Try reading from read-ahead cache
+        ByteBuf entry = readCache.get(ledgerId, entryId);
+        if (entry != null) {
+            recordSuccessfulEvent(readCacheHitStats, startTime);
+            recordSuccessfulEvent(readEntryStats, startTime);
+            return entry;
+        }
+
+        // Read from main storage
+        long entryLocation;
+        try {
+            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
+            if (entryLocation == 0) {
+                throw new NoEntryException(ledgerId, entryId);
+            }
+            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
+        } catch (NoEntryException e) {
+            recordFailedEvent(readEntryStats, startTime);
+            throw e;
+        }
+
+        readCache.put(ledgerId, entryId, entry);
+
+        // Try to read more entries
+        long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
+        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
+
+        recordSuccessfulEvent(readCacheMissStats, startTime);
+        recordSuccessfulEvent(readEntryStats, startTime);
+        return entry;
+    }
+
+    private void fillReadAheadCache(long originalLedgerId, long firstEntryId, long firstEntryLocation) {
+        try {
+            long firstEntryLogId = (firstEntryLocation >> 32);
+            long currentEntryLogId = firstEntryLogId;
+            long currentEntryLocation = firstEntryLocation;
+            int count = 0;
+            long size = 0;
+
+            while (count < readAheadCacheBatchSize && currentEntryLogId == firstEntryLogId) {
+                ByteBuf entry = entryLogger.internalReadEntry(originalLedgerId, -1, currentEntryLocation);
+
+                try {
+                    long currentEntryLedgerId = entry.getLong(0);
+                    long currentEntryId = entry.getLong(8);
+
+                    if (currentEntryLedgerId != originalLedgerId) {
+                        // Found an entry belonging to a different ledger, stopping read-ahead
+                        entry.release();
+                        return;
+                    }
+
+                    // Insert entry in read cache
+                    readCache.put(originalLedgerId, currentEntryId, entry);
+
+                    count++;
+                    size += entry.readableBytes();
+
+                    currentEntryLocation += 4 + entry.readableBytes();
+                    currentEntryLogId = currentEntryLocation >> 32;
+                } finally {
+                    entry.release();
+                }
+            }
+
+            readAheadBatchCountStats.registerSuccessfulValue(count);
+            readAheadBatchSizeStats.registerSuccessfulValue(size);
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e);
+            }
+        }
+    }
+
+    public ByteBuf getLastEntry(long ledgerId) throws IOException {
+        long startTime = MathUtils.nowInNano();
+
+        writeCacheMutex.readLock().lock();
+        try {
+            // First try to read from the write cache of recent entries
+            ByteBuf entry = writeCache.getLastEntry(ledgerId);
+            if (entry != null) {
+                if (log.isDebugEnabled()) {
+                    long foundLedgerId = entry.readLong(); // ledgerId
+                    long entryId = entry.readLong();
+                    entry.resetReaderIndex();
+                    log.debug("Found last entry for ledger {} in write cache: {}@{}", ledgerId, foundLedgerId,
+                            entryId);
+                }
+
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+
+            // If there's a flush going on, the entry might be in the flush buffer
+            entry = writeCacheBeingFlushed.getLastEntry(ledgerId);
+            if (entry != null) {
+                if (log.isDebugEnabled()) {
+                    entry.readLong(); // skip ledgerId
+                    long entryId = entry.readLong();
+                    entry.resetReaderIndex();
+                    log.debug("Found last entry for ledger {} in write cache being flushed: {}", ledgerId, entryId);
+                }
+
+                recordSuccessfulEvent(readCacheHitStats, startTime);
+                recordSuccessfulEvent(readEntryStats, startTime);
+                return entry;
+            }
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        // Search the last entry in storage
+        long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
+        if (log.isDebugEnabled()) {
+            log.debug("Found last entry for ledger {} in db: {}", ledgerId, lastEntryId);
+        }
+
+        long entryLocation = entryLocationIndex.getLocation(ledgerId, lastEntryId);
+        ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, entryLocation);
+
+        recordSuccessfulEvent(readCacheMissStats, startTime);
+        recordSuccessfulEvent(readEntryStats, startTime);
+        return content;
+    }
+
+    @VisibleForTesting
+    boolean isFlushRequired() {
+        writeCacheMutex.readLock().lock();
+        try {
+            return !writeCache.isEmpty();
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void checkpoint(Checkpoint checkpoint) throws IOException {
+        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
+        if (lastCheckpoint.compareTo(checkpoint) > 0) {
+            return;
+        }
+
+        long startTime = MathUtils.nowInNano();
+
+        // Only a single flush operation can happen at a time
+        flushMutex.lock();
+
+        try {
+            // Swap the write cache so that writes can continue to happen while the flush is
+            // ongoing
+            swapWriteCache();
+
+            long sizeToFlush = writeCacheBeingFlushed.size();
+            if (log.isDebugEnabled()) {
+                log.debug("Flushing entries. count: {} -- size {} Mb", writeCacheBeingFlushed.count(),
+                        sizeToFlush / 1024.0 / 1024);
+            }
+
+            // Write all the pending entries into the entry logger and collect the offset
+            // position for each entry
+
+            Batch batch = entryLocationIndex.newBatch();
+            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
+                try {
+                    long location = entryLogger.addEntry(ledgerId, entry, true);
+                    entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            entryLogger.flush();
+
+            long batchFlushStartTime = System.nanoTime();
+            batch.flush();
+            batch.close();
+            if (log.isDebugEnabled()) {
+                log.debug("DB batch flushed time : {} s",
+                        MathUtils.elapsedNanos(batchFlushStarTime) / (double) TimeUnit.SECONDS.toNanos(1));
+            }
+
+            ledgerIndex.flush();
+
+            cleanupExecutor.execute(() -> {
+                // There can only be one single cleanup task running because the cleanupExecutor
+                // is single-threaded
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Removing deleted ledgers from db indexes");
+                    }
+
+                    entryLocationIndex.removeOffsetFromDeletedLedgers();
+                    ledgerIndex.removeDeletedLedgers();
+                } catch (Throwable t) {
+                    log.warn("Failed to cleanup db indexes", t);
+                }
+            });
+
+            lastCheckpoint = thisCheckpoint;
+
+            // Discard all the entries from the write cache, since they're now persisted
+            writeCacheBeingFlushed.clear();
+
+            double flushTimeSeconds = MathUtils.elapsedNanos(startTime) / (double) TimeUnit.SECONDS.toNanos(1);
+            double flushThroughput = sizeToFlush / 1024.0 / 1024.0 / flushTimeSeconds;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Flushing done time {} s -- Written {} MB/s", flushTimeSeconds, flushThroughput);
+            }
+
+            recordSuccessfulEvent(flushStats, startTime);
+            flushSizeStats.registerSuccessfulValue(sizeToFlush);
+        } catch (IOException e) {
+            // Rethrow IOException as-is
+            throw e;
+        } catch (RuntimeException e) {
+            // Wrap unchecked exceptions
+            throw new IOException(e);
+        } finally {
+            try {
+                isFlushOngoing.set(false);
+            } finally {
+                flushMutex.unlock();
+            }
+        }
+    }
+
+    /**
+     * Swap the current write cache with the replacement cache.
+     */
+    private void swapWriteCache() {
+        writeCacheMutex.writeLock().lock();
+        try {
+            // First, swap the current write-cache map with an empty one so that writes
+            // can go on unaffected. Only a single flush can be in progress at a time
+            WriteCache tmp = writeCacheBeingFlushed;
+            writeCacheBeingFlushed = writeCache;
+            writeCache = tmp;
+
+            // since the cache is switched, we can allow flush to be triggered
+            hasFlushBeenTriggered.set(false);
+            flushWriteCacheCondition.signalAll();
+        } finally {
+            try {
+                isFlushOngoing.set(true);
+            } finally {
+                writeCacheMutex.writeLock().unlock();
+            }
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        checkpoint(Checkpoint.MAX);
+    }
+
+    @Override
+    public void deleteLedger(long ledgerId) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Deleting ledger {}", ledgerId);
+        }
+
+        // Delete entries from this ledger that are still in the write cache
+        writeCacheMutex.readLock().lock();
+        try {
+            writeCache.deleteLedger(ledgerId);
+        } finally {
+            writeCacheMutex.readLock().unlock();
+        }
+
+        entryLocationIndex.delete(ledgerId);
+        ledgerIndex.delete(ledgerId);
+
+        for (int i = 0, size = ledgerDeletionListeners.size(); i < size; i++) {
+            LedgerDeletionListener listener = ledgerDeletionListeners.get(i);
+            listener.ledgerDeleted(ledgerId);
+        }
+    }
+
+    @Override
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
+        return ledgerIndex.getActiveLedgersInRange(firstLedgerId, lastLedgerId);
+    }
+
+    @Override
+    public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
+        // Trigger a flush so that all the entries being compacted are persisted in the db storage
+        flush();
+
+        entryLocationIndex.updateLocations(locations);
+    }
+
+    @Override
+    public EntryLogger getEntryLogger() {
+        return entryLogger;
+    }
+
+    @Override
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC, Observer observer)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuf getExplicitLac(long ledgerId) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void flushEntriesLocationsIndex() throws IOException {
+        // No-op. Location index is already flushed in updateEntriesLocations() call
+    }
+
+    /**
+     * Add an already existing ledger to the index.
+     *
+     * <p>This method is only used as a tool to help the migration from
+     * InterleaveLedgerStorage to DbLedgerStorage
+     *
+     * @param ledgerId
+     *            the ledger id
+     * @param entries
+     *            a map of entryId -> location
+     * @return the number of entries added to the index
+     */
+    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
+            Iterable<SortedMap<Long, Long>> entries) throws Exception {
+        LedgerData ledgerData = LedgerData.newBuilder().setExists(true).setFenced(isFenced)
+                .setMasterKey(ByteString.copyFrom(masterKey)).build();
+        ledgerIndex.set(ledgerId, ledgerData);
+        AtomicLong numberOfEntries = new AtomicLong();
+
+        // Iterate over all the pages of entries
+        Batch batch = entryLocationIndex.newBatch();
+        entries.forEach(map -> {
+            map.forEach((entryId, location) -> {
+                try {
+                    entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+
+                numberOfEntries.incrementAndGet();
+            });
+        });
+
+        batch.flush();
+        batch.close();
+
+        return numberOfEntries.get();
+    }
+
+    @Override
+    public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
+        ledgerDeletionListeners.add(listener);
+    }
+
+    public EntryLocationIndex getEntryLocationIndex() {
+        return entryLocationIndex;
+    }
+
+    private void recordSuccessfulEvent(OpStatsLogger logger, long startTimeNanos) {
+        logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+    }
+
+    private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
+        logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);
+}
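
The heart of the write path above is the two-cache rotation: writers take the
read lock and insert into writeCache, while a flush takes the write lock only
long enough to swap the two caches and then drains writeCacheBeingFlushed
without blocking new writes. A condensed, self-contained sketch of that
pattern, using a plain map as a stand-in for the real WriteCache (names and
payload types here are illustrative):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class CacheRotationSketch {
        private final ReentrantReadWriteLock rotationLock = new ReentrantReadWriteLock();
        private final ReentrantLock flushLock = new ReentrantLock();

        // Stand-ins for WriteCache: the active cache and the one being flushed
        private ConcurrentMap<Long, byte[]> active = new ConcurrentHashMap<>();
        private ConcurrentMap<Long, byte[]> flushing = new ConcurrentHashMap<>();

        // Many writers may insert concurrently; the read lock only blocks
        // them while a swap is in progress
        public void put(long entryId, byte[] payload) {
            rotationLock.readLock().lock();
            try {
                active.put(entryId, payload);
            } finally {
                rotationLock.readLock().unlock();
            }
        }

        // Only one flush at a time; the write lock is held just for the swap
        public void flush() {
            flushLock.lock();
            try {
                rotationLock.writeLock().lock();
                try {
                    ConcurrentMap<Long, byte[]> tmp = flushing;
                    flushing = active;
                    active = tmp;
                } finally {
                    rotationLock.writeLock().unlock();
                }
                // Drain the swapped-out cache while writers keep going
                flushing.forEach((id, payload) -> persist(id, payload));
                flushing.clear();
            } finally {
                flushLock.unlock();
            }
        }

        private void persist(long entryId, byte[] payload) {
            // Placeholder for entryLogger.addEntry(...) plus the index update
        }
    }
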
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
new file mode 100644
index 000000000..53e37a263
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -0,0 +1,232 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.collect.Iterables;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains an index of the entry locations in the EntryLogger.
+ *
+ * <p>For each ledger multiple entries are stored in the same "record", represented
+ * by the {@link LedgerIndexPage} class.
+ */
+public class EntryLocationIndex implements Closeable {
+
+    private final KeyValueStorage locationsDb;
+    private final ConcurrentLongHashSet deletedLedgers = new ConcurrentLongHashSet();
+
+    private StatsLogger stats;
+
+    public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
+            StatsLogger stats) throws IOException {
+        String locationsDbPath = FileSystems.getDefault().getPath(basePath, "locations").toFile().toString();
+        locationsDb = storageFactory.newKeyValueStorage(locationsDbPath, DbConfigType.Huge, conf);
+
+        this.stats = stats;
+        registerStats();
+    }
+
+    public void registerStats() {
+        stats.registerGauge("entries-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                try {
+                    return locationsDb.count();
+                } catch (IOException e) {
+                    return -1L;
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() throws IOException {
+        locationsDb.close();
+    }
+
+    public long getLocation(long ledgerId, long entryId) throws IOException {
+        LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
+        LongWrapper value = LongWrapper.get();
+
+        try {
+            if (locationsDb.get(key.array, value.array) < 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Entry not found {}@{} in db index", ledgerId, entryId);
+                }
+                return 0;
+            }
+
+            return value.getValue();
+        } finally {
+            key.recycle();
+            value.recycle();
+        }
+    }
+
+    public long getLastEntryInLedger(long ledgerId) throws IOException {
+        if (deletedLedgers.contains(ledgerId)) {
+            // Ledger already deleted
+            return -1;
+        }
+
+        return getLastEntryInLedgerInternal(ledgerId);
+    }
+
+    private long getLastEntryInLedgerInternal(long ledgerId) throws IOException {
+        LongPairWrapper maxEntryId = LongPairWrapper.get(ledgerId, Long.MAX_VALUE);
+
+        // Search the last entry in storage
+        Entry<byte[], byte[]> entry = locationsDb.getFloor(maxEntryId.array);
+        maxEntryId.recycle();
+
+        if (entry == null) {
+            throw new Bookie.NoEntryException(ledgerId, -1);
+        } else {
+            long foundLedgerId = ArrayUtil.getLong(entry.getKey(), 0);
+            long lastEntryId = ArrayUtil.getLong(entry.getKey(), 8);
+
+            if (foundLedgerId == ledgerId) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Found last page in storage db for ledger {} - last entry: {}", ledgerId, lastEntryId);
+                }
+                return lastEntryId;
+            } else {
+                throw new Bookie.NoEntryException(ledgerId, -1);
+            }
+        }
+    }
+
+    public void addLocation(long ledgerId, long entryId, long location) throws IOException {
+        Batch batch = locationsDb.newBatch();
+        addLocation(batch, ledgerId, entryId, location);
+        batch.flush();
+        batch.close();
+    }
+
+    public Batch newBatch() {
+        return locationsDb.newBatch();
+    }
+
+    public void addLocation(Batch batch, long ledgerId, long entryId, long location) throws IOException {
+        LongPairWrapper key = LongPairWrapper.get(ledgerId, entryId);
+        LongWrapper value = LongWrapper.get(location);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Add location - ledger: {} -- entry: {} -- location: {}", ledgerId, entryId, location);
+        }
+
+        try {
+            batch.put(key.array, value.array);
+        } finally {
+            key.recycle();
+            value.recycle();
+        }
+    }
+
+    public void updateLocations(Iterable<EntryLocation> newLocations) throws IOException {
+        if (log.isDebugEnabled()) {
+            log.debug("Update locations -- {}", Iterables.size(newLocations));
+        }
+
+        Batch batch = newBatch();
+        // Update the index with the new locations
+        for (EntryLocation e : newLocations) {
+            if (log.isDebugEnabled()) {
+                log.debug("Update location - ledger: {} -- entry: {}", e.ledger, e.entry);
+            }
+
+            addLocation(batch, e.ledger, e.entry, e.location);
+        }
+
+        batch.flush();
+        batch.close();
+    }
+
+    public void delete(long ledgerId) throws IOException {
+        // Deletion is deferred: we record the ledger id here and let
+        // removeOffsetFromDeletedLedgers() drop its index entries later
+        deletedLedgers.add(ledgerId);
+    }
+
+    public void removeOffsetFromDeletedLedgers() throws IOException {
+        LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1);
+        LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1);
+
+        Set<Long> ledgersToDelete = deletedLedgers.items();
+
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        log.info("Deleting indexes for ledgers: {}", ledgersToDelete);
+        Batch batch = locationsDb.newBatch();
+
+        try {
+            for (long ledgerId : ledgersToDelete) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Deleting indexes from ledger {}", ledgerId);
+                }
+
+                firstKeyWrapper.set(ledgerId, 0);
+                lastKeyWrapper.set(ledgerId, Long.MAX_VALUE);
+
+                batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array);
+            }
+
+            batch.flush();
+
+            // Remove the ledgers from the pending set
+            for (long ledgerId : ledgersToDelete) {
+                deletedLedgers.remove(ledgerId);
+            }
+        } finally {
+            firstKeyWrapper.recycle();
+            lastKeyWrapper.recycle();
+            batch.close();
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(EntryLocationIndex.class);
+}
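
Entry locations in this index are opaque longs produced by the EntryLogger,
which is outside this diff. The read-ahead code in DbLedgerStorage assumes the
entry-log id lives in the high 32 bits (it computes location >> 32) and the
offset within that log in the low 32 bits; a sketch of that assumed layout
(the helper names are illustrative):

    public class EntryLocationSketch {
        // Pack an entry-log id and an offset into one long, matching the
        // "location >> 32 == logId" assumption used by fillReadAheadCache
        static long toLocation(int logId, int offset) {
            return ((long) logId << 32) | (offset & 0xFFFFFFFFL);
        }

        static int logId(long location) {
            return (int) (location >> 32);
        }

        static int offset(long location) {
            return (int) location;
        }

        public static void main(String[] args) {
            long loc = toLocation(7, 4096);
            System.out.println(logId(loc) + "@" + offset(loc)); // prints 7@4096
        }
    }
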
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
new file mode 100644
index 000000000..04bf32dba
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgerMetadataIndex.java
@@ -0,0 +1,262 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.protobuf.ByteString;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Arrays;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.CloseableIterator;
+import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains an index for the ledgers metadata.
+ *
+ * <p>The key is the ledgerId and the value is the {@link LedgerData} content.
+ */
+public class LedgerMetadataIndex implements Closeable {
+    // Contains all ledgers stored in the bookie
+    private final ConcurrentLongHashMap<LedgerData> ledgers;
+    private final AtomicInteger ledgersCount;
+
+    private final KeyValueStorage ledgersDb;
+    private StatsLogger stats;
+
+    // Holds ledger modifications applied to the in-memory map that are pending flush to the db
+    private final ConcurrentLinkedQueue<Entry<Long, LedgerData>> pendingLedgersUpdates;
+
+    // Holds ledger ids deleted from the in-memory map that are pending deletion from the db
+    private final ConcurrentLinkedQueue<Long> pendingDeletedLedgers;
+
+    public LedgerMetadataIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath,
+            StatsLogger stats) throws IOException {
+        String ledgersPath = FileSystems.getDefault().getPath(basePath, "ledgers").toFile().toString();
+        ledgersDb = storageFactory.newKeyValueStorage(ledgersPath, DbConfigType.Small, conf);
+
+        ledgers = new ConcurrentLongHashMap<>();
+        ledgersCount = new AtomicInteger();
+
+        // Read all ledgers from db
+        CloseableIterator<Entry<byte[], byte[]>> iterator = ledgersDb.iterator();
+        try {
+            while (iterator.hasNext()) {
+                Entry<byte[], byte[]> entry = iterator.next();
+                long ledgerId = ArrayUtil.getLong(entry.getKey(), 0);
+                LedgerData ledgerData = LedgerData.parseFrom(entry.getValue());
+                ledgers.put(ledgerId, ledgerData);
+                ledgersCount.incrementAndGet();
+            }
+        } finally {
+            iterator.close();
+        }
+
+        this.pendingLedgersUpdates = new ConcurrentLinkedQueue<Entry<Long, LedgerData>>();
+        this.pendingDeletedLedgers = new ConcurrentLinkedQueue<Long>();
+
+        this.stats = stats;
+        registerStats();
+    }
+
+    public void registerStats() {
+        stats.registerGauge("ledgers-count", new Gauge<Long>() {
+            @Override
+            public Long getDefaultValue() {
+                return 0L;
+            }
+
+            @Override
+            public Long getSample() {
+                return (long) ledgersCount.get();
+            }
+        });
+    }
+
+    @Override
+    public void close() throws IOException {
+        ledgersDb.close();
+    }
+
+    public LedgerData get(long ledgerId) throws IOException {
+        LedgerData ledgerData = ledgers.get(ledgerId);
+        if (ledgerData == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Ledger not found {}", ledgerId);
+            }
+            throw new Bookie.NoLedgerException(ledgerId);
+        }
+
+        return ledgerData;
+    }
+
+    public void set(long ledgerId, LedgerData ledgerData) throws IOException {
+        ledgerData = LedgerData.newBuilder(ledgerData).setExists(true).build();
+
+        if (ledgers.put(ledgerId, ledgerData) == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Added new ledger {}", ledgerId);
+            }
+            ledgersCount.incrementAndGet();
+        }
+
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, ledgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+    }
+
+    public void delete(long ledgerId) throws IOException {
+        if (ledgers.remove(ledgerId) != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Removed ledger {}", ledgerId);
+            }
+            ledgersCount.decrementAndGet();
+        }
+
+        pendingDeletedLedgers.add(ledgerId);
+        pendingLedgersUpdates.removeIf(e -> e.getKey() == ledgerId);
+    }
+
+    public Iterable<Long> getActiveLedgersInRange(final long firstLedgerId, final long lastLedgerId)
+            throws IOException {
+        return Iterables.filter(ledgers.keys(), new Predicate<Long>() {
+            @Override
+            public boolean apply(Long ledgerId) {
+                return ledgerId >= firstLedgerId && ledgerId < lastLedgerId;
+            }
+        });
+    }
+
+    public boolean setFenced(long ledgerId) throws IOException {
+        LedgerData ledgerData = get(ledgerId);
+        if (ledgerData.getFenced()) {
+            return false;
+        }
+
+        LedgerData newLedgerData = LedgerData.newBuilder(ledgerData).setFenced(true).build();
+
+        if (ledgers.put(ledgerId, newLedgerData) == null) {
+            // Ledger had been deleted
+            if (log.isDebugEnabled()) {
+                log.debug("Re-inserted fenced ledger {}", ledgerId);
+            }
+            ledgersCount.incrementAndGet();
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Set fenced ledger {}", ledgerId);
+            }
+        }
+
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, newLedgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+        return true;
+    }
+
+    public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+        LedgerData ledgerData = ledgers.get(ledgerId);
+        if (ledgerData == null) {
+            // New ledger inserted
+            ledgerData = LedgerData.newBuilder().setExists(true).setFenced(false)
+                    .setMasterKey(ByteString.copyFrom(masterKey)).build();
+            if (log.isDebugEnabled()) {
+                log.debug("Inserting new ledger {}", ledgerId);
+            }
+        } else {
+            byte[] storedMasterKey = ledgerData.getMasterKey().toByteArray();
+            if (ArrayUtil.isArrayAllZeros(storedMasterKey)) {
+                // update master key of the ledger
+                ledgerData = LedgerData.newBuilder(ledgerData).setMasterKey(ByteString.copyFrom(masterKey)).build();
+                if (log.isDebugEnabled()) {
+                    log.debug("Replace old master key {} with new master key {}", storedMasterKey, masterKey);
+                }
+            } else if (!Arrays.equals(storedMasterKey, masterKey) && !ArrayUtil.isArrayAllZeros(masterKey)) {
+                log.warn("Ledger {} masterKey in db can only be set once.", ledgerId);
+                throw new IOException(BookieException.create(BookieException.Code.IllegalOpException));
+            }
+        }
+
+        if (ledgers.put(ledgerId, ledgerData) == null) {
+            ledgersCount.incrementAndGet();
+        }
+
+        pendingLedgersUpdates.add(new SimpleEntry<Long, LedgerData>(ledgerId, ledgerData));
+        pendingDeletedLedgers.remove(ledgerId);
+    }
+
+    /**
+     * Flushes all pending changes.
+     */
+    public void flush() throws IOException {
+        LongWrapper key = LongWrapper.get();
+
+        int updatedLedgers = 0;
+        while (!pendingLedgersUpdates.isEmpty()) {
+            Entry<Long, LedgerData> entry = pendingLedgersUpdates.poll();
+            key.set(entry.getKey());
+            byte[] value = entry.getValue().toByteArray();
+            ledgersDb.put(key.array, value);
+            ++updatedLedgers;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Persisting updates to {} ledgers", updatedLedgers);
+        }
+
+        ledgersDb.sync();
+        key.recycle();
+    }
+
+    public void removeDeletedLedgers() throws IOException {
+        LongWrapper key = LongWrapper.get();
+
+        int deletedLedgers = 0;
+        while (!pendingDeletedLedgers.isEmpty()) {
+            long ledgerId = pendingDeletedLedgers.poll();
+            key.set(ledgerId);
+            ledgersDb.delete(key.array);
+            deletedLedgers++;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Persisting deletes of ledgers {}", deletedLedgers);
+        }
+
+        ledgersDb.sync();
+        key.recycle();
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(LedgerMetadataIndex.class);
+}
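
LedgerMetadataIndex applies every mutation to the in-memory map first and
queues it for later persistence, so readers never block on the db; flush()
and removeDeletedLedgers() then drain the queues and sync once. A stripped-down
sketch of this write-behind pattern, with a String value standing in for
LedgerData (all names here are illustrative):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Map.Entry;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ConcurrentMap;

    public class WriteBehindSketch {
        private final ConcurrentMap<Long, String> inMemory = new ConcurrentHashMap<>();
        private final ConcurrentLinkedQueue<Entry<Long, String>> pending =
                new ConcurrentLinkedQueue<>();

        // Readers see the update immediately; persistence is deferred
        public void set(long id, String value) {
            inMemory.put(id, value);
            pending.add(new SimpleEntry<>(id, value));
        }

        // Drain whatever was queued at flush time, then sync once
        public void flush() {
            Entry<Long, String> e;
            while ((e = pending.poll()) != null) {
                persist(e.getKey(), e.getValue());
            }
            sync();
        }

        private void persist(long id, String value) { /* db.put(...) */ }
        private void sync() { /* db.sync() */ }
    }
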
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java
new file mode 100644
index 000000000..993297c2b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongPairWrapper.java
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+/**
+ * Recyclable wrapper that holds a pair of longs.
+ */
+class LongPairWrapper {
+
+    final byte[] array = new byte[16];
+
+    public void set(long first, long second) {
+        ArrayUtil.setLong(array, 0, first);
+        ArrayUtil.setLong(array, 8, second);
+    }
+
+    public long getFirst() {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    public long getSecond() {
+        return ArrayUtil.getLong(array, 8);
+    }
+
+    public static LongPairWrapper get(long first, long second) {
+        LongPairWrapper lp = RECYCLER.get();
+        ArrayUtil.setLong(lp.array, 0, first);
+        ArrayUtil.setLong(lp.array, 8, second);
+        return lp;
+    }
+
+    public void recycle() {
+        handle.recycle(this);
+    }
+
+    private static final Recycler<LongPairWrapper> RECYCLER = new Recycler<LongPairWrapper>() {
+        @Override
+        protected LongPairWrapper newObject(Handle<LongPairWrapper> handle) {
+            return new LongPairWrapper(handle);
+        }
+    };
+
+    private final Handle<LongPairWrapper> handle;
+
+    private LongPairWrapper(Handle<LongPairWrapper> handle) {
+        this.handle = handle;
+    }
+}
\ No newline at end of file
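
The wrappers are package-private and pooled through Netty's Recycler, so every
get() must be paired with a recycle(), typically in a finally block as
EntryLocationIndex.getLocation() does above. A small usage sketch (the class
name is illustrative; it sits in the same package because the wrapper is
package-private):

    package org.apache.bookkeeper.bookie.storage.ldb;

    public class RecyclerUsageSketch {
        public static void main(String[] args) {
            // Acquire a pooled wrapper, use its serialized byte[] view,
            // and always return it in a finally block; using the wrapper
            // after recycle() would be a bug
            LongPairWrapper key = LongPairWrapper.get(42L, 7L);
            try {
                System.out.println(key.getFirst() + "@" + key.getSecond());
            } finally {
                key.recycle();
            }
        }
    }
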
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java
new file mode 100644
index 000000000..144b9ee75
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LongWrapper.java
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
+/**
+ * Wrapper for a long serialized into a byte array.
+ */
+class LongWrapper {
+
+    final byte[] array = new byte[8];
+
+    public void set(long value) {
+        ArrayUtil.setLong(array, 0, value);
+    }
+
+    public long getValue() {
+        return ArrayUtil.getLong(array, 0);
+    }
+
+    public static LongWrapper get() {
+        return RECYCLER.get();
+    }
+
+    public static LongWrapper get(long value) {
+        LongWrapper lp = RECYCLER.get();
+        ArrayUtil.setLong(lp.array, 0, value);
+        return lp;
+    }
+
+    public void recycle() {
+        handle.recycle(this);
+    }
+
+    private static final Recycler<LongWrapper> RECYCLER = new Recycler<LongWrapper>() {
+        @Override
+        protected LongWrapper newObject(Handle<LongWrapper> handle) {
+            return new LongWrapper(handle);
+        }
+    };
+
+    private final Handle<LongWrapper> handle;
+
+    private LongWrapper(Handle<LongWrapper> handle) {
+        this.handle = handle;
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
new file mode 100644
index 000000000..da7f32a8d
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link DbLedgerStorage} running inside a real bookie.
+ */
+public class DbLedgerStorageBookieTest extends BookKeeperClusterTestCase {
+
+    public DbLedgerStorageBookieTest() {
+        super(1);
+        baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        baseConf.setFlushInterval(60000);
+        baseConf.setGcWaitTime(60000);
+    }
+
+    @Test
+    public void testRecoveryEmptyLedger() throws Exception {
+        LedgerHandle lh1 = bkc.createLedger(1, 1, DigestType.MAC, new byte[0]);
+
+        // Force ledger close & recovery
+        LedgerHandle lh2 = bkc.openLedger(lh1.getId(), DigestType.MAC, new byte[0]);
+
+        assertEquals(0, lh2.getLength());
+        assertEquals(-1, lh2.getLastAddConfirmed());
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
new file mode 100644
index 000000000..1e91c2ce8
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -0,0 +1,423 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link DbLedgerStorage}.
+ */
+public class DbLedgerStorageTest {
+
+    private DbLedgerStorage storage;
+    private File tmpDir;
+
+    @Before
+    public void setup() throws Exception {
+        tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        storage = (DbLedgerStorage) bookie.getLedgerStorage();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        storage.shutdown();
+        tmpDir.delete();
+    }
+
+    @Test
+    public void simple() throws Exception {
+        assertEquals(false, storage.ledgerExists(3));
+        try {
+            storage.isFenced(3);
+            fail("should have failed");
+        } catch (Bookie.NoLedgerException nle) {
+            // OK
+        }
+        assertEquals(false, storage.ledgerExists(3));
+        try {
+            storage.setFenced(3);
+            fail("should have failed");
+        } catch (Bookie.NoLedgerException nle) {
+            // OK
+        }
+        storage.setMasterKey(3, "key".getBytes());
+        try {
+            storage.setMasterKey(3, "other-key".getBytes());
+            fail("should have failed");
+        } catch (IOException ioe) {
+            assertTrue(ioe.getCause() instanceof BookieException.BookieIllegalOpException);
+        }
+        // setting the same key is a no-op
+        storage.setMasterKey(3, "key".getBytes());
+        assertEquals(true, storage.ledgerExists(3));
+        assertEquals(true, storage.setFenced(3));
+        assertEquals(true, storage.isFenced(3));
+        assertEquals(false, storage.setFenced(3));
+
+        storage.setMasterKey(4, "key".getBytes());
+        assertEquals(false, storage.isFenced(4));
+        assertEquals(true, storage.ledgerExists(4));
+
+        assertEquals("key", new String(storage.readMasterKey(4)));
+
+        assertEquals(Lists.newArrayList(3L, 4L), Lists.newArrayList(storage.getActiveLedgersInRange(0, 100)));
+        assertEquals(Lists.newArrayList(3L, 4L), Lists.newArrayList(storage.getActiveLedgersInRange(3, 100)));
+        assertEquals(Lists.newArrayList(3L), Lists.newArrayList(storage.getActiveLedgersInRange(0, 4)));
+
+        // Add / read entries
+        ByteBuf entry = Unpooled.buffer(1024);
+        entry.writeLong(4); // ledger id
+        entry.writeLong(1); // entry id
+        entry.writeBytes("entry-1".getBytes());
+
+        assertEquals(false, storage.isFlushRequired());
+
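+        // addEntry() returns the id of the entry just written (read back from the buffer)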
+        assertEquals(1, storage.addEntry(entry));
+
+        assertEquals(true, storage.isFlushRequired());
+
+        // Read from write cache
+        ByteBuf res = storage.getEntry(4, 1);
+        assertEquals(entry, res);
+
+        storage.flush();
+
+        assertEquals(false, storage.isFlushRequired());
+
+        // Read from db
+        res = storage.getEntry(4, 1);
+        assertEquals(entry, res);
+
+        try {
+            storage.getEntry(4, 2);
+            fail("Should have thrown exception");
+        } catch (NoEntryException e) {
+            // ok
+        }
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(4); // ledger id
+        entry2.writeLong(2); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        storage.addEntry(entry2);
+
+        // Read last entry in ledger
+        res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
+        assertEquals(entry2, res);
+
+        ByteBuf entry3 = Unpooled.buffer(1024);
+        entry3.writeLong(4); // ledger id
+        entry3.writeLong(3); // entry id
+        entry3.writeBytes("entry-3".getBytes());
+        storage.addEntry(entry3);
+
+        ByteBuf entry4 = Unpooled.buffer(1024);
+        entry4.writeLong(4); // ledger id
+        entry4.writeLong(4); // entry id
+        entry4.writeBytes("entry-4".getBytes());
+        storage.addEntry(entry4);
+
+        res = storage.getEntry(4, 4);
+        assertEquals(entry4, res);
+
+        // Delete
+        assertEquals(true, storage.ledgerExists(4));
+        storage.deleteLedger(4);
+        assertEquals(false, storage.ledgerExists(4));
+
+        // Should not throw an exception even if the ledger was deleted
+        storage.getEntry(4, 4);
+
+        storage.addEntry(Unpooled.wrappedBuffer(entry2));
+        res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
+        assertEquals(entry4, res);
+
+        // Flush, so that reads now go to storage, where the ledger has been deleted
+        storage.flush();
+
+        try {
+            storage.getEntry(4, 4);
+            fail("Should have thrown exception since the ledger was deleted");
+        } catch (NoEntryException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void testBookieCompaction() throws Exception {
+        storage.setMasterKey(4, "key".getBytes());
+
+        ByteBuf entry3 = Unpooled.buffer(1024);
+        entry3.writeLong(4); // ledger id
+        entry3.writeLong(3); // entry id
+        entry3.writeBytes("entry-3".getBytes());
+        storage.addEntry(entry3);
+
+        // Simulate bookie compaction
+        EntryLogger entryLogger = storage.getEntryLogger();
+        // Rewrite entry-3
+        ByteBuf newEntry3 = Unpooled.buffer(1024);
+        newEntry3.writeLong(4); // ledger id
+        newEntry3.writeLong(3); // entry id
+        newEntry3.writeBytes("new-entry-3".getBytes());
+        long location = entryLogger.addEntry(4, newEntry3, false);
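+        // The returned offset locates the rewritten entry in the entry log; updating the
+        // index below re-points ledger 4 / entry 3 at the new copy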
+
+        List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
+        storage.updateEntriesLocations(locations);
+
+        ByteBuf res = storage.getEntry(4, 3);
+        System.out.println("res:       " + ByteBufUtil.hexDump(res));
+        System.out.println("newEntry3: " + ByteBufUtil.hexDump(newEntry3));
+        assertEquals(newEntry3, res);
+    }
+
+    @Test
+    public void doubleDirectoryError() throws Exception {
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { "dir1", "dir2" });
+
+        try {
+            new Bookie(conf);
+            fail("Should have failed because of the 2 directories");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void testRewritingEntries() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        try {
+            storage.getEntry(1, -1);
+            fail("Should throw exception");
+        } catch (Bookie.NoEntryException e) {
+            // ok
+        }
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry1);
+        storage.flush();
+
+        ByteBuf newEntry1 = Unpooled.buffer(1024);
+        newEntry1.writeLong(1); // ledger id
+        newEntry1.writeLong(1); // entry id
+        newEntry1.writeBytes("new-entry-1".getBytes());
+
+        storage.addEntry(newEntry1);
+        storage.flush();
+
+        ByteBuf response = storage.getEntry(1, 1);
+        assertEquals(newEntry1, response);
+    }
+
+    @Test
+    public void testEntriesOutOfOrder() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(1); // ledger id
+        entry2.writeLong(2); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        storage.addEntry(entry2);
+
+        try {
+            storage.getEntry(1, 1);
+            fail("Entry doesn't exist");
+        } catch (NoEntryException e) {
+            // Ok, entry doesn't exist
+        }
+
+        ByteBuf res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry1);
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+
+        storage.flush();
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+    }
+
+    @Test
+    public void testEntriesOutOfOrderWithFlush() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(1); // ledger id
+        entry2.writeLong(2); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        storage.addEntry(entry2);
+
+        try {
+            storage.getEntry(1, 1);
+            fail("Entry doesn't exist");
+        } catch (NoEntryException e) {
+            // Ok, entry doesn't exist
+        }
+
+        ByteBuf res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+
+        storage.flush();
+
+        try {
+            storage.getEntry(1, 1);
+            fail("Entry doesn't exist");
+        } catch (NoEntryException e) {
+            // Ok, entry doesn't exist
+        }
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry1);
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+        res.release();
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+
+        storage.flush();
+
+        res = storage.getEntry(1, 1);
+        assertEquals(entry1, res);
+        res.release();
+
+        res = storage.getEntry(1, 2);
+        assertEquals(entry2, res);
+        res.release();
+    }
+
+    @Test
+    public void testAddEntriesAfterDelete() throws Exception {
+        storage.setMasterKey(1, "key".getBytes());
+
+        ByteBuf entry0 = Unpooled.buffer(1024);
+        entry0.writeLong(1); // ledger id
+        entry0.writeLong(0); // entry id
+        entry0.writeBytes("entry-0".getBytes());
+
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry0);
+        storage.addEntry(entry1);
+
+        storage.flush();
+
+        storage.deleteLedger(1);
+
+        storage.setMasterKey(1, "key".getBytes());
+
+        entry0 = Unpooled.buffer(1024);
+        entry0.writeLong(1); // ledger id
+        entry0.writeLong(0); // entry id
+        entry0.writeBytes("entry-0".getBytes());
+
+        entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(1); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        storage.addEntry(entry0);
+        storage.addEntry(entry1);
+
+        assertEquals(entry0, storage.getEntry(1, 0));
+        assertEquals(entry1, storage.getEntry(1, 1));
+
+        storage.flush();
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
new file mode 100644
index 000000000..0d3f5bbc8
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for the write-cache behavior of {@link DbLedgerStorage}.
+ */
+public class DbLedgerStorageWriteCacheTest {
+
+    private DbLedgerStorage storage;
+    private File tmpDir;
+
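+    // Test double: overrides flush() to swap the write caches and then stall,
+    // simulating a flush stuck on a slow disk while writes keep coming in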
+    private static class MockedDbLedgerStorage extends DbLedgerStorage {
+
+        @Override
+        public void flush() throws IOException {
+            flushMutex.lock();
+            try {
+                // Swap the write caches, then stall for a while to simulate a slow flush to disk
+                WriteCache tmp = writeCacheBeingFlushed;
+                writeCacheBeingFlushed = writeCache;
+                writeCache = tmp;
+
+                // since the cache is switched, we can allow flush to be triggered
+                hasFlushBeenTriggered.set(false);
+
+                // Block the flushing thread
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    return;
+                }
+            } finally {
+                flushMutex.unlock();
+            }
+        }
+
+    }
+
+    @Before
+    public void setup() throws Exception {
+        tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = new ServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setAllowLoopback(true);
+        conf.setLedgerStorageClass(MockedDbLedgerStorage.class.getName());
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 1);
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        Bookie bookie = new Bookie(conf);
+
+        storage = (DbLedgerStorage) bookie.getLedgerStorage();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        storage.shutdown();
+        tmpDir.delete();
+    }
+
+    @Test
+    public void writeCacheFull() throws Exception {
+        storage.setMasterKey(4, "key".getBytes());
+        assertEquals(false, storage.isFenced(4));
+        assertEquals(true, storage.ledgerExists(4));
+
+        assertEquals("key", new String(storage.readMasterKey(4)));
+
+        // Add enough entries to fill the 1st write cache
+        for (int i = 0; i < 5; i++) {
+            ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8);
+            entry.writeLong(4); // ledger id
+            entry.writeLong(i); // entry id
+            entry.writeZero(100 * 1024);
+            storage.addEntry(entry);
+        }
+
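+        // While the first cache is stuck in the mocked flush(), fill the swapped-in second cache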
+        for (int i = 0; i < 5; i++) {
+            ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8);
+            entry.writeLong(4); // ledger id
+            entry.writeLong(5 + i); // entry id
+            entry.writeZero(100 * 1024);
+            storage.addEntry(entry);
+        }
+
+        // The next add should fail because both write caches are full
+        ByteBuf entry = Unpooled.buffer(100 * 1024 + 2 * 8);
+        entry.writeLong(4); // ledger id
+        entry.writeLong(22); // entry id
+        entry.writeZero(100 * 1024);
+
+        try {
+            storage.addEntry(entry);
+            fail("Should have thrown exception");
+        } catch (IOException e) {
+            // Expected
+        }
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
new file mode 100644
index 000000000..6e83ffd58
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link EntryLocationIndex}.
+ */
+public class EntryLocationIndexTest {
+
+    private final ServerConfiguration serverConfiguration = new ServerConfiguration();
+
+    @Test
+    public void deleteLedgerTest() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        tmpDir.deleteOnExit();
+
+        EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory,
+                tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE);
+
+        // Add some dummy indexes
+        idx.addLocation(40312, 0, 1);
+        idx.addLocation(40313, 10, 2);
+        idx.addLocation(40320, 0, 3);
+
+        // Add more indexes in a different batch
+        idx.addLocation(40313, 11, 5);
+        idx.addLocation(40313, 12, 6);
+        idx.addLocation(40320, 1, 7);
+        idx.addLocation(40312, 3, 4);
+
+        idx.delete(40313);
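+        // delete() only marks the ledger as deleted; its locations stay readable
+        // until removeOffsetFromDeletedLedgers() runs below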
+
+        assertEquals(1, idx.getLocation(40312, 0));
+        assertEquals(4, idx.getLocation(40312, 3));
+        assertEquals(3, idx.getLocation(40320, 0));
+        assertEquals(7, idx.getLocation(40320, 1));
+
+        assertEquals(2, idx.getLocation(40313, 10));
+        assertEquals(5, idx.getLocation(40313, 11));
+        assertEquals(6, idx.getLocation(40313, 12));
+
+        idx.removeOffsetFromDeletedLedgers();
+
+        // Once the offsets are removed, lookups for the deleted ledger's entries return 0
+        assertEquals(0, idx.getLocation(40313, 10));
+        assertEquals(0, idx.getLocation(40313, 11));
+        assertEquals(0, idx.getLocation(40313, 12));
+
+        idx.close();
+    }
+
+    // Tests that locations added to a ledger after it has been deleted are also removed
+    @Test
+    public void addLedgerAfterDeleteTest() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        tmpDir.deleteOnExit();
+
+        EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory,
+                tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE);
+
+        // Add some dummy indexes
+        idx.addLocation(40312, 0, 1);
+        idx.addLocation(40313, 10, 2);
+        idx.addLocation(40320, 0, 3);
+
+        idx.delete(40313);
+
+        // Add more indexes in a different batch
+        idx.addLocation(40313, 11, 5);
+        idx.addLocation(40313, 12, 6);
+        idx.addLocation(40320, 1, 7);
+        idx.addLocation(40312, 3, 4);
+
+        idx.removeOffsetFromDeletedLedgers();
+
+        assertEquals(0, idx.getLocation(40313, 11));
+        assertEquals(0, idx.getLocation(40313, 12));
+
+        idx.close();
+    }
+}
diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
index 2cc5ce766..736c88b11 100644
--- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
+++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml
@@ -24,6 +24,10 @@
     <!-- generated code, we can't be held responsible for findbugs in it //-->
     <Class name="~org\.apache\.bookkeeper\.proto\.BookkeeperProtocol.*" />
   </Match>
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~org\.apache\.bookkeeper\.bookie\.storage\.ldb\.DbLedgerStorageDataFormats.*" />
+  </Match>
   <Match>
     <!-- generated code, we can't be held responsible for findbugs in it //-->
     <Class name="~org\.apache\.bookkeeper\.tests\.generated.*" />


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services