Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/04/08 18:31:23 UTC

[bookkeeper] branch master updated: Allow multiple directories in DbLedgerStorage

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4939416  Allow multiple directories in DbLedgerStorage
4939416 is described below

commit 4939416c68f20c990d5c98ee19d59ba662f1b4a0
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Apr 8 11:31:17 2018 -0700

    Allow multiple directories in DbLedgerStorage
    
    Under normal conditions, the maximum throughput a bookie can sustain is mostly determined by how fast we can write entries to the journal.
    
    If we assume a *very* fast journal, for example one using multiple journal threads with `journalSyncData=false`, or with the journal on a RAM disk (there are cases in which that makes sense), then the LedgerStorage becomes the "bottleneck". We then need to be able to flush data to disk faster than the incoming rate; otherwise the write cache fills up and we have to apply backpressure.
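
    As a rough illustration, a configuration along these lines would exercise that regime (a sketch only: the directory paths are hypothetical, and disabling journal sync trades durability for throughput):

        import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
        import org.apache.bookkeeper.conf.ServerConfiguration;

        ServerConfiguration conf = new ServerConfiguration();
        // DbLedgerStorage spread over several disks
        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
        conf.setLedgerDirNames(new String[] { "/mnt/disk1/bk", "/mnt/disk2/bk" });
        // Ack entries without fsync-ing the journal (only sensible in some deployments)
        conf.setJournalSyncData(false);
        // Total write cache in MB, divided evenly among the configured directories
        conf.setProperty("dbStorage_writeCacheMaxSizeMb", 512);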
    
    In several testing scenarios, the bottleneck has turned out to be the LedgerStorage flush/checkpoint, because it is performed by a single thread.
    
    For smaller entries (1 KB) the limit is ~250K writes/s, and the limitation comes from preparing the batch of offsets to write into RocksDB: each insertion into the batch has to cross the JNI boundary, and that is expensive.
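
    The shape of that cost, sketched against the RocksDB Java API (an illustration of the effect, not the actual EntryLocationIndex code; the key/value encoding here is simplified):

        import java.nio.ByteBuffer;

        import org.rocksdb.RocksDB;
        import org.rocksdb.RocksDBException;
        import org.rocksdb.WriteBatch;
        import org.rocksdb.WriteOptions;

        static void flushLocations(RocksDB db, long ledgerId, long[] entryIds, long[] locations)
                throws RocksDBException {
            try (WriteBatch batch = new WriteBatch(); WriteOptions opts = new WriteOptions()) {
                for (int i = 0; i < entryIds.length; i++) {
                    // Every put() is a JNI call into native RocksDB; this per-entry
                    // crossing is what caps the single-threaded flush at ~250K adds/s.
                    byte[] key = ByteBuffer.allocate(16).putLong(ledgerId).putLong(entryIds[i]).array();
                    byte[] value = ByteBuffer.allocate(8).putLong(locations[i]).array();
                    batch.put(key, value);
                }
                db.write(opts, batch); // one final crossing for the atomic batch write
            }
        }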
    
    For bigger entries (10 KB), the limit is ~50K writes/s (or 500 MB/s). This is mostly because the single flush thread cannot drive 100% IO utilization on the disks (since it is also doing the rest of the work). Raw disk writes in that test environment (12 HDDs) could reach 1.4 GB/s.
    
    This change only affects DbLedgerStorage. It is a simple refactoring: when configured with multiple directories, the storage hashes each ledger to one of the directories, and each directory is flushed independently on its own thread. The routing is condensed in the sketch below.
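
    A condensed sketch of that routing (mirroring the getLedgerSorage() method added by this patch; shown out of context here, the fields belong to DbLedgerStorage):

        // Each ledger maps to exactly one directory, so all of its entries are
        // indexed and flushed by the same SingleDirectoryDbLedgerStorage instance.
        private SingleDirectoryDbLedgerStorage getLedgerSorage(long ledgerId) {
            return ledgerStorageList.get(MathUtils.signSafeMod(ledgerId, numberOfDirs));
        }

    The mapping depends only on the ledger id and the number of directories, so a given ledger always lands in the same directory across restarts, as long as the directory list is unchanged.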
    
    Author: Matteo Merli <mm...@apache.org>
    
    Reviewers: Sijie Guo <si...@apache.org>
    
    This closes #1289 from merlimat/db-storage-multiple-dirs
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   7 +-
 .../bookkeeper/bookie/GarbageCollectorThread.java  |  17 +-
 .../bookkeeper/bookie/LedgerDirsManager.java       |   7 +-
 .../bookie/storage/ldb/DbLedgerStorage.java        | 954 +++------------------
 ...ge.java => SingleDirectoryDbLedgerStorage.java} | 325 ++-----
 .../bookie/storage/ldb/TransientLedgerInfo.java    | 156 ++++
 .../bookie/storage/ldb/DbLedgerStorageTest.java    |  17 +-
 .../storage/ldb/DbLedgerStorageWriteCacheTest.java |  69 +-
 8 files changed, 425 insertions(+), 1127 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index d01a17c..92863e1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -76,7 +76,6 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
-import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
 import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.MetaStoreException;
@@ -2640,8 +2639,6 @@ public class BookieShell implements Tool {
                     null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
             LedgerCache interleavedLedgerCache = interleavedStorage.ledgerCache;
 
-            EntryLocationIndex dbEntryLocationIndex = dbStorage.getEntryLocationIndex();
-
             int convertedLedgers = 0;
             for (long ledgerId : dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
                 if (LOG.isDebugEnabled()) {
@@ -2653,10 +2650,10 @@ public class BookieShell implements Tool {
                     interleavedStorage.setFenced(ledgerId);
                 }
 
-                long lastEntryInLedger = dbEntryLocationIndex.getLastEntryInLedger(ledgerId);
+                long lastEntryInLedger = dbStorage.getLastEntryInLedger(ledgerId);
                 for (long entryId = 0; entryId <= lastEntryInLedger; entryId++) {
                     try {
-                        long location = dbEntryLocationIndex.getLocation(ledgerId, entryId);
+                        long location = dbStorage.getLocation(ledgerId, entryId);
                         if (location != 0L) {
                             interleavedLedgerCache.putEntryOffset(ledgerId, entryId, location);
                         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 79515ea..4b1c834 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -125,6 +125,18 @@ public class GarbageCollectorThread extends SafeRunnable {
 
     final ServerConfiguration conf;
 
+    /**
+     * Create a garbage collector thread.
+     *
+     * @param conf
+     *          Server Configuration Object.
+     * @throws IOException
+     */
+    public GarbageCollectorThread(ServerConfiguration conf, LedgerManager ledgerManager,
+            final CompactableLedgerStorage ledgerStorage, StatsLogger statsLogger) throws IOException {
+        this(conf, ledgerManager, ledgerStorage, statsLogger,
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread")));
+    }
 
     /**
      * Create a garbage collector thread.
@@ -136,9 +148,10 @@ public class GarbageCollectorThread extends SafeRunnable {
     public GarbageCollectorThread(ServerConfiguration conf,
                                   LedgerManager ledgerManager,
                                   final CompactableLedgerStorage ledgerStorage,
-                                  StatsLogger statsLogger)
+                                  StatsLogger statsLogger,
+                                    ScheduledExecutorService gcExecutor)
         throws IOException {
-        gcExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread"));
+        this.gcExecutor = gcExecutor;
         this.conf = conf;
 
         this.entryLogger = ledgerStorage.getEntryLogger();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
index a615cfa..f30821a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
@@ -62,8 +62,7 @@ public class LedgerDirsManager {
         this(conf, dirs, diskChecker, NullStatsLogger.INSTANCE);
     }
 
-    @VisibleForTesting
-    LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker diskChecker, StatsLogger statsLogger) {
+    public LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker diskChecker, StatsLogger statsLogger) {
         this.ledgerDirectories = Arrays.asList(Bookie
                 .getCurrentDirectories(dirs));
         this.writableLedgerDirectories = new ArrayList<File>(ledgerDirectories);
@@ -343,6 +342,10 @@ public class LedgerDirsManager {
         }
     }
 
+    public DiskChecker getDiskChecker() {
+        return diskChecker;
+    }
+
     /**
      * Indicates All configured ledger directories are full.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 55b0945..8753363 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -20,323 +20,114 @@
  */
 package org.apache.bookkeeper.bookie.storage.ldb;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.SortedMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.StampedLock;
 
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Checkpointer;
-import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
-import org.apache.bookkeeper.bookie.EntryLocation;
-import org.apache.bookkeeper.bookie.EntryLogger;
-import org.apache.bookkeeper.bookie.GarbageCollectorThread;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LedgerStorage;
 import org.apache.bookkeeper.bookie.StateManager;
-import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
-import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
-import org.apache.bookkeeper.common.util.Watchable;
+import org.apache.bookkeeper.bookie.storage.ldb.SingleDirectoryDbLedgerStorage.LedgerLoggerProcessor;
+import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+
 
 /**
  * Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
  */
-public class DbLedgerStorage implements CompactableLedgerStorage {
-
-    private static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
-
-    /**
-     * This class borrows the logic from FileInfo.
-     *
-     * <p>This class is used for holding all the transient states for a given ledger.
-     */
-    private static class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification>
-            implements AutoCloseable {
-
-        // lac
-        private volatile long lac = NOT_ASSIGNED_LAC;
-        // request from explicit lac requests
-        private ByteBuffer explicitLac = null;
-        // is the ledger info closed?
-        private boolean isClosed;
-
-        private final long ledgerId;
-        // reference to LedgerMetadataIndex
-        private final LedgerMetadataIndex ledgerIndex;
-
-        private long lastAccessed;
-
-        /**
-         * Construct an Watchable with zero watchers.
-         */
-        public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
-            super(WATCHER_RECYCLER);
-            this.ledgerId = ledgerId;
-            this.ledgerIndex = ledgerIndex;
-            this.lastAccessed = System.currentTimeMillis();
-        }
-
-        long getLastAddConfirmed() {
-            return lac;
-        }
-
-        long setLastAddConfirmed(long lac) {
-            long lacToReturn;
-            boolean changed = false;
-            synchronized (this) {
-                if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
-                    this.lac = lac;
-                    changed = true;
-                    lastAccessed = System.currentTimeMillis();
-                }
-                lacToReturn = this.lac;
-            }
-            if (changed) {
-                notifyWatchers(lacToReturn);
-            }
-            return lacToReturn;
-        }
-
-        synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
-                Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
-            lastAccessed = System.currentTimeMillis();
-            if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
-                return false;
-            }
-
-            addWatcher(watcher);
-            return true;
-        }
-
-        public ByteBuf getExplicitLac() {
-            ByteBuf retLac = null;
-            synchronized (this) {
-                if (explicitLac != null) {
-                    retLac = Unpooled.buffer(explicitLac.capacity());
-                    explicitLac.rewind(); // copy from the beginning
-                    retLac.writeBytes(explicitLac);
-                    explicitLac.rewind();
-                    return retLac;
-                }
-            }
-            return retLac;
-        }
-
-        public void setExplicitLac(ByteBuf lac) {
-            long explicitLacValue;
-            synchronized (this) {
-                if (explicitLac == null) {
-                    explicitLac = ByteBuffer.allocate(lac.capacity());
-                }
-                lac.readBytes(explicitLac);
-                explicitLac.rewind();
-
-                // skip the ledger id
-                explicitLac.getLong();
-                explicitLacValue = explicitLac.getLong();
-                explicitLac.rewind();
-
-                lastAccessed = System.currentTimeMillis();
-            }
-            setLastAddConfirmed(explicitLacValue);
-        }
-
-        boolean isStale() {
-            return (lastAccessed + TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
-                    .currentTimeMillis();
-        }
-
-        void notifyWatchers(long lastAddConfirmed) {
-            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
-        }
-
-        @Override
-        public void close() {
-            synchronized (this) {
-                if (isClosed) {
-                    return;
-                }
-                isClosed = true;
-            }
-            // notify watchers
-            notifyWatchers(Long.MAX_VALUE);
-        }
-
-    }
-
-    private EntryLogger entryLogger;
-
-    private LedgerMetadataIndex ledgerIndex;
-    private EntryLocationIndex entryLocationIndex;
-
-    private static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
-    private ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
-
-    private GarbageCollectorThread gcThread;
-
-    // Write cache where all new entries are inserted into
-    protected volatile WriteCache writeCache;
-
-    // Write cache that is used to swap with writeCache during flushes
-    protected volatile WriteCache writeCacheBeingFlushed;
-
-    // Cache where we insert entries for speculative reading
-    private ReadCache readCache;
-
-    private final StampedLock writeCacheRotationLock = new StampedLock();
-
-    protected final ReentrantLock flushMutex = new ReentrantLock();
-
-    protected final AtomicBoolean hasFlushBeenTriggered = new AtomicBoolean(false);
-    private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
-
-    private final ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
-
-    // Executor used to for db index cleanup
-    private final ScheduledExecutorService cleanupExecutor = Executors
-            .newSingleThreadScheduledExecutor(new DefaultThreadFactory("db-storage-cleanup"));
+@Slf4j
+public class DbLedgerStorage implements LedgerStorage {
 
     static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
-    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
+
     static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
 
     static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs";
 
     private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
     private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
-    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
-
-    private static final long DEFAUL_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);
 
     private static final int MB = 1024 * 1024;
 
-    private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists
-            .newCopyOnWriteArrayList();
-
-    private long writeCacheMaxSize;
-
-    private CheckpointSource checkpointSource = null;
-    private Checkpoint lastCheckpoint = Checkpoint.MIN;
-
-    private long readCacheMaxSize;
-    private int readAheadCacheBatchSize;
-
-    private long maxThrottleTimeNanos;
-
-    private StatsLogger stats;
+    private int numberOfDirs;
+    private List<SingleDirectoryDbLedgerStorage> ledgerStorageList;
 
-    private OpStatsLogger addEntryStats;
-    private OpStatsLogger readEntryStats;
-    private OpStatsLogger readCacheHitStats;
-    private OpStatsLogger readCacheMissStats;
-    private OpStatsLogger readAheadBatchCountStats;
-    private OpStatsLogger readAheadBatchSizeStats;
-    private OpStatsLogger flushStats;
-    private OpStatsLogger flushSizeStats;
-
-    private Counter throttledWriteRequests;
-    private Counter rejectedWriteRequests;
+    // Keep a single Bookie GC thread so that compactions from multiple individual directories are serialized
+    private ScheduledExecutorService gcExecutor;
 
     @Override
     public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
             LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
             Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
-        checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
-                "Db implementation only allows for one storage dir");
-
-        String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
-
-        writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
-
-        writeCache = new WriteCache(writeCacheMaxSize / 2);
-        writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
-
-        this.checkpointSource = checkpointSource;
-
-        readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
-        readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
-
-        long maxThrottleTimeMillis = conf.getLong(MAX_THROTTLE_TIME_MILLIS, DEFAUL_MAX_THROTTLE_TIME_MILLIS);
-        maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
-
-        readCache = new ReadCache(readCacheMaxSize);
+        long writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
+        long readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
 
-        this.stats = statsLogger;
+        this.numberOfDirs = ledgerDirsManager.getAllLedgerDirs().size();
 
         log.info("Started Db Ledger Storage");
+        log.info(" - Number of directories: {}", numberOfDirs);
         log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
         log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
-        log.info(" - Read Ahead Batch size: : {}", readAheadCacheBatchSize);
 
-        ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
-        entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
+        long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;
+        long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;
 
-        transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
-                Runtime.getRuntime().availableProcessors() * 2);
-        cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo, LEDGER_INFO_CACHING_TIME_MINUTES,
-                LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
+        gcExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollector"));
 
-        entryLogger = new EntryLogger(conf, ledgerDirsManager);
-        gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
+        ledgerStorageList = Lists.newArrayList();
+        for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
+            // Create a ledger dirs manager for the single directory
+            File[] dirs = new File[1];
+            // Remove the `/current` suffix which will be appended again by LedgerDirsManager
+            dirs[0] = ledgerDir.getParentFile();
+            LedgerDirsManager ldm = new LedgerDirsManager(conf, dirs, ledgerDirsManager.getDiskChecker(), statsLogger);
+            ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, indexDirsManager,
+                    stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, perDirectoryWriteCacheSize,
+                    perDirectoryReadCacheSize));
+        }
 
-        registerStats();
+        registerStats(statsLogger);
     }
 
-    /**
-     * Evict all the ledger info object that were not used recently.
-     */
-    private void cleanupStaleTransientLedgerInfo() {
-        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
-            boolean isStale = ledgerInfo.isStale();
-            if (isStale) {
-                ledgerInfo.close();
-            }
-
-            return isStale;
-        });
+    @VisibleForTesting
+    protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
+            LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+            StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
+            StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
+            throws IOException {
+        return new SingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
+                stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
     }
 
-    public void registerStats() {
+    public void registerStats(StatsLogger stats) {
         stats.registerGauge("write-cache-size", new Gauge<Long>() {
             @Override
             public Long getDefaultValue() {
@@ -345,7 +136,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
             @Override
             public Long getSample() {
-                return writeCache.size() + writeCacheBeingFlushed.size();
+                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheSize).sum();
             }
         });
         stats.registerGauge("write-cache-count", new Gauge<Long>() {
@@ -356,7 +147,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
             @Override
             public Long getSample() {
-                return writeCache.count() + writeCacheBeingFlushed.count();
+                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getWriteCacheCount).sum();
             }
         });
         stats.registerGauge("read-cache-size", new Gauge<Long>() {
@@ -367,7 +158,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
             @Override
             public Long getSample() {
-                return readCache.size();
+                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheSize).sum();
             }
         });
         stats.registerGauge("read-cache-count", new Gauge<Long>() {
@@ -378,666 +169,142 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
             @Override
             public Long getSample() {
-                return readCache.count();
+                return ledgerStorageList.stream().mapToLong(SingleDirectoryDbLedgerStorage::getReadCacheCount).sum();
             }
         });
-
-        addEntryStats = stats.getOpStatsLogger("add-entry");
-        readEntryStats = stats.getOpStatsLogger("read-entry");
-        readCacheHitStats = stats.getOpStatsLogger("read-cache-hits");
-        readCacheMissStats = stats.getOpStatsLogger("read-cache-misses");
-        readAheadBatchCountStats = stats.getOpStatsLogger("readahead-batch-count");
-        readAheadBatchSizeStats = stats.getOpStatsLogger("readahead-batch-size");
-        flushStats = stats.getOpStatsLogger("flush");
-        flushSizeStats = stats.getOpStatsLogger("flush-size");
-
-        throttledWriteRequests = stats.getCounter("throttled-write-requests");
-        rejectedWriteRequests = stats.getCounter("rejected-write-requests");
     }
 
     @Override
     public void start() {
-        gcThread.start();
+        ledgerStorageList.forEach(LedgerStorage::start);
     }
 
     @Override
     public void shutdown() throws InterruptedException {
-        try {
-            flush();
-
-            gcThread.shutdown();
-            entryLogger.shutdown();
-
-            cleanupExecutor.shutdown();
-            cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS);
-
-            ledgerIndex.close();
-            entryLocationIndex.close();
-
-            writeCache.close();
-            writeCacheBeingFlushed.close();
-            readCache.close();
-            executor.shutdown();
-
-        } catch (IOException e) {
-            log.error("Error closing db storage", e);
+        for (LedgerStorage ls : ledgerStorageList) {
+            ls.shutdown();
         }
     }
 
     @Override
     public boolean ledgerExists(long ledgerId) throws IOException {
-        try {
-            LedgerData ledgerData = ledgerIndex.get(ledgerId);
-            if (log.isDebugEnabled()) {
-                log.debug("Ledger exists. ledger: {} : {}", ledgerId, ledgerData.getExists());
-            }
-            return ledgerData.getExists();
-        } catch (Bookie.NoLedgerException nle) {
-            // ledger does not exist
-            return false;
-        }
+        return getLedgerSorage(ledgerId).ledgerExists(ledgerId);
     }
 
     @Override
-    public boolean isFenced(long ledgerId) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("isFenced. ledger: {}", ledgerId);
-        }
-        return ledgerIndex.get(ledgerId).getFenced();
+    public boolean setFenced(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).setFenced(ledgerId);
     }
 
     @Override
-    public boolean setFenced(long ledgerId) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("Set fenced. ledger: {}", ledgerId);
-        }
-        boolean changed = ledgerIndex.setFenced(ledgerId);
-        if (changed) {
-            // notify all the watchers if a ledger is fenced
-            TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
-            if (null != ledgerInfo) {
-                ledgerInfo.notifyWatchers(Long.MAX_VALUE);
-            }
-        }
-        return changed;
+    public boolean isFenced(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).isFenced(ledgerId);
     }
 
     @Override
     public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("Set master key. ledger: {}", ledgerId);
-        }
-        ledgerIndex.setMasterKey(ledgerId, masterKey);
+        getLedgerSorage(ledgerId).setMasterKey(ledgerId, masterKey);
     }
 
     @Override
     public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
-        if (log.isDebugEnabled()) {
-            log.debug("Read master key. ledger: {}", ledgerId);
-        }
-        return ledgerIndex.get(ledgerId).getMasterKey().toByteArray();
+        return getLedgerSorage(ledgerId).readMasterKey(ledgerId);
     }
 
     @Override
     public long addEntry(ByteBuf entry) throws IOException, BookieException {
-        long startTime = MathUtils.nowInNano();
-
         long ledgerId = entry.getLong(entry.readerIndex());
-        long entryId = entry.getLong(entry.readerIndex() + 8);
-        long lac = entry.getLong(entry.readerIndex() + 16);
-
-        if (log.isDebugEnabled()) {
-            log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac);
-        }
-
-        // First we try to do an optimistic locking to get access to the current write cache.
-        // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the
-        // rest of the time, we can have multiple thread using the optimistic lock here without interfering.
-        long stamp = writeCacheRotationLock.tryOptimisticRead();
-        boolean inserted = false;
-
-        inserted = writeCache.put(ledgerId, entryId, entry);
-        if (!writeCacheRotationLock.validate(stamp)) {
-            // The write cache was rotated while we were inserting. We need to acquire the proper read lock and repeat
-            // the operation because we might have inserted in a write cache that was already being flushed and cleared,
-            // without being sure about this last entry being flushed or not.
-            stamp = writeCacheRotationLock.readLock();
-            try {
-                inserted = writeCache.put(ledgerId, entryId, entry);
-            } finally {
-                 writeCacheRotationLock.unlockRead(stamp);
-            }
-        }
-
-        if (!inserted) {
-            triggerFlushAndAddEntry(ledgerId, entryId, entry);
-        }
-
-        // after successfully insert the entry, update LAC and notify the watchers
-        updateCachedLacIfNeeded(ledgerId, lac);
-
-        recordSuccessfulEvent(addEntryStats, startTime);
-        return entryId;
-    }
-
-    private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
-            throws IOException, BookieException {
-        // Write cache is full, we need to trigger a flush so that it gets rotated
-        // If the flush has already been triggered or flush has already switched the
-        // cache, we don't need to trigger another flush
-        if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
-            // Trigger an early flush in background
-            log.info("Write cache is full, triggering flush");
-            executor.execute(() -> {
-                try {
-                    flush();
-                } catch (IOException e) {
-                    log.error("Error during flush", e);
-                }
-            });
-        }
-
-        throttledWriteRequests.inc();
-        long absoluteTimeoutNanos = System.nanoTime() + maxThrottleTimeNanos;
-
-        while (System.nanoTime() < absoluteTimeoutNanos) {
-            long stamp = writeCacheRotationLock.readLock();
-            try {
-                if (writeCache.put(ledgerId, entryId, entry)) {
-                    // We succeeded in putting the entry in write cache in the
-                    return;
-                }
-            } finally {
-                 writeCacheRotationLock.unlockRead(stamp);
-            }
-
-            // Wait some time and try again
-            try {
-                Thread.sleep(1);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
-            }
-        }
-
-        // Timeout expired and we weren't able to insert in write cache
-        rejectedWriteRequests.inc();
-        throw new OperationRejectedException();
+        return getLedgerSorage(ledgerId).addEntry(entry);
     }
 
     @Override
     public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
-        long startTime = MathUtils.nowInNano();
-        if (log.isDebugEnabled()) {
-            log.debug("Get Entry: {}@{}", ledgerId, entryId);
-        }
-
-        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
-            return getLastEntry(ledgerId);
-        }
-
-        // We need to try to read from both write caches, since recent entries could be found in either of the two. The
-        // write caches are already thread safe on their own, here we just need to make sure we get references to both
-        // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches.
-        long stamp = writeCacheRotationLock.tryOptimisticRead();
-        WriteCache localWriteCache = writeCache;
-        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
-        if (!writeCacheRotationLock.validate(stamp)) {
-            // Fallback to regular read lock approach
-            stamp = writeCacheRotationLock.readLock();
-            try {
-                localWriteCache = writeCache;
-                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
-            } finally {
-                writeCacheRotationLock.unlockRead(stamp);
-            }
-        }
-
-        // First try to read from the write cache of recent entries
-        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
-        if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
-            return entry;
-        }
-
-        // If there's a flush going on, the entry might be in the flush buffer
-        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
-        if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
-            return entry;
-        }
-
-        // Try reading from read-ahead cache
-        entry = readCache.get(ledgerId, entryId);
-        if (entry != null) {
-            recordSuccessfulEvent(readCacheHitStats, startTime);
-            recordSuccessfulEvent(readEntryStats, startTime);
-            return entry;
-        }
-
-        // Read from main storage
-        long entryLocation;
-        try {
-            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
-            if (entryLocation == 0) {
-                throw new NoEntryException(ledgerId, entryId);
-            }
-            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
-        } catch (NoEntryException e) {
-            recordFailedEvent(readEntryStats, startTime);
-            throw e;
-        }
-
-        readCache.put(ledgerId, entryId, entry);
-
-        // Try to read more entries
-        long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
-        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);
-
-        recordSuccessfulEvent(readCacheMissStats, startTime);
-        recordSuccessfulEvent(readEntryStats, startTime);
-        return entry;
-    }
-
-    private void fillReadAheadCache(long orginalLedgerId, long firstEntryId, long firstEntryLocation) {
-        try {
-            long firstEntryLogId = (firstEntryLocation >> 32);
-            long currentEntryLogId = firstEntryLogId;
-            long currentEntryLocation = firstEntryLocation;
-            int count = 0;
-            long size = 0;
-
-            while (count < readAheadCacheBatchSize && currentEntryLogId == firstEntryLogId) {
-                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, -1, currentEntryLocation);
-
-                try {
-                    long currentEntryLedgerId = entry.getLong(0);
-                    long currentEntryId = entry.getLong(8);
-
-                    if (currentEntryLedgerId != orginalLedgerId) {
-                        // Found an entry belonging to a different ledger, stopping read-ahead
-                        entry.release();
-                        return;
-                    }
-
-                    // Insert entry in read cache
-                    readCache.put(orginalLedgerId, currentEntryId, entry);
-
-                    count++;
-                    size += entry.readableBytes();
-
-                    currentEntryLocation += 4 + entry.readableBytes();
-                    currentEntryLogId = currentEntryLocation >> 32;
-                } finally {
-                    entry.release();
-                }
-            }
-
-            readAheadBatchCountStats.registerSuccessfulValue(count);
-            readAheadBatchSizeStats.registerSuccessfulValue(size);
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("Exception during read ahead for ledger: {}: e", orginalLedgerId, e);
-            }
-        }
-    }
-
-    public ByteBuf getLastEntry(long ledgerId) throws IOException {
-        long startTime = MathUtils.nowInNano();
-
-        long stamp = writeCacheRotationLock.readLock();
-        try {
-            // First try to read from the write cache of recent entries
-            ByteBuf entry = writeCache.getLastEntry(ledgerId);
-            if (entry != null) {
-                if (log.isDebugEnabled()) {
-                    long foundLedgerId = entry.readLong(); // ledgedId
-                    long entryId = entry.readLong();
-                    entry.resetReaderIndex();
-                    if (log.isDebugEnabled()) {
-                        log.debug("Found last entry for ledger {} in write cache: {}@{}", ledgerId, foundLedgerId,
-                                entryId);
-                    }
-                }
-
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
-                return entry;
-            }
-
-            // If there's a flush going on, the entry might be in the flush buffer
-            entry = writeCacheBeingFlushed.getLastEntry(ledgerId);
-            if (entry != null) {
-                if (log.isDebugEnabled()) {
-                    entry.readLong(); // ledgedId
-                    long entryId = entry.readLong();
-                    entry.resetReaderIndex();
-                    if (log.isDebugEnabled()) {
-                        log.debug("Found last entry for ledger {} in write cache being flushed: {}", ledgerId, entryId);
-                    }
-                }
-
-                recordSuccessfulEvent(readCacheHitStats, startTime);
-                recordSuccessfulEvent(readEntryStats, startTime);
-                return entry;
-            }
-        } finally {
-            writeCacheRotationLock.unlockRead(stamp);
-        }
-
-        // Search the last entry in storage
-        long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
-        if (log.isDebugEnabled()) {
-            log.debug("Found last entry for ledger {} in db: {}", ledgerId, lastEntryId);
-        }
-
-        long entryLocation = entryLocationIndex.getLocation(ledgerId, lastEntryId);
-        ByteBuf content = entryLogger.readEntry(ledgerId, lastEntryId, entryLocation);
-
-        recordSuccessfulEvent(readCacheMissStats, startTime);
-        recordSuccessfulEvent(readEntryStats, startTime);
-        return content;
-    }
-
-    @VisibleForTesting
-    boolean isFlushRequired() {
-        long stamp = writeCacheRotationLock.readLock();
-        try {
-            return !writeCache.isEmpty();
-        } finally {
-            writeCacheRotationLock.unlockRead(stamp);
-        }
+        return getLedgerSorage(ledgerId).getEntry(ledgerId, entryId);
     }
 
     @Override
-    public void checkpoint(Checkpoint checkpoint) throws IOException {
-        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
-        if (lastCheckpoint.compareTo(checkpoint) > 0) {
-            return;
-        }
-
-        long startTime = MathUtils.nowInNano();
-
-        // Only a single flush operation can happen at a time
-        flushMutex.lock();
-
-        try {
-            // Swap the write cache so that writes can continue to happen while the flush is
-            // ongoing
-            swapWriteCache();
-
-            long sizeToFlush = writeCacheBeingFlushed.size();
-            if (log.isDebugEnabled()) {
-                log.debug("Flushing entries. count: {} -- size {} Mb", writeCacheBeingFlushed.count(),
-                        sizeToFlush / 1024.0 / 1024);
-            }
-
-            // Write all the pending entries into the entry logger and collect the offset
-            // position for each entry
-
-            Batch batch = entryLocationIndex.newBatch();
-            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
-                try {
-                    long location = entryLogger.addEntry(ledgerId, entry, true);
-                    entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            });
-
-            entryLogger.flush();
-
-            long batchFlushStarTime = System.nanoTime();
-            batch.flush();
-            batch.close();
-            if (log.isDebugEnabled()) {
-                log.debug("DB batch flushed time : {} s",
-                        MathUtils.elapsedNanos(batchFlushStarTime) / (double) TimeUnit.SECONDS.toNanos(1));
-            }
-
-            ledgerIndex.flush();
-
-            cleanupExecutor.execute(() -> {
-                // There can only be one single cleanup task running because the cleanupExecutor
-                // is single-threaded
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Removing deleted ledgers from db indexes");
-                    }
-
-                    entryLocationIndex.removeOffsetFromDeletedLedgers();
-                    ledgerIndex.removeDeletedLedgers();
-                } catch (Throwable t) {
-                    log.warn("Failed to cleanup db indexes", t);
-                }
-            });
-
-            lastCheckpoint = thisCheckpoint;
-
-            // Discard all the entry from the write cache, since they're now persisted
-            writeCacheBeingFlushed.clear();
-
-            double flushTimeSeconds = MathUtils.elapsedNanos(startTime) / (double) TimeUnit.SECONDS.toNanos(1);
-            double flushThroughput = sizeToFlush / 1024.0 / 1024.0 / flushTimeSeconds;
-
-            if (log.isDebugEnabled()) {
-                log.debug("Flushing done time {} s -- Written {} MB/s", flushTimeSeconds, flushThroughput);
-            }
-
-            recordSuccessfulEvent(flushStats, startTime);
-            flushSizeStats.registerSuccessfulValue(sizeToFlush);
-        } catch (IOException e) {
-            // Leave IOExecption as it is
-            throw e;
-        } catch (RuntimeException e) {
-            // Wrap unchecked exceptions
-            throw new IOException(e);
-        } finally {
-            try {
-                isFlushOngoing.set(false);
-            } finally {
-                flushMutex.unlock();
-            }
-        }
-    }
-
-    /**
-     * Swap the current write cache with the replacement cache.
-     */
-    private void swapWriteCache() {
-        long stamp = writeCacheRotationLock.writeLock();
-        try {
-            // First, swap the current write-cache map with an empty one so that writes will
-            // go on unaffected. Only a single flush is happening at the same time
-            WriteCache tmp = writeCacheBeingFlushed;
-            writeCacheBeingFlushed = writeCache;
-            writeCache = tmp;
-
-            // since the cache is switched, we can allow flush to be triggered
-            hasFlushBeenTriggered.set(false);
-        } finally {
-            try {
-                isFlushOngoing.set(true);
-            } finally {
-                writeCacheRotationLock.unlockWrite(stamp);
-            }
-        }
+    public long getLastAddConfirmed(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).getLastAddConfirmed(ledgerId);
     }
 
     @Override
-    public void flush() throws IOException {
-        Checkpoint cp = checkpointSource.newCheckpoint();
-        checkpoint(cp);
-        checkpointSource.checkpointComplete(cp, true);
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC,
+            Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+        return getLedgerSorage(ledgerId).waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
     }
 
     @Override
-    public void deleteLedger(long ledgerId) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("Deleting ledger {}", ledgerId);
-        }
-
-        // Delete entries from this ledger that are still in the write cache
-        long stamp = writeCacheRotationLock.readLock();
-        try {
-            writeCache.deleteLedger(ledgerId);
-        } finally {
-            writeCacheRotationLock.unlockRead(stamp);
-        }
-
-        entryLocationIndex.delete(ledgerId);
-        ledgerIndex.delete(ledgerId);
-
-        for (int i = 0, size = ledgerDeletionListeners.size(); i < size; i++) {
-            LedgerDeletionListener listener = ledgerDeletionListeners.get(i);
-            listener.ledgerDeleted(ledgerId);
-        }
-
-        TransientLedgerInfo tli = transientLedgerInfoCache.remove(ledgerId);
-        if (tli != null) {
-            tli.close();
+    public void flush() throws IOException {
+        for (LedgerStorage ls : ledgerStorageList) {
+            ls.flush();
         }
     }
 
     @Override
-    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
-        return ledgerIndex.getActiveLedgersInRange(firstLedgerId, lastLedgerId);
-    }
-
-    @Override
-    public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
-        // Trigger a flush to have all the entries being compacted in the db storage
-        flush();
-
-        entryLocationIndex.updateLocations(locations);
-    }
-
-    @Override
-    public EntryLogger getEntryLogger() {
-        return entryLogger;
+    public void checkpoint(Checkpoint checkpoint) throws IOException {
+        for (LedgerStorage ls : ledgerStorageList) {
+            ls.checkpoint(checkpoint);
+        }
     }
 
     @Override
-    public long getLastAddConfirmed(long ledgerId) throws IOException {
-        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
-        long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : NOT_ASSIGNED_LAC;
-        if (lac == NOT_ASSIGNED_LAC) {
-            ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
-            try {
-                bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
-                lac = bb.readLong();
-                lac = getOrAddLedgerInfo(ledgerId).setLastAddConfirmed(lac);
-            } finally {
-                bb.release();
-            }
-        }
-        return lac;
+    public void deleteLedger(long ledgerId) throws IOException {
+        getLedgerSorage(ledgerId).deleteLedger(ledgerId);
     }
 
     @Override
-    public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC,
-            Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
-        return getOrAddLedgerInfo(ledgerId).waitForLastAddConfirmedUpdate(previousLAC, watcher);
+    public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
+        ledgerStorageList.forEach(ls -> ls.registerLedgerDeletionListener(listener));
     }
 
     @Override
     public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
-        getOrAddLedgerInfo(ledgerId).setExplicitLac(lac);
+        getLedgerSorage(ledgerId).setExplicitlac(ledgerId, lac);
     }
 
     @Override
     public ByteBuf getExplicitLac(long ledgerId) {
-        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
-        if (null == ledgerInfo) {
-            return null;
-        } else {
-            return ledgerInfo.getExplicitLac();
-        }
+        return getLedgerSorage(ledgerId).getExplicitLac(ledgerId);
     }
 
-    private TransientLedgerInfo getOrAddLedgerInfo(long ledgerId) {
-        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
-        if (tli != null) {
-            return tli;
-        } else {
-            TransientLedgerInfo newTli = new TransientLedgerInfo(ledgerId, ledgerIndex);
-            tli = transientLedgerInfoCache.putIfAbsent(ledgerId, newTli);
-            if (tli != null) {
-                newTli.close();
-                return tli;
-            } else {
-                return newTli;
-            }
-        }
+    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
+            Iterable<SortedMap<Long, Long>> entries) throws Exception {
+        return getLedgerSorage(ledgerId).addLedgerToIndex(ledgerId, isFenced, masterKey, entries);
     }
 
-    private void updateCachedLacIfNeeded(long ledgerId, long lac) {
-        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
-        if (tli != null) {
-            tli.setLastAddConfirmed(lac);
-        }
+    public long getLastEntryInLedger(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).getEntryLocationIndex().getLastEntryInLedger(ledgerId);
     }
 
-    @Override
-    public void flushEntriesLocationsIndex() throws IOException {
-        // No-op. Location index is already flushed in updateEntriesLocations() call
+    public long getLocation(long ledgerId, long entryId) throws IOException {
+        return getLedgerSorage(ledgerId).getEntryLocationIndex().getLocation(ledgerId, entryId);
     }
 
-    /**
-     * Add an already existing ledger to the index.
-     *
-     * <p>This method is only used as a tool to help the migration from InterleaveLedgerStorage to DbLedgerStorage
-     *
-     * @param ledgerId
-     *            the ledger id
-     * @param entries
-     *            a map of entryId -> location
-     * @return the number of
-     */
-    public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
-            Iterable<SortedMap<Long, Long>> entries) throws Exception {
-        LedgerData ledgerData = LedgerData.newBuilder().setExists(true).setFenced(isFenced)
-                .setMasterKey(ByteString.copyFrom(masterKey)).build();
-        ledgerIndex.set(ledgerId, ledgerData);
-        AtomicLong numberOfEntries = new AtomicLong();
-
-        // Iterate over all the entries pages
-        Batch batch = entryLocationIndex.newBatch();
-        entries.forEach(map -> {
-            map.forEach((entryId, location) -> {
-                try {
-                    entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-
-                numberOfEntries.incrementAndGet();
-            });
-        });
-
-        batch.flush();
-        batch.close();
-
-        return numberOfEntries.get();
+    private SingleDirectoryDbLedgerStorage getLedgerSorage(long ledgerId) {
+        return ledgerStorageList.get(MathUtils.signSafeMod(ledgerId, numberOfDirs));
     }
 
-    @Override
-    public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
-        ledgerDeletionListeners.add(listener);
+    public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) throws IOException {
+        List<Iterable<Long>> listIt = new ArrayList<>(numberOfDirs);
+        for (SingleDirectoryDbLedgerStorage ls : ledgerStorageList) {
+            listIt.add(ls.getActiveLedgersInRange(firstLedgerId, lastLedgerId));
+        }
+
+        return Iterables.concat(listIt);
     }
 
-    public EntryLocationIndex getEntryLocationIndex() {
-        return entryLocationIndex;
+    public ByteBuf getLastEntry(long ledgerId) throws IOException {
+        return getLedgerSorage(ledgerId).getLastEntry(ledgerId);
     }
 
-    private void recordSuccessfulEvent(OpStatsLogger logger, long startTimeNanos) {
-        logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+    @VisibleForTesting
+    boolean isFlushRequired() {
+        return ledgerStorageList.stream().allMatch(SingleDirectoryDbLedgerStorage::isFlushRequired);
     }
 
-    private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
-        logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
+    @VisibleForTesting
+    List<SingleDirectoryDbLedgerStorage> getLedgerStorageList() {
+        return ledgerStorageList;
     }
 
     /**
@@ -1056,7 +323,10 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
 
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
                 new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold()));
-        String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+        List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
+
+        int dirIndex = MathUtils.signSafeMod(ledgerId, ledgerDirs.size());
+        String ledgerBasePath = ledgerDirs.get(dirIndex).toString();
 
         EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf,
                 (path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true),
@@ -1078,12 +348,4 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
         }
     }
 
-    /**
-     * Interface which process ledger logger.
-     */
-    public interface LedgerLoggerProcessor {
-        void process(long entryId, long entryLogId, long position);
-    }
-
-    private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
similarity index 78%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 55b0945..c8e784a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -21,19 +21,15 @@
 package org.apache.bookkeeper.bookie.storage.ldb;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.SortedMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
@@ -61,158 +57,34 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
-import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
-import org.apache.bookkeeper.common.util.Watchable;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in EntryLogs.
+ * Single directory implementation of LedgerStorage that uses RocksDB to keep the indexes for entries stored in
+ * EntryLogs.
+ *
+ * <p>This is meant only to be used from {@link DbLedgerStorage}.
  */
-public class DbLedgerStorage implements CompactableLedgerStorage {
-
-    private static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
-
-    /**
-     * This class borrows the logic from FileInfo.
-     *
-     * <p>This class is used for holding all the transient states for a given ledger.
-     */
-    private static class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification>
-            implements AutoCloseable {
-
-        // lac
-        private volatile long lac = NOT_ASSIGNED_LAC;
-        // request from explicit lac requests
-        private ByteBuffer explicitLac = null;
-        // is the ledger info closed?
-        private boolean isClosed;
-
-        private final long ledgerId;
-        // reference to LedgerMetadataIndex
-        private final LedgerMetadataIndex ledgerIndex;
-
-        private long lastAccessed;
-
-        /**
-         * Construct an Watchable with zero watchers.
-         */
-        public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
-            super(WATCHER_RECYCLER);
-            this.ledgerId = ledgerId;
-            this.ledgerIndex = ledgerIndex;
-            this.lastAccessed = System.currentTimeMillis();
-        }
-
-        long getLastAddConfirmed() {
-            return lac;
-        }
-
-        long setLastAddConfirmed(long lac) {
-            long lacToReturn;
-            boolean changed = false;
-            synchronized (this) {
-                if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
-                    this.lac = lac;
-                    changed = true;
-                    lastAccessed = System.currentTimeMillis();
-                }
-                lacToReturn = this.lac;
-            }
-            if (changed) {
-                notifyWatchers(lacToReturn);
-            }
-            return lacToReturn;
-        }
+public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage {
+    private final EntryLogger entryLogger;
 
-        synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
-                Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
-            lastAccessed = System.currentTimeMillis();
-            if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
-                return false;
-            }
-
-            addWatcher(watcher);
-            return true;
-        }
-
-        public ByteBuf getExplicitLac() {
-            ByteBuf retLac = null;
-            synchronized (this) {
-                if (explicitLac != null) {
-                    retLac = Unpooled.buffer(explicitLac.capacity());
-                    explicitLac.rewind(); // copy from the beginning
-                    retLac.writeBytes(explicitLac);
-                    explicitLac.rewind();
-                    return retLac;
-                }
-            }
-            return retLac;
-        }
+    private final LedgerMetadataIndex ledgerIndex;
+    private final EntryLocationIndex entryLocationIndex;
 
-        public void setExplicitLac(ByteBuf lac) {
-            long explicitLacValue;
-            synchronized (this) {
-                if (explicitLac == null) {
-                    explicitLac = ByteBuffer.allocate(lac.capacity());
-                }
-                lac.readBytes(explicitLac);
-                explicitLac.rewind();
-
-                // skip the ledger id
-                explicitLac.getLong();
-                explicitLacValue = explicitLac.getLong();
-                explicitLac.rewind();
+    private final ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
 
-                lastAccessed = System.currentTimeMillis();
-            }
-            setLastAddConfirmed(explicitLacValue);
-        }
-
-        boolean isStale() {
-            return (lastAccessed + TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
-                    .currentTimeMillis();
-        }
-
-        void notifyWatchers(long lastAddConfirmed) {
-            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
-        }
-
-        @Override
-        public void close() {
-            synchronized (this) {
-                if (isClosed) {
-                    return;
-                }
-                isClosed = true;
-            }
-            // notify watchers
-            notifyWatchers(Long.MAX_VALUE);
-        }
-
-    }
-
-    private EntryLogger entryLogger;
-
-    private LedgerMetadataIndex ledgerIndex;
-    private EntryLocationIndex entryLocationIndex;
-
-    private static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
-    private ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
-
-    private GarbageCollectorThread gcThread;
+    private final GarbageCollectorThread gcThread;
 
     // Write cache where all new entries are inserted into
     protected volatile WriteCache writeCache;
@@ -221,7 +93,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
     protected volatile WriteCache writeCacheBeingFlushed;
 
     // Cache where we insert entries for speculative reading
-    private ReadCache readCache;
+    private final ReadCache readCache;
 
     private final StampedLock writeCacheRotationLock = new StampedLock();
 
@@ -236,107 +108,77 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
     private final ScheduledExecutorService cleanupExecutor = Executors
             .newSingleThreadScheduledExecutor(new DefaultThreadFactory("db-storage-cleanup"));
 
-    static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
-    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
-    static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
-
-    static final String MAX_THROTTLE_TIME_MILLIS = "dbStorage_maxThrottleTimeMs";
-
-    private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
-    private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
-    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
-
-    private static final long DEFAUL_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);
-
-    private static final int MB = 1024 * 1024;
-
     private final CopyOnWriteArrayList<LedgerDeletionListener> ledgerDeletionListeners = Lists
             .newCopyOnWriteArrayList();
 
-    private long writeCacheMaxSize;
-
-    private CheckpointSource checkpointSource = null;
+    private final CheckpointSource checkpointSource;
     private Checkpoint lastCheckpoint = Checkpoint.MIN;
 
-    private long readCacheMaxSize;
-    private int readAheadCacheBatchSize;
+    private final long writeCacheMaxSize;
+    private final long readCacheMaxSize;
+    private final int readAheadCacheBatchSize;
 
-    private long maxThrottleTimeNanos;
+    private final long maxThrottleTimeNanos;
 
-    private StatsLogger stats;
+    private final StatsLogger stats;
 
-    private OpStatsLogger addEntryStats;
-    private OpStatsLogger readEntryStats;
-    private OpStatsLogger readCacheHitStats;
-    private OpStatsLogger readCacheMissStats;
-    private OpStatsLogger readAheadBatchCountStats;
-    private OpStatsLogger readAheadBatchSizeStats;
-    private OpStatsLogger flushStats;
-    private OpStatsLogger flushSizeStats;
+    private final OpStatsLogger addEntryStats;
+    private final OpStatsLogger readEntryStats;
+    private final OpStatsLogger readCacheHitStats;
+    private final OpStatsLogger readCacheMissStats;
+    private final OpStatsLogger readAheadBatchCountStats;
+    private final OpStatsLogger readAheadBatchSizeStats;
+    private final OpStatsLogger flushStats;
+    private final OpStatsLogger flushSizeStats;
 
-    private Counter throttledWriteRequests;
-    private Counter rejectedWriteRequests;
+    private final Counter throttledWriteRequests;
+    private final Counter rejectedWriteRequests;
+
+    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
+    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
+
+    private static final long DEFAULT_MAX_THROTTLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10);
+
+    public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
+            LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
+            CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
+            ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
 
-    @Override
-    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
-            LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
-            Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
         checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1,
                 "Db implementation only allows for one storage dir");
 
         String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+        log.info("Creating single directory db ledger storage on {}", baseDir);
 
-        writeCacheMaxSize = conf.getLong(WRITE_CACHE_MAX_SIZE_MB, DEFAULT_WRITE_CACHE_MAX_SIZE_MB) * MB;
-
-        writeCache = new WriteCache(writeCacheMaxSize / 2);
-        writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
+        this.writeCacheMaxSize = writeCacheSize;
+        this.writeCache = new WriteCache(writeCacheMaxSize / 2);
+        this.writeCacheBeingFlushed = new WriteCache(writeCacheMaxSize / 2);
 
         this.checkpointSource = checkpointSource;
 
-        readCacheMaxSize = conf.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, DEFAULT_READ_CACHE_MAX_SIZE_MB) * MB;
+        readCacheMaxSize = readCacheSize;
         readAheadCacheBatchSize = conf.getInt(READ_AHEAD_CACHE_BATCH_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE);
 
-        long maxThrottleTimeMillis = conf.getLong(MAX_THROTTLE_TIME_MILLIS, DEFAUL_MAX_THROTTLE_TIME_MILLIS);
+        long maxThrottleTimeMillis = conf.getLong(DbLedgerStorage.MAX_THROTTLE_TIME_MILLIS,
+                DEFAULT_MAX_THROTTLE_TIME_MILLIS);
         maxThrottleTimeNanos = TimeUnit.MILLISECONDS.toNanos(maxThrottleTimeMillis);
 
         readCache = new ReadCache(readCacheMaxSize);
 
         this.stats = statsLogger;
 
-        log.info("Started Db Ledger Storage");
-        log.info(" - Write cache size: {} MB", writeCacheMaxSize / MB);
-        log.info(" - Read Cache: {} MB", readCacheMaxSize / MB);
-        log.info(" - Read Ahead Batch size: : {}", readAheadCacheBatchSize);
-
         ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
         entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
 
         transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
                 Runtime.getRuntime().availableProcessors() * 2);
-        cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo, LEDGER_INFO_CACHING_TIME_MINUTES,
-                LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
+        cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo,
+                TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
+                TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
 
         entryLogger = new EntryLogger(conf, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
 
-        registerStats();
-    }
-
-    /**
-     * Evict all the ledger info object that were not used recently.
-     */
-    private void cleanupStaleTransientLedgerInfo() {
-        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
-            boolean isStale = ledgerInfo.isStale();
-            if (isStale) {
-                ledgerInfo.close();
-            }
-
-            return isStale;
-        });
-    }
-
-    public void registerStats() {
         stats.registerGauge("write-cache-size", new Gauge<Long>() {
             @Override
             public Long getDefaultValue() {
@@ -396,6 +238,27 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
     }
 
     @Override
+    public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager,
+            LedgerDirsManager indexDirsManager, StateManager stateManager, CheckpointSource checkpointSource,
+            Checkpointer checkpointer, StatsLogger statsLogger) throws IOException {
+        // Initialized in constructor
+    }
+
+    /**
+     * Evict all the ledger info objects that were not used recently.
+     */
+    private void cleanupStaleTransientLedgerInfo() {
+        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
+            boolean isStale = ledgerInfo.isStale();
+            if (isStale) {
+                ledgerInfo.close();
+            }
+
+            return isStale;
+        });
+    }
+
+    @Override
     public void start() {
         gcThread.start();
     }
@@ -505,7 +368,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
             try {
                 inserted = writeCache.put(ledgerId, entryId, entry);
             } finally {
-                 writeCacheRotationLock.unlockRead(stamp);
+                writeCacheRotationLock.unlockRead(stamp);
             }
         }
 
@@ -548,7 +411,7 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
                     return;
                 }
             } finally {
-                 writeCacheRotationLock.unlockRead(stamp);
+                writeCacheRotationLock.unlockRead(stamp);
             }
 
             // Wait some time and try again
@@ -922,8 +785,8 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
     @Override
     public long getLastAddConfirmed(long ledgerId) throws IOException {
         TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
-        long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : NOT_ASSIGNED_LAC;
-        if (lac == NOT_ASSIGNED_LAC) {
+        long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : TransientLedgerInfo.NOT_ASSIGNED_LAC;
+        if (lac == TransientLedgerInfo.NOT_ASSIGNED_LAC) {
             ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
             try {
                 bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
@@ -1040,42 +903,20 @@ public class DbLedgerStorage implements CompactableLedgerStorage {
         logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
     }
 
-    /**
-     * Reads ledger index entries to get list of entry-logger that contains given ledgerId.
-     *
-     * @param ledgerId
-     * @param serverConf
-     * @param processor
-     * @throws IOException
-     */
-    public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration serverConf,
-            LedgerLoggerProcessor processor) throws IOException {
+    long getWriteCacheSize() {
+        return writeCache.size() + writeCacheBeingFlushed.size();
+    }
 
-        checkNotNull(serverConf, "ServerConfiguration can't be null");
-        checkNotNull(processor, "LedgerLoggger info processor can't null");
+    long getWriteCacheCount() {
+        return writeCache.count() + writeCacheBeingFlushed.count();
+    }
 
-        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs(),
-                new DiskChecker(serverConf.getDiskUsageThreshold(), serverConf.getDiskUsageWarnThreshold()));
-        String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+    long getReadCacheSize() {
+        return readCache.size();
+    }
 
-        EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf,
-                (path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true),
-                ledgerBasePath, NullStatsLogger.INSTANCE);
-        try {
-            long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
-            for (long currentEntry = 0; currentEntry <= lastEntryId; currentEntry++) {
-                long offset = entryLocationIndex.getLocation(ledgerId, currentEntry);
-                if (offset <= 0) {
-                    // entry not found in this bookie
-                    continue;
-                }
-                long entryLogId = offset >> 32L;
-                long position = offset & 0xffffffffL;
-                processor.process(currentEntry, entryLogId, position);
-            }
-        } finally {
-            entryLocationIndex.close();
-        }
+    long getReadCacheCount() {
+        return readCache.count();
     }
 
     /**
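
The write path in SingleDirectoryDbLedgerStorage is double-buffered: new entries land in writeCache under a StampedLock read stamp, while the previous cache drains to the entry log as writeCacheBeingFlushed, and the two are swapped under the write stamp at flush time. A compressed sketch of that pattern, assuming a plain Map as a stand-in for the real WriteCache and omitting the throttling, stats and checkpoint handling of the actual code:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.StampedLock;

    // Minimal sketch of the double-buffered write path; illustrative only.
    class DoubleBufferedWriteSketch {
        private final StampedLock writeCacheRotationLock = new StampedLock();
        private volatile Map<Long, byte[]> writeCache = new ConcurrentHashMap<>();
        private volatile Map<Long, byte[]> writeCacheBeingFlushed = new ConcurrentHashMap<>();

        void addEntry(long entryId, byte[] entry) {
            long stamp = writeCacheRotationLock.readLock(); // many writers proceed concurrently
            try {
                writeCache.put(entryId, entry);
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        void swapCachesForFlush() {
            long stamp = writeCacheRotationLock.writeLock(); // briefly exclude writers
            try {
                Map<Long, byte[]> tmp = writeCacheBeingFlushed;
                writeCacheBeingFlushed = writeCache;
                writeCache = tmp; // assumed already drained and cleared by the previous flush
            } finally {
                writeCacheRotationLock.unlockWrite(stamp);
            }
        }
    }
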
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
new file mode 100644
index 0000000..27d63e8
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/TransientLedgerInfo.java
@@ -0,0 +1,156 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.common.util.Watchable;
+import org.apache.bookkeeper.common.util.Watcher;
+
+/**
+ * This class borrows the logic from FileInfo.
+ *
+ * <p>This class is used for holding all the transient states for a given ledger.
+ */
+class TransientLedgerInfo extends Watchable<LastAddConfirmedUpdateNotification> implements AutoCloseable {
+
+    static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
+
+    static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
+
+    // lac
+    private volatile long lac = NOT_ASSIGNED_LAC;
+    // request from explicit lac requests
+    private ByteBuffer explicitLac = null;
+    // is the ledger info closed?
+    private boolean isClosed;
+
+    private final long ledgerId;
+    // reference to LedgerMetadataIndex
+    private final LedgerMetadataIndex ledgerIndex;
+
+    private long lastAccessed;
+
+    /**
+     * Construct a Watchable with zero watchers.
+     */
+    public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
+        super(WATCHER_RECYCLER);
+        this.ledgerId = ledgerId;
+        this.ledgerIndex = ledgerIndex;
+        this.lastAccessed = System.currentTimeMillis();
+    }
+
+    long getLastAddConfirmed() {
+        return lac;
+    }
+
+    long setLastAddConfirmed(long lac) {
+        long lacToReturn;
+        boolean changed = false;
+        synchronized (this) {
+            if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
+                this.lac = lac;
+                changed = true;
+                lastAccessed = System.currentTimeMillis();
+            }
+            lacToReturn = this.lac;
+        }
+        if (changed) {
+            notifyWatchers(lacToReturn);
+        }
+        return lacToReturn;
+    }
+
+    synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
+            Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+        lastAccessed = System.currentTimeMillis();
+        if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
+            return false;
+        }
+
+        addWatcher(watcher);
+        return true;
+    }
+
+    public ByteBuf getExplicitLac() {
+        ByteBuf retLac = null;
+        synchronized (this) {
+            if (explicitLac != null) {
+                retLac = Unpooled.buffer(explicitLac.capacity());
+                explicitLac.rewind(); // copy from the beginning
+                retLac.writeBytes(explicitLac);
+                explicitLac.rewind();
+                return retLac;
+            }
+        }
+        return retLac;
+    }
+
+    public void setExplicitLac(ByteBuf lac) {
+        long explicitLacValue;
+        synchronized (this) {
+            if (explicitLac == null) {
+                explicitLac = ByteBuffer.allocate(lac.capacity());
+            }
+            lac.readBytes(explicitLac);
+            explicitLac.rewind();
+
+            // skip the ledger id
+            explicitLac.getLong();
+            explicitLacValue = explicitLac.getLong();
+            explicitLac.rewind();
+
+            lastAccessed = System.currentTimeMillis();
+        }
+        setLastAddConfirmed(explicitLacValue);
+    }
+
+    boolean isStale() {
+        return (lastAccessed + TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
+                .currentTimeMillis();
+    }
+
+    void notifyWatchers(long lastAddConfirmed) {
+        notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
+    }
+
+    @Override
+    public void close() {
+        synchronized (this) {
+            if (isClosed) {
+                return;
+            }
+            isClosed = true;
+        }
+        // notify watchers
+        notifyWatchers(Long.MAX_VALUE);
+    }
+
+}
\ No newline at end of file
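
TransientLedgerInfo instances are cached per ledger and evicted once idle for LEDGER_INFO_CACHING_TIME_MINUTES; the cleanup task scheduled earlier runs at that same interval and close()s stale entries, which releases any pending LAC watchers with Long.MAX_VALUE. A tiny sketch of the staleness rule, with the constant inlined for illustration:

    import java.util.concurrent.TimeUnit;

    // Illustrative only: the same check TransientLedgerInfo.isStale() performs.
    class StalenessSketch {
        static final long CACHING_TIME_MINUTES = 10; // LEDGER_INFO_CACHING_TIME_MINUTES

        static boolean isStale(long lastAccessedMillis, long nowMillis) {
            // Stale once more than the caching window has elapsed since last access.
            return lastAccessedMillis + TimeUnit.MINUTES.toMillis(CACHING_TIME_MINUTES) < nowMillis;
        }

        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            System.out.println(isStale(now - TimeUnit.MINUTES.toMillis(11), now)); // true
            System.out.println(isStale(now - TimeUnit.MINUTES.toMillis(5), now));  // false
        }
    }
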
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index f4d67f2..67c69fe 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -220,7 +220,8 @@ public class DbLedgerStorageTest {
         storage.addEntry(entry3);
 
         // Simulate bookie compaction
-        EntryLogger entryLogger = ((DbLedgerStorage) storage).getEntryLogger();
+        SingleDirectoryDbLedgerStorage singleDirStorage = ((DbLedgerStorage) storage).getLedgerStorageList().get(0);
+        EntryLogger entryLogger = singleDirStorage.getEntryLogger();
         // Rewrite entry-3
         ByteBuf newEntry3 = Unpooled.buffer(1024);
         newEntry3.writeLong(4); // ledger id
@@ -229,7 +230,7 @@ public class DbLedgerStorageTest {
         long location = entryLogger.addEntry(4, newEntry3, false);
 
         List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
-        storage.updateEntriesLocations(locations);
+        singleDirStorage.updateEntriesLocations(locations);
 
         ByteBuf res = storage.getEntry(4, 3);
         System.out.println("res:       " + ByteBufUtil.hexDump(res));
@@ -238,20 +239,18 @@ public class DbLedgerStorageTest {
     }
 
     @Test
-    public void doubleDirectoryError() throws Exception {
+    public void doubleDirectory() throws Exception {
         int gcWaitTime = 1000;
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setGcWaitTime(gcWaitTime);
         conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
         conf.setLedgerDirNames(new String[] { "dir1", "dir2" });
 
-        try {
-            new Bookie(conf);
-            fail("Should have failed because of the 2 directories");
-        } catch (IllegalArgumentException e) {
-            // ok
-        }
+        // Should not fail
+        Bookie bookie = new Bookie(conf);
+        assertEquals(2, ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().size());
 
+        bookie.shutdown();
     }
 
     @Test
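
With the doubleDirectory test above passing, configuring multiple ledger directories becomes a supported setup rather than an error. A hedged configuration fragment mirroring the test (paths are placeholders; the rest of the bookie configuration, such as ZooKeeper and journal settings, is omitted):

    // Illustrative fragment: DbLedgerStorage with two directories, each
    // flushed independently by its own thread.
    ServerConfiguration conf = new ServerConfiguration();
    conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
    conf.setLedgerDirNames(new String[] { "/mnt/disk1/bk-ledgers", "/mnt/disk2/bk-ledgers" });
    Bookie bookie = new Bookie(conf);
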
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
index 4894814..7a45ee7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageWriteCacheTest.java
@@ -28,11 +28,18 @@ import io.netty.buffer.Unpooled;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException.OperationRejectedException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,29 +55,49 @@ public class DbLedgerStorageWriteCacheTest {
     private static class MockedDbLedgerStorage extends DbLedgerStorage {
 
         @Override
-        public void flush() throws IOException {
-            flushMutex.lock();
-            try {
-                // Swap the write caches and block indefinitely to simulate a slow disk
-                WriteCache tmp = writeCacheBeingFlushed;
-                writeCacheBeingFlushed = writeCache;
-                writeCache = tmp;
-
-                // since the cache is switched, we can allow flush to be triggered
-                hasFlushBeenTriggered.set(false);
-
-                // Block the flushing thread
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    return;
-                }
-            } finally {
-                flushMutex.unlock();
-            }
+        protected SingleDirectoryDbLedgerStorage newSingleDirectoryDbLedgerStorage(ServerConfiguration conf,
+                LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+                StateManager stateManager, CheckpointSource checkpointSource, Checkpointer checkpointer,
+                StatsLogger statsLogger, ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize)
+                throws IOException {
+            return new MockedSingleDirectoryDbLedgerStorage(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
+                    stateManager, checkpointSource, checkpointer, statsLogger, gcExecutor, writeCacheSize,
+                    readCacheSize);
         }
 
+        private static class MockedSingleDirectoryDbLedgerStorage extends SingleDirectoryDbLedgerStorage {
+            public MockedSingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
+                    LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
+                    CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
+                    ScheduledExecutorService gcExecutor, long writeCacheSize, long readCacheSize) throws IOException {
+                super(conf, ledgerManager, ledgerDirsManager, indexDirsManager, stateManager, checkpointSource,
+                        checkpointer, statsLogger, gcExecutor, writeCacheSize, readCacheSize);
+            }
+
+            @Override
+            public void flush() throws IOException {
+                flushMutex.lock();
+                try {
+                    // Swap the write caches and block indefinitely to simulate a slow disk
+                    WriteCache tmp = writeCacheBeingFlushed;
+                    writeCacheBeingFlushed = writeCache;
+                    writeCache = tmp;
+
+                    // since the cache is switched, we can allow flush to be triggered
+                    hasFlushBeenTriggered.set(false);
+
+                    // Block the flushing thread
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                } finally {
+                    flushMutex.unlock();
+                }
+            }
+        }
     }
 
     @Before

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.