You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by sj...@apache.org on 2018/12/11 20:43:30 UTC

[bookkeeper] branch master updated: ISSUE #1770: Add local checker for Sorted/InterleavedLedgerStorage

This is an automated email from the ASF dual-hosted git repository.

sjust pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a0da5c  ISSUE #1770: Add local checker for Sorted/InterleavedLedgerStorage
9a0da5c is described below

commit 9a0da5c5a667704afbb120c029cf1bb309ac985f
Author: Samuel Just <sj...@salesforce.com>
AuthorDate: Tue Dec 11 12:43:25 2018 -0800

    ISSUE #1770: Add local checker for Sorted/InterleavedLedgerStorage
    
    The main goal of this patch is the ScrubberService LifecycleComponent
    which runs in the background periodically verifying the internal
    consistency of the LedgerStorage. To get that to work:
    
    - LedgerStorage now has a localConsistencyCheck call with
    implementations in Interleaved and Sorted.
    
    - In service of that implementation, LedgerCache gains an interface for
    iterating safely over the entries of a ledger with a way of handling
    concurrently modified or deleted ledgers with corresponding
    modifications to LedgerEntryPage for detecting deletion.
    
    - EntryLogger has been refactored to permit checking the correctness
    (and throwing a descriptive exception in case of a problem) of an entry
    without actually reading it for use within localConsistencyCheck.
    
    - The two mechanisms for iterating over a ledger's entries in
    BookieShell have both been replaced with the new single implementation
    (InterleavedLedgerStorageTest now has a test checking the output of the
    affected command.)
    
    - Misc changes to *LogCompactor to support tests in
    InterleavedLedgerStorageTest.
    
    Because the consistency check needs to run in the background and hold
    LEP instances potentially for a relatively long time, a delete may
    overlap with the scan of an LEP page. As part of this patch,
    IndexInMemPageMgr and LedgerEntryPage now permit an LEP to be marked
    deleted and not added back to the set of free pages until released.
    
    This patch also adds an option to run the checker on startup (defaults
    to false).
    
    (bug W-5188823)
    (bug W-5153309)
    (rev cguttapalem)
    Signed-off-by: Samuel Just <sjustsalesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1819 from athanatos/forupstream/wip-1770, closes #1770
---
 .../bookkeeper/bookie/AbstractLogCompactor.java    |  13 +-
 .../bookkeeper/bookie/BookKeeperServerStats.java   |   4 +
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  88 +++++-
 .../org/apache/bookkeeper/bookie/BookieShell.java  | 314 ++++++++-------------
 .../bookkeeper/bookie/BufferedChannelBase.java     |   8 +-
 .../bookkeeper/bookie/BufferedReadChannel.java     |   2 +-
 .../bookkeeper/bookie/EntryLogCompactor.java       |  15 +-
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 165 ++++++++---
 .../bookkeeper/bookie/GarbageCollectorThread.java  |  17 +-
 .../bookkeeper/bookie/IndexInMemPageMgr.java       | 117 ++++++--
 .../bookkeeper/bookie/IndexPersistenceMgr.java     |  20 +-
 .../bookie/InterleavedLedgerStorage.java           | 109 +++++++
 .../InterleavedStorageRegenerateIndexOp.java       |   8 +
 .../org/apache/bookkeeper/bookie/LedgerCache.java  |  54 +++-
 .../apache/bookkeeper/bookie/LedgerCacheImpl.java  |  10 +
 .../apache/bookkeeper/bookie/LedgerEntryPage.java  |  44 ++-
 .../apache/bookkeeper/bookie/LedgerStorage.java    |  43 +++
 .../bookkeeper/bookie/ReadOnlyEntryLogger.java     |   4 +-
 ...ReadOnlyEntryLogger.java => ScrubberStats.java} |  31 +-
 .../bookkeeper/bookie/SortedLedgerStorage.java     |   8 +
 .../bookie/TransactionalEntryLogCompactor.java     |  17 +-
 .../bookie/storage/ldb/DbLedgerStorage.java        |   6 +-
 .../ldb/SingleDirectoryDbLedgerStorage.java        |  33 ++-
 .../bookkeeper/conf/ServerConfiguration.java       |  57 ++++
 .../java/org/apache/bookkeeper/server/Main.java    |   9 +
 .../bookkeeper/server/service/ScrubberService.java | 145 ++++++++++
 .../apache/bookkeeper/bookie/CompactionTest.java   |  11 +-
 .../bookie/InterleavedLedgerStorageTest.java       | 303 ++++++++++++++++++++
 28 files changed, 1314 insertions(+), 341 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
index 8f190a3..57ec897 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
@@ -32,12 +32,17 @@ public abstract class AbstractLogCompactor {
 
     protected final ServerConfiguration conf;
     protected final Throttler throttler;
-    protected final GarbageCollectorThread gcThread;
 
-    public AbstractLogCompactor(GarbageCollectorThread gcThread) {
-        this.gcThread = gcThread;
-        this.conf = gcThread.conf;
+    interface LogRemovalListener {
+        void removeEntryLog(long logToRemove);
+    }
+
+    protected final LogRemovalListener logRemovalListener;
+
+    public AbstractLogCompactor(ServerConfiguration conf, LogRemovalListener logRemovalListener) {
+        this.conf = conf;
         this.throttler = new Throttler(conf);
+        this.logRemovalListener = logRemovalListener;
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 736d341..69d0eda 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -108,6 +108,10 @@ public interface BookKeeperServerStats {
     // Ledger Storage Stats
     String STORAGE_GET_OFFSET = "STORAGE_GET_OFFSET";
     String STORAGE_GET_ENTRY = "STORAGE_GET_ENTRY";
+
+    // Ledger Storage Scrub Stats
+    String STORAGE_SCRUB_PAGES_SCANNED = "STORAGE_SCRUB_PAGES_SCANNED";
+
     // Ledger Cache Stats
     String LEDGER_CACHE_READ_PAGE = "LEDGER_CACHE_READ_PAGE";
     // SkipList Stats
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index ffb92ed..fd352d8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -47,6 +47,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -602,6 +603,66 @@ public class Bookie extends BookieCriticalThread {
         this(conf, NullStatsLogger.INSTANCE);
     }
 
+    private static LedgerStorage buildLedgerStorage(ServerConfiguration conf) throws IOException {
+        // Instantiate the ledger storage implementation
+        String ledgerStorageClass = conf.getLedgerStorageClass();
+        LOG.info("Using ledger storage: {}", ledgerStorageClass);
+        return LedgerStorageFactory.createLedgerStorage(ledgerStorageClass);
+    }
+
+    /**
+     * Initialize LedgerStorage instance without checkpointing for use within the shell
+     * and other RO users.  ledgerStorage must not have already been initialized.
+     *
+     * <p>The caller is responsible for disposing of the ledgerStorage object.
+     *
+     * @param conf Bookie config.
+     * @param ledgerStorage Instance to initialize.
+     * @return Passed ledgerStorage instance
+     * @throws IOException
+     */
+    static LedgerStorage mountLedgerStorageOffline(
+            ServerConfiguration conf,
+            LedgerStorage ledgerStorage) throws IOException {
+        StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+        DiskChecker diskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
+
+        LedgerDirsManager ledgerDirsManager = createLedgerDirsManager(
+                conf, diskChecker, statsLogger.scope(LD_LEDGER_SCOPE));
+        LedgerDirsManager indexDirsManager = createIndexDirsManager(
+                conf, diskChecker, statsLogger.scope(LD_INDEX_SCOPE), ledgerDirsManager);
+
+        if (null == ledgerStorage) {
+            ledgerStorage = buildLedgerStorage(conf);
+        }
+
+        CheckpointSource checkpointSource = new CheckpointSource() {
+            @Override
+            public Checkpoint newCheckpoint() {
+                return Checkpoint.MAX;
+            }
+
+            @Override
+            public void checkpointComplete(Checkpoint checkpoint, boolean compact)
+                    throws IOException {
+            }
+        };
+
+        Checkpointer checkpointer = Checkpointer.NULL;
+
+        ledgerStorage.initialize(
+                conf,
+                null,
+                ledgerDirsManager,
+                indexDirsManager,
+                null,
+                checkpointSource,
+                checkpointer,
+                statsLogger);
+
+        return ledgerStorage;
+    }
+
     public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
             throws IOException, InterruptedException, BookieException {
         super("Bookie-" + conf.getBookiePort());
@@ -677,10 +738,7 @@ public class Bookie extends BookieCriticalThread {
         this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
         CheckpointSource checkpointSource = new CheckpointSourceList(journals);
 
-        // Instantiate the ledger storage implementation
-        String ledgerStorageClass = conf.getLedgerStorageClass();
-        LOG.info("Using ledger storage: {}", ledgerStorageClass);
-        ledgerStorage = LedgerStorageFactory.createLedgerStorage(ledgerStorageClass);
+        ledgerStorage = buildLedgerStorage(conf);
 
         boolean isDbLedgerStorage = ledgerStorage instanceof DbLedgerStorage;
 
@@ -871,7 +929,29 @@ public class Bookie extends BookieCriticalThread {
         } catch (ExecutionException e) {
             LOG.error("Error on executing a fully flush after replaying journals.");
             shutdown(ExitCode.BOOKIE_EXCEPTION);
+            return;
+        }
+
+        if (conf.isLocalConsistencyCheckOnStartup()) {
+            LOG.info("Running local consistency check on startup prior to accepting IO.");
+            List<LedgerStorage.DetectedInconsistency> errors = null;
+            try {
+                errors = ledgerStorage.localConsistencyCheck(Optional.empty());
+            } catch (IOException e) {
+                LOG.error("Got a fatal exception while checking store", e);
+                shutdown(ExitCode.BOOKIE_EXCEPTION);
+                return;
+            }
+            if (errors != null && errors.size() > 0) {
+                LOG.error("Bookie failed local consistency check:");
+                for (LedgerStorage.DetectedInconsistency error : errors) {
+                    LOG.error("Ledger {}, entry {}: ", error.getLedgerId(), error.getEntryId(), error.getException());
+                }
+                shutdown(ExitCode.BOOKIE_EXCEPTION);
+                return;
+            }
         }
+
         LOG.info("Finished reading journal, starting bookie");
 
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 9c6180b..083a661 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -26,7 +26,6 @@ import static org.apache.bookkeeper.tools.cli.helpers.CommandHelpers.getBookieSo
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
@@ -35,7 +34,6 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.math.RoundingMode;
@@ -144,6 +142,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
@@ -205,6 +204,7 @@ public class BookieShell implements Tool {
     static final String CMD_GENERATE_COOKIE = "cookie_generate";
 
     static final String CMD_HELP = "help";
+    static final String CMD_LOCALCONSISTENCYCHECK = "localconsistencycheck";
 
     final ServerConfiguration bkConf = new ServerConfiguration();
     File[] indexDirectories;
@@ -239,6 +239,14 @@ public class BookieShell implements Tool {
         void printUsage();
     }
 
+    void printInfoLine(String s) {
+        System.out.println(s);
+    }
+
+    void printErrorLine(String s) {
+        System.err.println(s);
+    }
+
     abstract class MyCommand implements Command {
         abstract Options getOptions();
 
@@ -717,7 +725,7 @@ public class BookieShell implements Tool {
         public int runCmd(CommandLine cmdLine) throws Exception {
             String[] leftArgs = cmdLine.getArgs();
             if (leftArgs.length <= 0) {
-                System.err.println("ERROR: missing ledger id");
+                printErrorLine("ERROR: missing ledger id");
                 printUsage();
                 return -1;
             }
@@ -730,7 +738,7 @@ public class BookieShell implements Tool {
             try {
                 ledgerId = ledgerIdFormatter.readLedgerId(leftArgs[0]);
             } catch (IllegalArgumentException iae) {
-                System.err.println("ERROR: invalid ledger id " + leftArgs[0]);
+                printErrorLine("ERROR: invalid ledger id " + leftArgs[0]);
                 printUsage();
                 return -1;
             }
@@ -739,19 +747,69 @@ public class BookieShell implements Tool {
                 // dump ledger info
                 try {
                     DbLedgerStorage.readLedgerIndexEntries(ledgerId, bkConf,
-                            (currentEntry, entryLogId, position) -> System.out.println(
+                            (currentEntry, entryLogId, position) -> printInfoLine(
                                     "entry " + currentEntry + "\t:\t(log: " + entryLogId + ", pos: " + position + ")"));
                 } catch (IOException e) {
                     System.err.printf("ERROR: initializing dbLedgerStorage %s", e.getMessage());
                     return -1;
                 }
-            } else {
+            } else if ((bkConf.getLedgerStorageClass().equals(SortedLedgerStorage.class.getName())
+                    || bkConf.getLedgerStorageClass().equals(InterleavedLedgerStorage.class.getName()))) {
+                ServerConfiguration conf = new ServerConfiguration(bkConf);
+                InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
+                Bookie.mountLedgerStorageOffline(conf, interleavedStorage);
+
                 if (printMeta) {
                     // print meta
-                    readLedgerMeta(ledgerId);
+                    printInfoLine("===== LEDGER: " + ledgerIdFormatter.formatLedgerId(ledgerId) + " =====");
+                    LedgerCache.LedgerIndexMetadata meta = interleavedStorage.readLedgerIndexMetadata(ledgerId);
+                    printInfoLine("master key  : " + meta.getMasterKeyHex());
+
+                    long size = meta.size;
+                    if (size % 8 == 0) {
+                        printInfoLine("size        : " + size);
+                    } else {
+                        printInfoLine("size : " + size
+                                + " (not aligned with 8, may be corrupted or under flushing now)");
+                    }
+
+                    printInfoLine("entries     : " + (size / 8));
+                    printInfoLine("isFenced    : " + meta.fenced);
+                }
+
+                try {
+                    // dump ledger info
+                    printInfoLine("===== LEDGER: " + ledgerIdFormatter.formatLedgerId(ledgerId) + " =====");
+                    for (LedgerCache.PageEntries page : interleavedStorage.getIndexEntries(ledgerId)) {
+                        final MutableLong curEntry = new MutableLong(page.getFirstEntry());
+                        try (LedgerEntryPage lep = page.getLEP()){
+                            lep.getEntries((entry, offset) -> {
+                                while (curEntry.longValue() < entry) {
+                                    printInfoLine("entry " + curEntry + "\t:\tN/A");
+                                    curEntry.increment();
+                                }
+                                long entryLogId = offset >> 32L;
+                                long pos = offset & 0xffffffffL;
+                                printInfoLine("entry " + curEntry + "\t:\t(log:" + entryLogId + ", pos: " + pos + ")");
+                                curEntry.increment();
+                                return true;
+                            });
+                        } catch (IOException ie) {
+                            printInfoLine("Failed to read index page @ " + page.getFirstEntry()
+                                    + ", the index file may be corrupted : "
+                                    + ie.getMessage());
+                            return 1;
+                        }
+
+                        while (curEntry.longValue() < page.getLastEntry()) {
+                            printInfoLine("entry " + curEntry + "\t:\tN/A");
+                            curEntry.increment();
+                        }
+                    }
+                } catch (IOException ie) {
+                    LOG.error("Failed to read index page");
+                    return 1;
                 }
-                // dump ledger info
-                readLedgerIndexEntries(ledgerId);
             }
 
             return 0;
@@ -1179,6 +1237,51 @@ public class BookieShell implements Tool {
     }
 
     /**
+     * Print the metadata for a ledger.
+     */
+    class LocalConsistencyCheck extends MyCommand {
+        Options lOpts = new Options();
+
+        LocalConsistencyCheck() {
+            super(CMD_LOCALCONSISTENCYCHECK);
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            LOG.info("=== Performing local consistency check ===");
+            ServerConfiguration conf = new ServerConfiguration(bkConf);
+            LedgerStorage ledgerStorage = Bookie.mountLedgerStorageOffline(conf, null);
+            List <LedgerStorage.DetectedInconsistency> errors = ledgerStorage.localConsistencyCheck(
+                    java.util.Optional.empty());
+            if (errors.size() > 0) {
+                LOG.info("=== Check returned errors: ===");
+                for (LedgerStorage.DetectedInconsistency error : errors) {
+                    LOG.error("Ledger {}, entry {}: ", error.getLedgerId(), error.getEntryId(), error.getException());
+                }
+                return 1;
+            } else {
+                LOG.info("=== Check passed ===");
+                return 0;
+            }
+        }
+
+        @Override
+        String getDescription() {
+            return "Validate Ledger Storage internal metadata";
+        }
+
+        @Override
+        String getUsage() {
+            return "localconsistencycheck";
+        }
+
+        @Override
+        Options getOptions() {
+            return lOpts;
+        }
+    }
+
+    /**
      * Simple test to create a ledger and write to it.
      */
     class SimpleTestCmd extends MyCommand {
@@ -2626,41 +2729,12 @@ public class BookieShell implements Tool {
         int runCmd(CommandLine cmdLine) throws Exception {
             LOG.info("=== Converting to DbLedgerStorage ===");
             ServerConfiguration conf = new ServerConfiguration(bkConf);
-            LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
-                    new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
-            LedgerDirsManager ledgerIndexManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
-                    new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
 
             InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
-            DbLedgerStorage dbStorage = new DbLedgerStorage();
+            Bookie.mountLedgerStorageOffline(conf, interleavedStorage);
 
-            CheckpointSource checkpointSource = new CheckpointSource() {
-                    @Override
-                    public Checkpoint newCheckpoint() {
-                        return Checkpoint.MAX;
-                    }
-
-                    @Override
-                    public void checkpointComplete(Checkpoint checkpoint, boolean compact)
-                            throws IOException {
-                    }
-                };
-            Checkpointer checkpointer = new Checkpointer() {
-                @Override
-                public void startCheckpoint(Checkpoint checkpoint) {
-                    // No-op
-                }
-
-                @Override
-                public void start() {
-                    // no-op
-                }
-            };
-
-            interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
-                    null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
-            dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager, null,
-                    checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+            DbLedgerStorage dbStorage = new DbLedgerStorage();
+            Bookie.mountLedgerStorageOffline(conf, dbStorage);
 
             int convertedLedgers = 0;
             for (long ledgerId : interleavedStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
@@ -2668,13 +2742,13 @@ public class BookieShell implements Tool {
                     LOG.debug("Converting ledger {}", ledgerIdFormatter.formatLedgerId(ledgerId));
                 }
 
-                FileInfo fi = getFileInfo(ledgerId);
+                LedgerCache.LedgerIndexMetadata fi = interleavedStorage.readLedgerIndexMetadata(ledgerId);
 
-                Iterable<SortedMap<Long, Long>> entries = getLedgerIndexEntries(ledgerId);
+                LedgerCache.PageEntriesIterable pages = interleavedStorage.getIndexEntries(ledgerId);
 
-                long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId, fi.isFenced(), fi.getMasterKey(), entries);
+                long numberOfEntries = dbStorage.addLedgerToIndex(ledgerId, fi.fenced, fi.masterKey, pages);
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("   -- done. fenced={} entries={}", fi.isFenced(), numberOfEntries);
+                    LOG.debug("   -- done. fenced={} entries={}", fi.fenced, numberOfEntries);
                 }
 
                 // Remove index from old storage
@@ -2921,6 +2995,7 @@ public class BookieShell implements Tool {
         commands.put(CMD_WHOISAUDITOR, new WhoIsAuditorCmd());
         commands.put(CMD_WHATISINSTANCEID, new WhatIsInstanceId());
         commands.put(CMD_LEDGERMETADATA, new LedgerMetadataCmd());
+        commands.put(CMD_LOCALCONSISTENCYCHECK, new LocalConsistencyCheck());
         commands.put(CMD_SIMPLETEST, new SimpleTestCmd());
         commands.put(CMD_BOOKIESANITYTEST, new BookieSanityTestCmd());
         commands.put(CMD_READLOG, new ReadLogCmd());
@@ -3123,23 +3198,6 @@ public class BookieShell implements Tool {
         return lf;
     }
 
-    /**
-     * Get FileInfo for a specified ledger.
-     *
-     * @param ledgerId Ledger Id
-     * @return read only file info instance
-     */
-    ReadOnlyFileInfo getFileInfo(long ledgerId) throws IOException {
-        File ledgerFile = getLedgerFile(ledgerId);
-        if (null == ledgerFile) {
-            throw new FileNotFoundException("No index file found for ledger " + ledgerId
-                    + ". It may be not flushed yet.");
-        }
-        ReadOnlyFileInfo fi = new ReadOnlyFileInfo(ledgerFile, null);
-        fi.readHeader();
-        return fi;
-    }
-
     private synchronized void initEntryLogger() throws IOException {
         if (null == entryLogger) {
             // provide read only entry logger
@@ -3185,77 +3243,6 @@ public class BookieShell implements Tool {
     /// Bookie Shell Commands
     ///
 
-    /**
-     * Read ledger meta.
-     *
-     * @param ledgerId Ledger Id
-     */
-    protected void readLedgerMeta(long ledgerId) throws Exception {
-        System.out.println("===== LEDGER: " + ledgerIdFormatter.formatLedgerId(ledgerId) + " =====");
-        FileInfo fi = getFileInfo(ledgerId);
-        byte[] masterKey = fi.getMasterKey();
-        if (null == masterKey) {
-            System.out.println("master key  : NULL");
-        } else {
-            System.out.println("master key  : " + bytes2Hex(fi.getMasterKey()));
-        }
-        long size = fi.size();
-        if (size % 8 == 0) {
-            System.out.println("size        : " + size);
-        } else {
-            System.out.println("size : " + size + " (not aligned with 8, may be corrupted or under flushing now)");
-        }
-        System.out.println("entries     : " + (size / 8));
-        System.out.println("isFenced    : " + fi.isFenced());
-    }
-
-    /**
-     * Read ledger index entries.
-     *
-     * @param ledgerId Ledger Id
-     * @throws IOException
-     */
-    protected void readLedgerIndexEntries(long ledgerId) throws IOException {
-        System.out.println("===== LEDGER: " + ledgerIdFormatter.formatLedgerId(ledgerId) + " =====");
-        FileInfo fi = getFileInfo(ledgerId);
-        long size = fi.size();
-        System.out.println("size        : " + size);
-        long curSize = 0;
-        long curEntry = 0;
-        LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
-        lep.usePage();
-        try {
-            while (curSize < size) {
-                lep.setLedgerAndFirstEntry(ledgerId, curEntry);
-                lep.readPage(fi);
-
-                // process a page
-                for (int i = 0; i < entriesPerPage; i++) {
-                    long offset = lep.getOffset(i * 8);
-                    if (0 == offset) {
-                        System.out.println("entry " + curEntry + "\t:\tN/A");
-                    } else {
-                        long entryLogId = offset >> 32L;
-                        long pos = offset & 0xffffffffL;
-                        System.out.println("entry " + curEntry + "\t:\t(log:" + entryLogId + ", pos: " + pos + ")");
-                    }
-                    ++curEntry;
-                }
-
-                curSize += pageSize;
-            }
-        } catch (IOException ie) {
-            LOG.error("Failed to read index page : ", ie);
-            if (curSize + pageSize < size) {
-                System.out.println("Failed to read index page @ " + curSize + ", the index file may be corrupted : "
-                        + ie.getMessage());
-            } else {
-                System.out.println("Failed to read last index page @ " + curSize + ", the index file may be corrupted "
-                        + "or last index page is not fully flushed yet : " + ie.getMessage());
-            }
-        }
-    }
-
     protected void printEntryLogMetadata(long logId) throws IOException {
         LOG.info("Print entryLogMetadata of entrylog {} ({}.log)", logId, Long.toHexString(logId));
         initEntryLogger();
@@ -3267,67 +3254,6 @@ public class BookieShell implements Tool {
     }
 
     /**
-     * Get an iterable over pages of entries and locations for a ledger.
-     *
-     * @param ledgerId
-     * @return
-     * @throws IOException
-     */
-    protected Iterable<SortedMap<Long, Long>> getLedgerIndexEntries(final long ledgerId) throws IOException {
-        final FileInfo fi = getFileInfo(ledgerId);
-        final long size = fi.size();
-
-        final LedgerEntryPage lep = new LedgerEntryPage(pageSize, entriesPerPage);
-        lep.usePage();
-
-        final Iterator<SortedMap<Long, Long>> iterator = new Iterator<SortedMap<Long, Long>>() {
-            long curSize = 0;
-            long curEntry = 0;
-
-            @Override
-            public boolean hasNext() {
-                return curSize < size;
-            }
-
-            @Override
-            public SortedMap<Long, Long> next() {
-                SortedMap<Long, Long> entries = Maps.newTreeMap();
-                lep.setLedgerAndFirstEntry(ledgerId, curEntry);
-                try {
-                    lep.readPage(fi);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-
-                // process a page
-                for (int i = 0; i < entriesPerPage; i++) {
-                    long offset = lep.getOffset(i * 8);
-                    if (offset != 0) {
-                        entries.put(curEntry, offset);
-                    }
-                    ++curEntry;
-                }
-
-                curSize += pageSize;
-                return entries;
-            }
-
-            @Override
-            public void remove() {
-                throw new RuntimeException("Cannot remove");
-            }
-
-        };
-
-        return new Iterable<SortedMap<Long, Long>>() {
-            @Override
-            public Iterator<SortedMap<Long, Long>> iterator() {
-                return iterator;
-            }
-        };
-    }
-
-    /**
      * Scan over an entry log file.
      *
      * @param logId
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
index cfaee56..c982ba0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannelBase.java
@@ -25,6 +25,12 @@ import java.nio.channels.FileChannel;
  * to buffer the input and output data. This class is a base class for wrapping the {@link FileChannel}.
  */
 public abstract class BufferedChannelBase {
+    static class BufferedChannelClosedException extends IOException {
+        BufferedChannelClosedException() {
+            super("Attempting to access a file channel that has already been closed");
+        }
+    }
+
     protected final FileChannel fileChannel;
 
     protected BufferedChannelBase(FileChannel fc) {
@@ -36,7 +42,7 @@ public abstract class BufferedChannelBase {
         // guarantee that once a log file has been closed and possibly deleted during garbage
         // collection, attempts will not be made to read from it
         if (!fileChannel.isOpen()) {
-            throw new IOException("Attempting to access a file channel that has already been closed");
+            throw new BufferedChannelClosedException();
         }
         return fileChannel;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
index 96dea6f..04e58f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedReadChannel.java
@@ -45,7 +45,7 @@ public class BufferedReadChannel extends BufferedChannelBase  {
     long invocationCount = 0;
     long cacheHitCount = 0;
 
-    public BufferedReadChannel(FileChannel fileChannel, int readCapacity) throws IOException {
+    public BufferedReadChannel(FileChannel fileChannel, int readCapacity) {
         super(fileChannel);
         this.readCapacity = readCapacity;
         this.readBuffer = Unpooled.buffer(readCapacity);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
index a5e2c3f..98f4960 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,11 +44,15 @@ public class EntryLogCompactor extends AbstractLogCompactor {
     final CompactableLedgerStorage ledgerStorage;
     private final int maxOutstandingRequests;
 
-    public EntryLogCompactor(GarbageCollectorThread gcThread) {
-        super(gcThread);
+    public EntryLogCompactor(
+            ServerConfiguration conf,
+            EntryLogger entryLogger,
+            CompactableLedgerStorage ledgerStorage,
+            LogRemovalListener logRemover) {
+        super(conf, logRemover);
         this.maxOutstandingRequests = conf.getCompactionMaxOutstandingRequests();
-        this.entryLogger = gcThread.getEntryLogger();
-        this.ledgerStorage = gcThread.getLedgerStorage();
+        this.entryLogger = entryLogger;
+        this.ledgerStorage = ledgerStorage;
     }
 
     @Override
@@ -57,7 +62,7 @@ public class EntryLogCompactor extends AbstractLogCompactor {
                 scannerFactory.newScanner(entryLogMeta));
             scannerFactory.flush();
             LOG.info("Removing entry log {} after compaction", entryLogMeta.getEntryLogId());
-            gcThread.removeEntryLog(entryLogMeta.getEntryLogId());
+            logRemovalListener.removeEntryLog(entryLogMeta.getEntryLogId());
         } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
             LOG.warn("No writable ledger directory available, aborting compaction", nwlde);
             return false;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index ddf255a..f5913b3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -58,6 +59,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong;
@@ -323,6 +325,11 @@ public class EntryLogger {
         void onRotateEntryLog();
     }
 
+    public EntryLogger(ServerConfiguration conf) throws IOException {
+        this(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
+    }
+
     /**
      * Create an EntryLogger that stores it's log files in the given directories.
      */
@@ -616,7 +623,8 @@ public class EntryLogger {
     private final FastThreadLocal<ByteBuf> sizeBuffer = new FastThreadLocal<ByteBuf>() {
         @Override
         protected ByteBuf initialValue() throws Exception {
-            return Unpooled.buffer(4);
+            // Max usage is size (4 bytes) + ledgerId (8 bytes) + entryid (8 bytes)
+            return Unpooled.buffer(4 + 8 + 8);
         }
     };
 
@@ -693,10 +701,85 @@ public class EntryLogger {
 
 
 
-    public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
-            throws IOException, Bookie.NoEntryException {
-        long entryLogId = logIdForOffset(location);
-        long pos = location & 0xffffffffL;
+    /**
+     * Exception type for representing lookup errors.  Useful for disambiguating different error
+     * conditions for reporting purposes.
+     */
+    static class EntryLookupException extends Exception {
+        EntryLookupException(String message) {
+            super(message);
+        }
+
+        /**
+         * Represents case where log file is missing.
+         */
+        static class MissingLogFileException extends EntryLookupException {
+            MissingLogFileException(long ledgerId, long entryId, long entryLogId, long pos) {
+                super(String.format("Missing entryLog %d for ledgerId %d, entry %d at offset %d",
+                        entryLogId,
+                        ledgerId,
+                        entryId,
+                        pos));
+            }
+        }
+
+        /**
+         * Represents case where entry log is present, but does not contain the specified entry.
+         */
+        static class MissingEntryException extends EntryLookupException {
+            MissingEntryException(long ledgerId, long entryId, long entryLogId, long pos) {
+                super(String.format("pos %d (entry %d for ledgerId %d) past end of entryLog %d",
+                        pos,
+                        entryId,
+                        ledgerId,
+                        entryLogId));
+            }
+        }
+
+        /**
+         * Represents case where log is present, but encoded entry length header is invalid.
+         */
+        static class InvalidEntryLengthException extends EntryLookupException {
+            InvalidEntryLengthException(long ledgerId, long entryId, long entryLogId, long pos) {
+                super(String.format("Invalid entry length at pos %d (entry %d for ledgerId %d) for entryLog %d",
+                        pos,
+                        entryId,
+                        ledgerId,
+                        entryLogId));
+            }
+        }
+
+        /**
+         * Represents case where the entry at pos is wrong.
+         */
+        static class WrongEntryException extends EntryLookupException {
+            WrongEntryException(long foundEntryId, long foundLedgerId, long ledgerId,
+                                long entryId, long entryLogId, long pos) {
+                super(String.format(
+                        "Found entry %d, ledger %d at pos %d entryLog %d, should have found entry %d for ledgerId %d",
+                        foundEntryId,
+                        foundLedgerId,
+                        pos,
+                        entryLogId,
+                        entryId,
+                        ledgerId));
+            }
+        }
+    }
+
+    private static class EntryLogEntry {
+        final int entrySize;
+        final BufferedReadChannel fc;
+
+        EntryLogEntry(int entrySize, BufferedReadChannel fc) {
+            this.entrySize = entrySize;
+            this.fc = fc;
+        }
+    }
+
+    private EntryLogEntry getFCForEntryInternal(
+            long ledgerId, long entryId, long entryLogId, long pos)
+            throws EntryLookupException, IOException {
         ByteBuf sizeBuff = sizeBuffer.get();
         sizeBuff.clear();
         pos -= 4; // we want to get the ledgerId and length to check
@@ -704,15 +787,15 @@ public class EntryLogger {
         try {
             fc = getChannelForLogId(entryLogId);
         } catch (FileNotFoundException e) {
-            FileNotFoundException newe = new FileNotFoundException(e.getMessage() + " for " + ledgerId
-                    + " with location " + location);
-            newe.setStackTrace(e.getStackTrace());
-            throw newe;
+            throw new EntryLookupException.MissingLogFileException(ledgerId, entryId, entryLogId, pos);
         }
 
-        if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) != sizeBuff.capacity()) {
-            throw new Bookie.NoEntryException("Short read from entrylog " + entryLogId,
-                                              ledgerId, entryId);
+        try {
+            if (readFromLogChannel(entryLogId, fc, sizeBuff, pos) != sizeBuff.capacity()) {
+                throw new EntryLookupException.MissingEntryException(ledgerId, entryId, entryLogId, pos);
+            }
+        } catch (BufferedChannelBase.BufferedChannelClosedException | AsynchronousCloseException e) {
+            throw new EntryLookupException.MissingLogFileException(ledgerId, entryId, entryLogId, pos);
         }
         pos += 4;
         int entrySize = sizeBuff.readInt();
@@ -724,12 +807,42 @@ public class EntryLogger {
         }
         if (entrySize < MIN_SANE_ENTRY_SIZE) {
             LOG.error("Read invalid entry length {}", entrySize);
-            throw new IOException("Invalid entry length " + entrySize);
+            throw new EntryLookupException.InvalidEntryLengthException(ledgerId, entryId, entryLogId, pos);
+        }
+
+        long thisLedgerId = sizeBuff.getLong(4);
+        long thisEntryId = sizeBuff.getLong(12);
+        if (thisLedgerId != ledgerId || thisEntryId != entryId) {
+            throw new EntryLookupException.WrongEntryException(
+                    thisEntryId, thisLedgerId, ledgerId, entryId, entryLogId, pos);
         }
+        return new EntryLogEntry(entrySize, fc);
+    }
+
+    void checkEntry(long ledgerId, long entryId, long location) throws EntryLookupException, IOException {
+        long entryLogId = logIdForOffset(location);
+        long pos = location & 0xffffffffL;
+        getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+    }
 
-        ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(entrySize, entrySize);
-        int rc = readFromLogChannel(entryLogId, fc, data, pos);
-        if (rc != entrySize) {
+    public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
+            throws IOException, Bookie.NoEntryException {
+        long entryLogId = logIdForOffset(location);
+        long pos = location & 0xffffffffL;
+
+        final EntryLogEntry entry;
+        try {
+            entry = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);
+        } catch (EntryLookupException.MissingEntryException entryLookupError) {
+            throw new Bookie.NoEntryException("Short read from entrylog " + entryLogId,
+                    ledgerId, entryId);
+        } catch (EntryLookupException e) {
+            throw new IOException(e.toString());
+        }
+
+        ByteBuf data = PooledByteBufAllocator.DEFAULT.directBuffer(entry.entrySize, entry.entrySize);
+        int rc = readFromLogChannel(entryLogId, entry.fc, data, pos);
+        if (rc != entry.entrySize) {
             // Note that throwing NoEntryException here instead of IOException is not
             // without risk. If all bookies in a quorum throw this same exception
             // the client will assume that it has reached the end of the ledger.
@@ -740,31 +853,15 @@ public class EntryLogger {
             data.release();
             throw new Bookie.NoEntryException("Short read for " + ledgerId + "@"
                                               + entryId + " in " + entryLogId + "@"
-                                              + pos + "(" + rc + "!=" + entrySize + ")", ledgerId, entryId);
+                                              + pos + "(" + rc + "!=" + entry.entrySize + ")", ledgerId, entryId);
         }
-        data.writerIndex(entrySize);
+        data.writerIndex(entry.entrySize);
 
         return data;
     }
 
     public ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
-        long entryLogId = logIdForOffset(location);
-        long pos = location & 0xffffffffL;
-
         ByteBuf data = internalReadEntry(ledgerId, entryId, location);
-        long thisLedgerId = data.getLong(0);
-        if (thisLedgerId != ledgerId) {
-            data.release();
-            throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos
-                    + " entry belongs to " + thisLedgerId + " not " + ledgerId);
-        }
-        long thisEntryId = data.getLong(8);
-        if (thisEntryId != entryId) {
-            data.release();
-            throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos
-                    + " entry is " + thisEntryId + " not " + entryId);
-        }
-
         return data;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index c4d9414..369883c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -149,7 +149,7 @@ public class GarbageCollectorThread extends SafeRunnable {
                                   LedgerManager ledgerManager,
                                   final CompactableLedgerStorage ledgerStorage,
                                   StatsLogger statsLogger,
-                                    ScheduledExecutorService gcExecutor)
+                                  ScheduledExecutorService gcExecutor)
         throws IOException {
         this.gcExecutor = gcExecutor;
         this.conf = conf;
@@ -210,10 +210,17 @@ public class GarbageCollectorThread extends SafeRunnable {
         majorCompactionThreshold = conf.getMajorCompactionThreshold();
         majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND;
         isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
+
+        AbstractLogCompactor.LogRemovalListener remover = new AbstractLogCompactor.LogRemovalListener() {
+            @Override
+            public void removeEntryLog(long logToRemove) {
+                GarbageCollectorThread.this.removeEntryLog(logToRemove);
+            }
+        };
         if (conf.getUseTransactionalCompaction()) {
-            this.compactor = new TransactionalEntryLogCompactor(this);
+            this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
         } else {
-            this.compactor = new EntryLogCompactor(this);
+            this.compactor = new EntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
         }
 
         if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) {
@@ -591,10 +598,6 @@ public class GarbageCollectorThread extends SafeRunnable {
         return entryLogMetaMap;
     }
 
-    EntryLogger getEntryLogger() {
-        return entryLogger;
-    }
-
     CompactableLedgerStorage getLedgerStorage() {
         return ledgerStorage;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
index 0cf5cc9..b236294 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static java.lang.Long.max;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.INDEX_INMEM_ILLEGAL_STATE_DELETE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.INDEX_INMEM_ILLEGAL_STATE_RESET;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LEDGER_CACHE_HIT;
@@ -153,22 +154,11 @@ class IndexInMemPageMgr {
             ConcurrentMap<Long, LedgerEntryPage> lPages = pages.remove(ledgerId);
             if (null != lPages) {
                 for (Map.Entry<Long, LedgerEntryPage> pageEntry: lPages.entrySet()) {
-                    long entryId = pageEntry.getKey();
-                    synchronized (lruCleanPageMap) {
-                        lruCleanPageMap.remove(new EntryKey(ledgerId, entryId));
-                    }
-
                     LedgerEntryPage lep = pageEntry.getValue();
-                    // Cannot imagine under what circumstances we would have a null entry here
-                    // Just being safe
-                    if (null != lep) {
-                        if (lep.inUse()) {
-                            illegalStateDeleteCounter.inc();
-                        }
-                        listOfFreePages.add(lep);
-                    }
+                    lep.usePage();
+                    lep.markDeleted();
+                    lep.releasePage();
                 }
-
             }
         }
 
@@ -318,7 +308,11 @@ class IndexInMemPageMgr {
 
         @Override
         public void onResetInUse(LedgerEntryPage lep) {
-            addToCleanPagesList(lep);
+            if (!lep.isDeleted()) {
+                addToCleanPagesList(lep);
+            } else {
+                addToListOfFreePages(lep);
+            }
         }
 
         @Override
@@ -397,23 +391,9 @@ class IndexInMemPageMgr {
     }
 
     /**
-     * @return entries per page used in ledger cache
-     */
-    public int getEntriesPerPage() {
-        return entriesPerPage;
-    }
-
-    /**
-     * @return page limitation in ledger cache
-     */
-    public int getPageLimit() {
-        return pageLimit;
-    }
-
-    /**
      * @return number of page used in ledger cache
      */
-    public int getNumUsedPages() {
+    private int getNumUsedPages() {
         return pageCount.get();
     }
 
@@ -427,7 +407,7 @@ class IndexInMemPageMgr {
      * @return ledger entry page
      * @throws IOException
      */
-    public LedgerEntryPage getLedgerEntryPage(long ledger,
+    LedgerEntryPage getLedgerEntryPage(long ledger,
                                               long pageEntry) throws IOException {
         LedgerEntryPage lep = getLedgerEntryPageFromCache(ledger, pageEntry, false);
         if (lep == null) {
@@ -616,4 +596,79 @@ class IndexInMemPageMgr {
             }
         }
     }
+
+    /**
+     * Represents a page of the index.
+     */
+    private class PageEntriesImpl implements LedgerCache.PageEntries {
+        final long ledgerId;
+        final long initEntry;
+
+        PageEntriesImpl(long ledgerId, long initEntry) {
+            this.ledgerId = ledgerId;
+            this.initEntry = initEntry;
+        }
+
+        public LedgerEntryPage getLEP() throws IOException {
+            return getLedgerEntryPage(ledgerId, initEntry);
+        }
+
+        public long getFirstEntry() {
+            return initEntry;
+        }
+
+        public long getLastEntry() {
+            return initEntry + entriesPerPage;
+        }
+    }
+
+    /**
+     * Iterable over index pages -- returns PageEntries rather than individual
+     * entries because getEntries() above needs to be able to throw an IOException.
+     */
+    private class PageEntriesIterableImpl implements LedgerCache.PageEntriesIterable {
+        final long ledgerId;
+        final FileInfoBackingCache.CachedFileInfo fi;
+        final long totalEntries;
+
+        long curEntry = 0;
+
+        PageEntriesIterableImpl(long ledgerId) throws IOException {
+            this.ledgerId = ledgerId;
+            this.fi = indexPersistenceManager.getFileInfo(ledgerId, null);
+            this.totalEntries = max(entriesPerPage * (fi.size() / pageSize), getLastEntryInMem(ledgerId));
+        }
+
+        @Override
+        public Iterator<LedgerCache.PageEntries> iterator() {
+            return new Iterator<LedgerCache.PageEntries>() {
+                @Override
+                public boolean hasNext() {
+                    return curEntry < totalEntries;
+                }
+
+                @Override
+                public LedgerCache.PageEntries next() {
+                    LedgerCache.PageEntries next = new PageEntriesImpl(ledgerId, curEntry);
+                    curEntry += entriesPerPage;
+                    return next;
+                }
+            };
+        }
+
+        @Override
+        public void close() {
+            fi.release();
+        }
+    }
+
+    /**
+     * Return iterator over pages for mapping entries to entry loggers.
+     * @param ledgerId
+     * @return Iterator over pages
+     * @throws IOException
+     */
+    public LedgerCache.PageEntriesIterable listEntries(long ledgerId) throws IOException {
+        return new PageEntriesIterableImpl(ledgerId);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 11292d1..a622fea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -240,7 +240,7 @@ public class IndexPersistenceMgr {
             if (ee.getCause() instanceof IOException) {
                 throw (IOException) ee.getCause();
             } else {
-                throw new IOException("Failed to load file info for ledger " + ledger, ee);
+                throw new LedgerCache.NoIndexForLedger("Failed to load file info for ledger " + ledger, ee);
             }
         } finally {
             pendingGetFileInfoCounter.dec();
@@ -715,4 +715,22 @@ public class IndexPersistenceMgr {
         return lastEntry;
     }
 
+    /**
+     * Read ledger meta.
+     * @param ledgerId Ledger Id
+     */
+    public LedgerCache.LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException {
+        CachedFileInfo fi = null;
+        try {
+            fi = getFileInfo(ledgerId, null);
+            return new LedgerCache.LedgerIndexMetadata(
+                    fi.getMasterKey(),
+                    fi.size(),
+                    fi.isFenced());
+        } finally {
+            if (fi != null) {
+                fi.release();
+            }
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index d7d4977..4a96ece 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -25,18 +25,24 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRYLOGGER_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_SCRUB_PAGES_SCANNED;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import lombok.Cleanup;
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -49,6 +55,8 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SnapshotMap;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +90,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
     // Expose Stats
     private OpStatsLogger getOffsetStats;
     private OpStatsLogger getEntryStats;
+    private OpStatsLogger pageScanStats;
 
     @VisibleForTesting
     public InterleavedLedgerStorage() {
@@ -131,6 +140,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         // Expose Stats
         getOffsetStats = statsLogger.getOpStatsLogger(STORAGE_GET_OFFSET);
         getEntryStats = statsLogger.getOpStatsLogger(STORAGE_GET_ENTRY);
+        pageScanStats = statsLogger.getOpStatsLogger(STORAGE_SCRUB_PAGES_SCANNED);
     }
 
     private LedgerDirsListener getLedgerDirsListener() {
@@ -466,4 +476,103 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         Checkpoint checkpoint = checkpointSource.newCheckpoint();
         checkpointer.startCheckpoint(checkpoint);
     }
+
+    /**
+     * Return iterable for index entries for ledgerId.
+     * @param ledgerId ledger to scan
+     * @return Iterator
+     */
+    public LedgerCache.PageEntriesIterable getIndexEntries(long ledgerId) throws IOException {
+        return ledgerCache.listEntries(ledgerId);
+    }
+
+    /**
+     * Read implementation metadata for index file.
+     * @param ledgerId
+     * @return Implementation metadata
+     * @throws IOException
+     */
+    public LedgerCache.LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException {
+        return ledgerCache.readLedgerIndexMetadata(ledgerId);
+    }
+
+    @Override
+    public List<DetectedInconsistency> localConsistencyCheck(Optional<RateLimiter> rateLimiter) throws IOException {
+        long checkStart = MathUtils.nowInNano();
+        LOG.info("Starting localConsistencyCheck");
+        long checkedLedgers = 0;
+        long checkedPages = 0;
+        final MutableLong checkedEntries = new MutableLong(0);
+        final MutableLong pageRetries = new MutableLong(0);
+        NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot();
+        final List<DetectedInconsistency> errors = new ArrayList<>();
+        for (Long ledger : bkActiveLedgersSnapshot.keySet()) {
+            try (LedgerCache.PageEntriesIterable pages = ledgerCache.listEntries(ledger)) {
+                for (LedgerCache.PageEntries page : pages) {
+                    @Cleanup LedgerEntryPage lep = page.getLEP();
+                    MutableBoolean retry = new MutableBoolean(false);
+                    do {
+                        int version = lep.getVersion();
+
+                        MutableBoolean success = new MutableBoolean(true);
+                        long start = MathUtils.nowInNano();
+                        lep.getEntries((entry, offset) -> {
+                            rateLimiter.ifPresent(RateLimiter::acquire);
+
+                            try {
+                                entryLogger.checkEntry(ledger, entry, offset);
+                                checkedEntries.increment();
+                            } catch (EntryLogger.EntryLookupException e) {
+                                if (version != lep.getVersion()) {
+                                    pageRetries.increment();
+                                    if (lep.isDeleted()) {
+                                        LOG.debug("localConsistencyCheck: ledger {} deleted", ledger);
+                                    } else {
+                                        LOG.debug("localConsistencyCheck: concurrent modification, retrying");
+                                        retry.setValue(true);
+                                    }
+                                    return false;
+                                } else {
+                                    errors.add(new DetectedInconsistency(ledger, entry, e));
+                                    LOG.error("Got error: ", e);
+                                }
+                                success.setValue(false);
+                            }
+                            return true;
+                        });
+
+                        if (success.booleanValue()) {
+                            pageScanStats.registerSuccessfulEvent(
+                                    MathUtils.elapsedNanos(start), TimeUnit.NANOSECONDS);
+                        } else {
+                            pageScanStats.registerFailedEvent(
+                                    MathUtils.elapsedNanos(start), TimeUnit.NANOSECONDS);
+                        }
+                    } while (retry.booleanValue());
+                    checkedPages++;
+                }
+            } catch (NoLedgerException | FileInfo.FileInfoDeletedException e) {
+                if (activeLedgers.containsKey(ledger)) {
+                    LOG.error("Cannot find ledger {}, should exist, exception is ", ledger, e);
+                    errors.add(new DetectedInconsistency(ledger, -1, e));
+                } else {
+                    LOG.debug("ledger {} deleted since snapshot taken", ledger);
+                }
+            } catch (Exception e) {
+                throw new IOException("Got other exception in localConsistencyCheck", e);
+            }
+            checkedLedgers++;
+        }
+        LOG.info(
+                "Finished localConsistencyCheck, took {}s to scan {} ledgers, {} pages, "
+                        + "{} entries with {} retries, {} errors",
+                TimeUnit.NANOSECONDS.toSeconds(MathUtils.elapsedNanos(checkStart)),
+                checkedLedgers,
+                checkedPages,
+                checkedEntries.longValue(),
+                pageRetries.longValue(),
+                errors.size());
+
+        return errors;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
index de48eaf..f5e7c17 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
@@ -226,5 +226,13 @@ public class InterleavedStorageRegenerateIndexOp {
         public ByteBuf getExplicitLac(long ledgerId) {
             throw new UnsupportedOperationException();
         }
+        @Override
+        public PageEntriesIterable listEntries(long ledgerId) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+        @Override
+        public LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException {
+            throw new UnsupportedOperationException();
+        }
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index 14d4825..86984b7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -21,6 +21,8 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.bookie.BookieShell.bytes2Hex;
+
 import io.netty.buffer.ByteBuf;
 import java.io.Closeable;
 import java.io.IOException;
@@ -31,7 +33,7 @@ import org.apache.bookkeeper.common.util.Watcher;
  * an entry log file. It does user level caching to more efficiently manage disk
  * head scheduling.
  */
-interface LedgerCache extends Closeable {
+public interface LedgerCache extends Closeable {
 
     boolean setFenced(long ledgerId) throws IOException;
     boolean isFenced(long ledgerId) throws IOException;
@@ -56,4 +58,54 @@ interface LedgerCache extends Closeable {
 
     void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException;
     ByteBuf getExplicitLac(long ledgerId);
+
+    /**
+     * Specific exception to encode the case where the index is not present.
+     */
+    class NoIndexForLedger extends IOException {
+        NoIndexForLedger(String reason, Exception cause) {
+            super(reason, cause);
+        }
+    }
+
+    /**
+     * Represents a page of the index.
+     */
+    interface PageEntries {
+        LedgerEntryPage getLEP() throws IOException;
+        long getFirstEntry();
+        long getLastEntry();
+    }
+
+    /**
+     * Iterable over index pages -- returns PageEntries rather than individual
+     * entries because getEntries() above needs to be able to throw an IOException.
+     */
+    interface PageEntriesIterable extends AutoCloseable, Iterable<PageEntries> {}
+
+    PageEntriesIterable listEntries(long ledgerId) throws IOException;
+
+    /**
+     * Represents summary of ledger metadata.
+     */
+    class LedgerIndexMetadata {
+        public final byte[] masterKey;
+        public final long size;
+        public final boolean fenced;
+        LedgerIndexMetadata(byte[] masterKey, long size, boolean fenced) {
+            this.masterKey = masterKey;
+            this.size = size;
+            this.fenced = fenced;
+        }
+
+        public String getMasterKeyHex() {
+            if (null == masterKey) {
+                return "NULL";
+            } else {
+                return bytes2Hex(masterKey);
+            }
+        }
+    }
+
+    LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException;
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index 1db7d47..119a4f4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -169,4 +169,14 @@ public class LedgerCacheImpl implements LedgerCache {
     public void close() throws IOException {
         indexPersistenceManager.close();
     }
+
+    @Override
+    public PageEntriesIterable listEntries(long ledgerId) throws IOException {
+        return indexPageManager.listEntries(ledgerId);
+    }
+
+    @Override
+    public LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException {
+        return indexPersistenceManager.readLedgerIndexMetadata(ledgerId);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
index be87559..c272e7c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * This is a page in the LedgerCache. It holds the locations
  * (entrylogfile, offset) for entry ids.
  */
-public class LedgerEntryPage {
+public class LedgerEntryPage implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(LedgerEntryPage.class);
 
@@ -47,6 +47,7 @@ public class LedgerEntryPage {
     private final AtomicInteger version = new AtomicInteger(0);
     private volatile int last = -1; // Last update position
     private final LEPStateChangeCallback callback;
+    private boolean deleted;
 
     public static int getIndexEntrySize() {
         return indexEntrySize;
@@ -75,11 +76,20 @@ public class LedgerEntryPage {
         entryKey = new EntryKey(-1, BookieProtocol.INVALID_ENTRY_ID);
         clean = true;
         useCount.set(0);
+        deleted = false;
         if (null != this.callback) {
             callback.onResetInUse(this);
         }
     }
 
+    public void markDeleted() {
+        deleted = true;
+        version.incrementAndGet();
+    }
+
+    public boolean isDeleted() {
+        return deleted;
+    }
 
     @Override
     public String toString() {
@@ -215,7 +225,7 @@ public class LedgerEntryPage {
         return entryKey.getLedgerId();
     }
 
-    int getVersion() {
+    public int getVersion() {
         return version.get();
     }
 
@@ -262,4 +272,34 @@ public class LedgerEntryPage {
             return index >= 0 ? (index + entryKey.getEntryId()) : 0;
         }
     }
+
+    /**
+     * Interface for getEntries to propagate entry, pos pairs.
+     */
+    public interface EntryVisitor {
+        boolean visit(long entry, long pos) throws Exception;
+    }
+
+    /**
+     * Iterates over non-empty entry mappings.
+     *
+     * @param vis Consumer for entry position pairs.
+     * @throws Exception
+     */
+    public void getEntries(EntryVisitor vis) throws Exception {
+        // process a page
+        for (int i = 0; i < entriesPerPage; i++) {
+            long offset = getOffset(i * 8);
+            if (offset != 0) {
+                if (!vis.visit(getFirstEntry() + i, offset)) {
+                    return;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        releasePage();
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index ef64ff9..7a98fc7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -21,8 +21,12 @@
 
 package org.apache.bookkeeper.bookie;
 
+import com.google.common.util.concurrent.RateLimiter;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -181,4 +185,43 @@ public interface LedgerStorage {
     default void forceGC() {
         return;
     }
+
+    /**
+     * Class for describing location of a generic inconsistency.  Implementations should
+     * ensure that detail is populated with an exception which adequately describes the
+     * nature of the problem.
+     */
+    class DetectedInconsistency {
+        private long ledgerId;
+        private long entryId;
+        private Exception detail;
+
+        DetectedInconsistency(long ledgerId, long entryId, Exception detail) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.detail = detail;
+        }
+
+        public long getLedgerId() {
+            return ledgerId;
+        }
+
+        public long getEntryId() {
+            return entryId;
+        }
+
+        public Exception getException() {
+            return detail;
+        }
+    }
+
+    /**
+     * Performs internal check of local storage logging any inconsistencies.
+     * @param rateLimiter Provide to rate of entry checking.  null for unlimited.
+     * @return List of inconsistencies detected
+     * @throws IOException
+     */
+    default List<DetectedInconsistency> localConsistencyCheck(Optional<RateLimiter> rateLimiter) throws IOException {
+        return new ArrayList<>();
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
index 3a07ec4..2a683dc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.util.DiskChecker;
 
 /**
  * Read Only Entry Logger.
@@ -33,8 +32,7 @@ import org.apache.bookkeeper.util.DiskChecker;
 public class ReadOnlyEntryLogger extends EntryLogger {
 
     public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
-        super(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
-                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
+        super(conf);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScrubberStats.java
similarity index 50%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScrubberStats.java
index 3a07ec4..f22c784 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScrubberStats.java
@@ -1,4 +1,4 @@
-/*
+/**
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,30 +21,13 @@
 
 package org.apache.bookkeeper.bookie;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.util.DiskChecker;
-
 /**
- * Read Only Entry Logger.
+ * Stats associated with the consistency checker.
  */
-public class ReadOnlyEntryLogger extends EntryLogger {
-
-    public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
-        super(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
-                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
-    }
-
-    @Override
-    protected boolean removeEntryLog(long entryLogId) {
-        // can't remove entry log in readonly mode
-        return false;
-    }
+public class ScrubberStats {
+    public static final String SCOPE = "scrubber";
 
-    @Override
-    public synchronized long addEntry(long ledgerId, ByteBuffer entry) throws IOException {
-        throw new IOException("Can't add entry to a readonly entry logger.");
-    }
+    public static final String RUN_DURATION = "runTime";
+    public static final String DETECTED_SCRUB_ERRORS = "detectedScrubErrors";
+    public static final String DETECTED_FATAL_SCRUB_ERRORS = "detectedFatalScrubErrors";
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index dd7b373..dd07d75 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -21,9 +21,12 @@
 package org.apache.bookkeeper.bookie;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -332,4 +335,9 @@ public class SortedLedgerStorage
     public void forceGC() {
         interleavedLedgerStorage.forceGC();
     }
+
+    @Override
+    public List<DetectedInconsistency> localConsistencyCheck(Optional<RateLimiter> rateLimiter) throws IOException {
+        return interleavedLedgerStorage.localConsistencyCheck(rateLimiter);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
index 51a2cdd..74a30a6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.HardLink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,10 +54,14 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
     // flushed compaction log file suffix
     static final String COMPACTED_SUFFIX = ".compacted";
 
-    public TransactionalEntryLogCompactor(GarbageCollectorThread gcThread) {
-        super(gcThread);
-        this.entryLogger = gcThread.getEntryLogger();
-        this.ledgerStorage = gcThread.getLedgerStorage();
+    public TransactionalEntryLogCompactor(
+            ServerConfiguration conf,
+            EntryLogger entryLogger,
+            CompactableLedgerStorage ledgerStorage,
+            LogRemovalListener logRemover) {
+        super(conf, logRemover);
+        this.entryLogger = entryLogger;
+        this.ledgerStorage = ledgerStorage;
     }
 
     /**
@@ -201,7 +206,7 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
             if (offsets.isEmpty()) {
                 // no valid entries is compacted, delete entry log file
                 LOG.info("No valid entry is found in entry log after scan, removing entry log now.");
-                gcThread.removeEntryLog(metadata.getEntryLogId());
+                logRemovalListener.removeEntryLog(metadata.getEntryLogId());
                 entryLogger.removeCurCompactionLog();
                 return false;
             }
@@ -335,7 +340,7 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
                 String compactedFilename = compactedLogFile.getName();
                 String oldEntryLogFilename = compactedFilename.substring(compactedFilename.indexOf(".log") + 5);
                 long entryLogId = EntryLogger.fileName2LogId(oldEntryLogFilename);
-                gcThread.removeEntryLog(entryLogId);
+                logRemovalListener.removeEntryLog(entryLogId);
             }
             return true;
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index aad42ad..4c5fd33 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -36,7 +36,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.SortedMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -47,6 +46,7 @@ import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Checkpointer;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.bookie.LedgerCache;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.LedgerStorage;
 import org.apache.bookkeeper.bookie.StateManager;
@@ -275,8 +275,8 @@ public class DbLedgerStorage implements LedgerStorage {
     }
 
     public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
-            Iterable<SortedMap<Long, Long>> entries) throws Exception {
-        return getLedgerSorage(ledgerId).addLedgerToIndex(ledgerId, isFenced, masterKey, entries);
+                                 LedgerCache.PageEntriesIterable pages) throws Exception {
+        return getLedgerSorage(ledgerId).addLedgerToIndex(ledgerId, isFenced, masterKey, pages);
     }
 
     public long getLastEntryInLedger(long ledgerId) throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 27b4214..2e3c556 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -30,14 +30,12 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
-import java.util.SortedMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.StampedLock;
 
@@ -53,7 +51,9 @@ import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.GarbageCollectorThread;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.bookie.LedgerCache;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LedgerEntryPage;
 import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
@@ -67,6 +67,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -859,35 +860,33 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
      *
      * @param ledgerId
      *            the ledger id
-     * @param entries
-     *            a map of entryId -> location
+     * @param pages
+     *            Iterator over index pages from Indexed
      * @return the number of
      */
     public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
-            Iterable<SortedMap<Long, Long>> entries) throws Exception {
+            LedgerCache.PageEntriesIterable pages) throws Exception {
         LedgerData ledgerData = LedgerData.newBuilder().setExists(true).setFenced(isFenced)
                 .setMasterKey(ByteString.copyFrom(masterKey)).build();
         ledgerIndex.set(ledgerId, ledgerData);
-        AtomicLong numberOfEntries = new AtomicLong();
+        MutableLong numberOfEntries = new MutableLong();
 
         // Iterate over all the entries pages
         Batch batch = entryLocationIndex.newBatch();
-        entries.forEach(map -> {
-            map.forEach((entryId, location) -> {
-                try {
+        for (LedgerCache.PageEntries page: pages) {
+            try (LedgerEntryPage lep = page.getLEP()) {
+                lep.getEntries((entryId, location) -> {
                     entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-
-                numberOfEntries.incrementAndGet();
-            });
-        });
+                    numberOfEntries.increment();
+                    return true;
+                });
+            }
+        }
 
         batch.flush();
         batch.close();
 
-        return numberOfEntries.get();
+        return numberOfEntries.longValue();
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index afcfadc..2a77e91 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -55,6 +55,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected static final String GC_OVERREPLICATED_LEDGER_WAIT_TIME = "gcOverreplicatedLedgerWaitTime";
     protected static final String USE_TRANSACTIONAL_COMPACTION = "useTransactionalCompaction";
     protected static final String VERIFY_METADATA_ON_GC = "verifyMetadataOnGC";
+    // Scrub Parameters
+    protected static final String LOCAL_SCRUB_PERIOD = "localScrubInterval";
+    protected static final String LOCAL_SCRUB_RATE_LIMIT = "localScrubRateLimit";
     // Sync Parameters
     protected static final String FLUSH_INTERVAL = "flushInterval";
     protected static final String FLUSH_ENTRYLOG_INTERVAL_BYTES = "flushEntrylogBytes";
@@ -226,6 +229,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected static final String ENTRY_LOG_PER_LEDGER_COUNTER_LIMITS_MULT_FACTOR =
             "entryLogPerLedgerCounterLimitsMultFactor";
 
+    // Perform local consistency check on bookie startup
+    protected static final String LOCAL_CONSISTENCY_CHECK_ON_STARTUP = "localConsistencyCheckOnStartup";
+
     /**
      * Construct a default configuration object.
      */
@@ -375,6 +381,50 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Get whether local scrub is enabled.
+     *
+     * @return Whether local scrub is enabled.
+     */
+    public boolean isLocalScrubEnabled() {
+        return this.getLocalScrubPeriod() > 0;
+    }
+
+    /**
+     * Get local scrub interval.
+     *
+     * @return Number of seconds between scrubs, <= 0 for disabled.
+     */
+    public long getLocalScrubPeriod() {
+        return this.getLong(LOCAL_SCRUB_PERIOD, 0);
+    }
+
+    /**
+     * Set local scrub period in seconds (<= 0 for disabled). Scrub will be scheduled at delays
+     * chosen from the interval (.5 * interval, 1.5 * interval)
+     */
+    public void setLocalScrubPeriod(long period) {
+        this.setProperty(LOCAL_SCRUB_PERIOD, period);
+    }
+
+    /**
+     * Get local scrub rate limit (entries/second).
+     *
+     * @return Max number of entries to scrub per second, 0 for disabled.
+     */
+    public double getLocalScrubRateLimit() {
+        return this.getDouble(LOCAL_SCRUB_RATE_LIMIT, 60);
+    }
+
+    /**
+     * Get local scrub rate limit (entries/second).
+     *
+     * @param scrubRateLimit Max number of entries per second to scan.
+     */
+    public void setLocalScrubRateLimit(double scrubRateLimit) {
+        this.setProperty(LOCAL_SCRUB_RATE_LIMIT, scrubRateLimit);
+    }
+
+    /**
      * Get flush interval. Default value is 10 second. It isn't useful to decrease
      * this value, since ledger storage only checkpoints when an entry logger file
      * is rolled.
@@ -3093,4 +3143,11 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
                 Integer.toString(entryLogPerLedgerCounterLimitsMultFactor));
         return this;
     }
+
+    /**
+     * True if a local consistency check should be performed on startup.
+     */
+    public boolean isLocalConsistencyCheckOnStartup() {
+        return this.getBoolean(LOCAL_CONSISTENCY_CHECK_ON_STARTUP, false);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index b991d31..ec163f8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.ExitCode;
+import org.apache.bookkeeper.bookie.ScrubberStats;
 import org.apache.bookkeeper.common.component.ComponentStarter;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
@@ -39,6 +40,7 @@ import org.apache.bookkeeper.server.http.BKHttpServiceProvider;
 import org.apache.bookkeeper.server.service.AutoRecoveryService;
 import org.apache.bookkeeper.server.service.BookieService;
 import org.apache.bookkeeper.server.service.HttpService;
+import org.apache.bookkeeper.server.service.ScrubberService;
 import org.apache.bookkeeper.server.service.StatsProviderService;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.cli.BasicParser;
@@ -302,6 +304,13 @@ public class Main {
         serverBuilder.addComponent(bookieService);
         log.info("Load lifecycle component : {}", BookieService.class.getName());
 
+        if (conf.getServerConf().isLocalScrubEnabled()) {
+            serverBuilder.addComponent(
+                    new ScrubberService(
+                            rootStatsLogger.scope(ScrubberStats.SCOPE),
+                    conf, bookieService.getServer().getBookie().getLedgerStorage()));
+        }
+
         // 3. build auto recovery
         if (conf.getServerConf().isAutoRecoveryDaemonEnabled()) {
             AutoRecoveryService autoRecoveryService =
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java
new file mode 100644
index 0000000..4c027a6
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/ScrubberService.java
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.server.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.bookie.ScrubberStats.DETECTED_FATAL_SCRUB_ERRORS;
+import static org.apache.bookkeeper.bookie.ScrubberStats.DETECTED_SCRUB_ERRORS;
+import static org.apache.bookkeeper.bookie.ScrubberStats.RUN_DURATION;
+
+import com.google.common.util.concurrent.RateLimiter;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.bookie.ExitCode;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
+import org.apache.bookkeeper.server.conf.BookieConfiguration;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.bookkeeper.common.component.LifecycleComponent} that runs the scrubber background service.
+ */
+public class ScrubberService extends ServerLifecycleComponent {
+    private static final Logger LOG = LoggerFactory.getLogger(ScrubberService.class);
+
+    private static final String NAME = "scrubber";
+    private final ScheduledExecutorService executor;
+    private final Random rng = new Random();
+    private final long scrubPeriod;
+    private final Optional<RateLimiter> scrubRateLimiter;
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+    private final LedgerStorage ledgerStorage;
+
+    private final OpStatsLogger scrubCounter;
+    private final Counter errorCounter;
+    private final Counter fatalErrorCounter;
+
+    public ScrubberService(
+            StatsLogger logger,
+            BookieConfiguration conf,
+            LedgerStorage ledgerStorage) {
+        super(NAME, conf, logger);
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("ScrubThread"));
+
+        this.scrubPeriod = conf.getServerConf().getLocalScrubPeriod();
+        checkArgument(
+                scrubPeriod > 0,
+                "localScrubInterval must be > 0 for ScrubberService to be used");
+
+        double rateLimit = conf.getServerConf().getLocalScrubRateLimit();
+        this.scrubRateLimiter = rateLimit == 0 ? Optional.empty() : Optional.of(RateLimiter.create(rateLimit));
+
+        this.ledgerStorage = ledgerStorage;
+
+        this.scrubCounter = logger.getOpStatsLogger(RUN_DURATION);
+        this.errorCounter = logger.getCounter(DETECTED_SCRUB_ERRORS);
+        this.fatalErrorCounter = logger.getCounter(DETECTED_FATAL_SCRUB_ERRORS);
+    }
+
+    private long getNextPeriodMS() {
+        return (long) (((double) scrubPeriod) * (1.5 - rng.nextDouble()) * 1000);
+    }
+
+    private void doSchedule() {
+        executor.schedule(
+                this::run,
+                getNextPeriodMS(),
+                TimeUnit.MILLISECONDS);
+
+    }
+
+    private void run() {
+        boolean success = false;
+        long start = MathUtils.nowInNano();
+        try {
+            List<LedgerStorage.DetectedInconsistency> errors = ledgerStorage.localConsistencyCheck(scrubRateLimiter);
+            if (errors.size() > 0) {
+                errorCounter.add(errors.size());
+                LOG.error("Found inconsistency during localConsistencyCheck:");
+                for (LedgerStorage.DetectedInconsistency error : errors) {
+                    LOG.error("Ledger {}, entry {}: ", error.getLedgerId(), error.getEntryId(), error.getException());
+                }
+            }
+            success = true;
+        } catch (IOException e) {
+            fatalErrorCounter.inc();
+            LOG.error("Got fatal exception {} running localConsistencyCheck", e.toString());
+        }
+        if (success) {
+            scrubCounter.registerSuccessfulEvent(MathUtils.elapsedNanos(start), TimeUnit.NANOSECONDS);
+        } else {
+            scrubCounter.registerFailedEvent(MathUtils.elapsedNanos(start), TimeUnit.NANOSECONDS);
+            Runtime.getRuntime().exit(ExitCode.BOOKIE_EXCEPTION);
+        }
+        if (!stop.get()) {
+            doSchedule();
+        }
+    }
+
+    @Override
+    protected void doStart() {
+        doSchedule();
+    }
+
+    @Override
+    protected void doStop() {
+        stop.set(true);
+        executor.shutdown();
+    }
+
+    @Override
+    protected void doClose() throws IOException {
+        // no-op
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index c110eeb..bbe9c10 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -1383,7 +1383,12 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
     private static class MockTransactionalEntryLogCompactor extends TransactionalEntryLogCompactor {
 
         public MockTransactionalEntryLogCompactor(GarbageCollectorThread gcThread) {
-            super(gcThread);
+            super(gcThread.conf,
+                  gcThread.entryLogger,
+                  gcThread.ledgerStorage,
+                  (long entry) -> {
+                gcThread.removeEntryLog(entry);
+            });
         }
 
         synchronized void compactWithIndexFlushFailure(EntryLogMetadata metadata) {
@@ -1405,7 +1410,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
                 LOG.info("Compaction for {} end in PartialFlushIndexPhase.", metadata.getEntryLogId());
                 return;
             }
-            gcThread.removeEntryLog(metadata.getEntryLogId());
+            logRemovalListener.removeEntryLog(metadata.getEntryLogId());
             LOG.info("Compacted entry log : {}.", metadata.getEntryLogId());
         }
 
@@ -1428,7 +1433,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
                 LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", metadata.getEntryLogId());
                 return;
             }
-            gcThread.removeEntryLog(metadata.getEntryLogId());
+            logRemovalListener.removeEntryLog(metadata.getEntryLogId());
             LOG.info("Compacted entry log : {}.", metadata.getEntryLogId());
         }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
new file mode 100644
index 0000000..fd4f314
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -0,0 +1,303 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.bookkeeper.util.EntryFormatter;
+import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.apache.commons.lang.mutable.MutableLong;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for InterleavedLedgerStorage.
+ */
+public class InterleavedLedgerStorageTest {
+
+    CheckpointSource checkpointSource = new CheckpointSource() {
+        @Override
+        public Checkpoint newCheckpoint() {
+            return Checkpoint.MAX;
+        }
+
+        @Override
+        public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException {
+        }
+    };
+
+    Checkpointer checkpointer = new Checkpointer() {
+        @Override
+        public void startCheckpoint(Checkpoint checkpoint) {
+            // No-op
+        }
+
+        @Override
+        public void start() {
+            // no-op
+        }
+    };
+
+    ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+    LedgerDirsManager ledgerDirsManager;
+    InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
+    final long numWrites = 2000;
+    final long entriesPerWrite = 2;
+
+    @Before
+    public void setUp() throws Exception {
+        File tmpDir = File.createTempFile("bkTest", ".dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        File curDir = Bookie.getCurrentDirectory(tmpDir);
+        Bookie.checkDirectoryStructure(curDir);
+
+        conf = TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        conf.setEntryLogSizeLimit(2048);
+        ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerDirsManager,
+                null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE);
+
+        // Insert some ledger & entries in the interleaved storage
+        for (long entryId = 0; entryId < numWrites; entryId++) {
+            for (long ledgerId = 0; ledgerId < 5; ledgerId++) {
+                if (entryId == 0) {
+                    interleavedStorage.setMasterKey(ledgerId, ("ledger-" + ledgerId).getBytes());
+                    interleavedStorage.setFenced(ledgerId);
+                }
+                ByteBuf entry = Unpooled.buffer(128);
+                entry.writeLong(ledgerId);
+                entry.writeLong(entryId * entriesPerWrite);
+                entry.writeBytes(("entry-" + entryId).getBytes());
+
+                interleavedStorage.addEntry(entry);
+            }
+        }
+    }
+
+    @Test
+    public void testIndexEntryIterator() throws Exception {
+        try (LedgerCache.PageEntriesIterable pages = interleavedStorage.getIndexEntries(0)) {
+            MutableLong curEntry = new MutableLong(0);
+            for (LedgerCache.PageEntries page : pages) {
+                try (LedgerEntryPage lep = page.getLEP()) {
+                    lep.getEntries((entry, offset) -> {
+                        Assert.assertEquals(curEntry.longValue(), entry);
+                        Assert.assertNotEquals(0, offset);
+                        curEntry.setValue(entriesPerWrite + entry);
+                        return true;
+                    });
+                }
+            }
+            Assert.assertEquals(entriesPerWrite * numWrites, curEntry.longValue());
+        }
+    }
+
+    @Test
+    public void testConsistencyCheckConcurrentModification() throws Exception {
+        AtomicBoolean done = new AtomicBoolean(false);
+        EntryLogger entryLogger = interleavedStorage.getEntryLogger();
+        List<Exception> asyncErrors = new ArrayList<>();
+        Thread mutator = new Thread(() -> {
+            EntryLogCompactor compactor = new EntryLogCompactor(
+                    conf,
+                    entryLogger,
+                    interleavedStorage,
+                    entryLogger::removeEntryLog);
+            long next = 0;
+            while (!done.get()) {
+                try {
+                    compactor.compact(entryLogger.getEntryLogMetadata(next));
+                    next++;
+                } catch (IOException e) {
+                    asyncErrors.add(e);
+                    break;
+                }
+            }
+        });
+        mutator.start();
+
+        for (int i = 0; i < 100; ++i) {
+            assert interleavedStorage.localConsistencyCheck(Optional.empty()).size() == 0;
+            Thread.sleep(10);
+        }
+
+        done.set(true);
+        mutator.join();
+        for (Exception e: asyncErrors) {
+            throw e;
+        }
+    }
+
+
+    @Test
+    public void testConsistencyMissingEntry() throws Exception {
+        // set 1, 1 to nonsense
+        interleavedStorage.ledgerCache.putEntryOffset(1, 1, 0xFFFFFFFFFFFFFFFFL);
+
+        List<LedgerStorage.DetectedInconsistency> errors = interleavedStorage.localConsistencyCheck(Optional.empty());
+        Assert.assertEquals(1, errors.size());
+        LedgerStorage.DetectedInconsistency inconsistency = errors.remove(0);
+        Assert.assertEquals(1, inconsistency.getEntryId());
+        Assert.assertEquals(1, inconsistency.getLedgerId());
+    }
+
+    @Test
+    public void testWrongEntry() throws Exception {
+        // set 1, 1 to nonsense
+        interleavedStorage.ledgerCache.putEntryOffset(
+                1,
+                1,
+                interleavedStorage.ledgerCache.getEntryOffset(0, 0));
+
+        List<LedgerStorage.DetectedInconsistency> errors = interleavedStorage.localConsistencyCheck(Optional.empty());
+        Assert.assertEquals(1, errors.size());
+        LedgerStorage.DetectedInconsistency inconsistency = errors.remove(0);
+        Assert.assertEquals(1, inconsistency.getEntryId());
+        Assert.assertEquals(1, inconsistency.getLedgerId());
+    }
+
+    @Test
+    public void testShellCommands() throws Exception {
+        interleavedStorage.flush();
+        interleavedStorage.shutdown();
+        final Pattern entryPattern = Pattern.compile(
+                "entry (?<entry>\\d+)\t:\t((?<na>N/A)|\\(log:(?<logid>\\d+), pos: (?<pos>\\d+)\\))");
+
+        class Metadata {
+            final Pattern keyPattern = Pattern.compile("master key +: ([0-9a-f])");
+            final Pattern sizePattern = Pattern.compile("size +: (\\d+)");
+            final Pattern entriesPattern = Pattern.compile("entries +: (\\d+)");
+            final Pattern isFencedPattern = Pattern.compile("isFenced +: (\\w+)");
+
+            public String masterKey;
+            public long size = -1;
+            public long entries = -1;
+            public boolean foundFenced = false;
+
+            void check(String s) {
+                Matcher keyMatcher = keyPattern.matcher(s);
+                if (keyMatcher.matches()) {
+                    masterKey = keyMatcher.group(1);
+                    return;
+                }
+
+                Matcher sizeMatcher = sizePattern.matcher(s);
+                if (sizeMatcher.matches()) {
+                    size = Long.valueOf(sizeMatcher.group(1));
+                    return;
+                }
+
+                Matcher entriesMatcher = entriesPattern.matcher(s);
+                if (entriesMatcher.matches()) {
+                    entries = Long.valueOf(entriesMatcher.group(1));
+                    return;
+                }
+
+                Matcher isFencedMatcher = isFencedPattern.matcher(s);
+                if (isFencedMatcher.matches()) {
+                    Assert.assertEquals("true", isFencedMatcher.group(1));
+                    foundFenced = true;
+                    return;
+                }
+            }
+
+            void validate(long foundEntries) {
+                Assert.assertTrue(entries >= numWrites * entriesPerWrite);
+                Assert.assertEquals(entries, foundEntries);
+                Assert.assertTrue(foundFenced);
+                Assert.assertNotEquals(-1, size);
+            }
+        }
+        final Metadata foundMetadata = new Metadata();
+
+        AtomicLong curEntry = new AtomicLong(0);
+        AtomicLong someEntryLogger = new AtomicLong(-1);
+        BookieShell shell = new BookieShell(
+                LedgerIdFormatter.LONG_LEDGERID_FORMATTER, EntryFormatter.STRING_FORMATTER) {
+            @Override
+            void printInfoLine(String s) {
+                Matcher matcher = entryPattern.matcher(s);
+                System.out.println(s);
+                if (matcher.matches()) {
+                    assertEquals(Long.toString(curEntry.get()), matcher.group("entry"));
+
+                    if (matcher.group("na") == null) {
+                        String logId = matcher.group("logid");
+                        Assert.assertNotEquals(matcher.group("logid"), null);
+                        Assert.assertNotEquals(matcher.group("pos"), null);
+                        Assert.assertTrue((curEntry.get() % entriesPerWrite) == 0);
+                        Assert.assertTrue(curEntry.get() <= numWrites * entriesPerWrite);
+                        if (someEntryLogger.get() == -1) {
+                            someEntryLogger.set(Long.valueOf(logId));
+                        }
+                    } else {
+                        Assert.assertEquals(matcher.group("logid"), null);
+                        Assert.assertEquals(matcher.group("pos"), null);
+                        Assert.assertTrue(((curEntry.get() % entriesPerWrite) != 0)
+                                || ((curEntry.get() >= (entriesPerWrite * numWrites))));
+                    }
+                    curEntry.incrementAndGet();
+                } else {
+                    foundMetadata.check(s);
+                }
+            }
+        };
+        shell.setConf(conf);
+        int res = shell.run(new String[] { "ledger", "-m", "0" });
+        Assert.assertEquals(0, res);
+        Assert.assertTrue(curEntry.get() >= numWrites * entriesPerWrite);
+        foundMetadata.validate(curEntry.get());
+
+        // Should pass consistency checker
+        res = shell.run(new String[] { "localconsistencycheck" });
+        Assert.assertEquals(0, res);
+
+
+        // Remove a logger
+        EntryLogger entryLogger = new EntryLogger(conf);
+        entryLogger.removeEntryLog(someEntryLogger.get());
+
+        // Should fail consistency checker
+        res = shell.run(new String[] { "localconsistencycheck" });
+        Assert.assertEquals(1, res);
+    }
+}