You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/04/29 05:44:20 UTC

[bookkeeper] branch master updated: BP-47 (task3): Abstract interface for entrylogger (#3197)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f85e6d5fa BP-47 (task3): Abstract interface for entrylogger (#3197)
6f85e6d5fa is described below

commit 6f85e6d5fa735dfea035ba4761c6669fbf5c6880
Author: Hang Chen <ch...@apache.org>
AuthorDate: Fri Apr 29 13:44:14 2022 +0800

    BP-47 (task3): Abstract interface for entrylogger (#3197)
    
    * Interface for entrylogger
    
    Define the interface and contract for entrylogger. This is mostly
    taking the entrylogger methods used by other components and prettying
    them up a bit.
    
    Notable changes are:
    - internalReadEntry is now readEntry. There is no 'validate'
      flag. Instead there are two overloads for the method, and validation
      only runs if ledgerId and entryId are passed in.
    - shutdown has been renamed to close.
    - the compaction entrylog methods have been put behind an
      interface. As it was they were leaking implementation
      details. Ultimitely compaction itself should be hidden behind the
      entrylogger, but that's a larger refactor.
    
    (cherry picked from commit c927f4b0128e2b1f0f4b30c41ce442b366d5009c)
    
    * abstract interface for entrylogger
    
    * format code
    
    * format code
    
    * address comments
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrey Yegorov <86...@users.noreply.github.com>
    
    * address comments
    
    * format code
    
    Co-authored-by: Ivan Kelly <ik...@splunk.com>
    Co-authored-by: Andrey Yegorov <86...@users.noreply.github.com>
---
 .../bookkeeper/bookie/AbstractLogCompactor.java    |   5 +-
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   3 +-
 .../bookie/CompactableLedgerStorage.java           |   2 +-
 .../{EntryLogger.java => DefaultEntryLogger.java}  | 301 +++++++++++++++------
 .../bookkeeper/bookie/EntryLogCompactor.java       |   7 +-
 .../apache/bookkeeper/bookie/EntryLogManager.java  |   2 +-
 .../bookkeeper/bookie/EntryLogManagerBase.java     |  10 +-
 .../EntryLogManagerForEntryLogPerLedger.java       |  11 +-
 .../bookie/EntryLogManagerForSingleEntryLog.java   |  16 +-
 .../bookkeeper/bookie/EntryLoggerAllocator.java    |  16 +-
 .../bookkeeper/bookie/GarbageCollectorThread.java  |   6 +-
 .../bookie/InterleavedLedgerStorage.java           |  13 +-
 .../InterleavedStorageRegenerateIndexOp.java       |   4 +-
 ...Logger.java => ReadOnlyDefaultEntryLogger.java} |   6 +-
 .../bookkeeper/bookie/SortedLedgerStorage.java     |  10 +-
 .../bookie/TransactionalEntryLogCompactor.java     | 168 ++++--------
 .../bookie/storage/CompactionEntryLog.java         |  91 +++++++
 .../EntryLogScanner.java}                          |  47 ++--
 .../bookkeeper/bookie/storage/EntryLogger.java     | 149 ++++++++++
 .../bookie/storage/ldb/LedgersIndexRebuildOp.java  |   6 +-
 .../storage/ldb/LocationsIndexRebuildOp.java       |   6 +-
 .../ldb/SingleDirectoryDbLedgerStorage.java        |  13 +-
 .../package-info.java}                             |  30 +-
 .../commands/bookie/ListActiveLedgersCommand.java  |   6 +-
 .../tools/cli/commands/bookie/ReadLogCommand.java  |  15 +-
 .../commands/bookie/ReadLogMetadataCommand.java    |   6 +-
 .../apache/bookkeeper/bookie/CompactionTest.java   |  87 +++---
 .../apache/bookkeeper/bookie/CreateNewLogTest.java |  32 +--
 ...{EntryLogTest.java => DefaultEntryLogTest.java} | 159 +++++------
 .../bookie/GarbageCollectorThreadTest.java         |   2 +-
 .../bookie/InterleavedLedgerStorageTest.java       |  10 +-
 .../bookie/LedgerStorageCheckpointTest.java        |   8 +-
 .../bookie/SlowInterleavedLedgerStorage.java       |  26 +-
 .../bookie/SortedLedgerStorageCheckpointTest.java  |   6 +-
 .../bookie/storage/ldb/DbLedgerStorageTest.java    |   4 +-
 .../org/apache/bookkeeper/meta/GcLedgersTest.java  |   2 +-
 .../bookkeeper/meta/LedgerManagerTestCase.java     |   2 +-
 .../apache/bookkeeper/test/ZooKeeperCluster.java   |   2 +-
 .../cli/commands/bookie/ReadLogCommandTest.java    |   8 +-
 .../bookie/ReadLogMetadataCommandTest.java         |   6 +-
 40 files changed, 816 insertions(+), 487 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
index 57ec8978cc..f5ae90b336 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
@@ -57,7 +57,10 @@ public abstract class AbstractLogCompactor {
      */
     public void cleanUpAndRecover() {}
 
-    static class Throttler {
+    /**
+     * class Throttler.
+     */
+    public static class Throttler {
         private final RateLimiter rateLimiter;
         private final boolean isThrottleByBytes;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index edf9194f16..a7e15f67b6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.Private;
@@ -2441,7 +2442,7 @@ public class BookieShell implements Tool {
     private synchronized void initEntryLogger() throws IOException {
         if (null == entryLogger) {
             // provide read only entry logger
-            entryLogger = new ReadOnlyEntryLogger(bkConf);
+            entryLogger = new ReadOnlyDefaultEntryLogger(bkConf);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java
index 9a5a0abbcb..832cb57d1a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CompactableLedgerStorage.java
@@ -22,7 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
-
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 /**
  * Interface that identifies LedgerStorage implementations using EntryLogger and running periodic entries compaction.
  */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
similarity index 82%
rename from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
rename to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
index 859f3e588b..dc90811e83 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/DefaultEntryLogger.java
@@ -45,6 +45,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -54,11 +55,16 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.regex.Pattern;
 
+import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.bookkeeper.util.HardLink;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong;
@@ -72,11 +78,8 @@ import org.slf4j.LoggerFactory;
  * the actual ledger entry. The entry log files created by this class are
  * identified by a long.
  */
-public class EntryLogger {
-    private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
-    static final long UNASSIGNED_LEDGERID = -1L;
-    // log file suffix
-    static final String LOG_FILE_SUFFIX = ".log";
+public class DefaultEntryLogger implements EntryLogger {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultEntryLogger.class);
 
     @VisibleForTesting
     static final int UNINITIALIZED_LOG_ID = -0xDEAD;
@@ -285,33 +288,6 @@ public class EntryLogger {
     private final ByteBufAllocator allocator;
 
     final ServerConfiguration conf;
-    /**
-     * Scan entries in a entry log file.
-     */
-    public interface EntryLogScanner {
-        /**
-         * Tests whether or not the entries belongs to the specified ledger
-         * should be processed.
-         *
-         * @param ledgerId
-         *          Ledger ID.
-         * @return true if and only the entries of the ledger should be scanned.
-         */
-        boolean accept(long ledgerId);
-
-        /**
-         * Process an entry.
-         *
-         * @param ledgerId
-         *          Ledger ID.
-         * @param offset
-         *          File offset of this entry.
-         * @param entry
-         *          Entry ByteBuf
-         * @throws IOException
-         */
-        void process(long ledgerId, long offset, ByteBuf entry) throws IOException;
-    }
 
     /**
      * Entry Log Listener.
@@ -323,7 +299,7 @@ public class EntryLogger {
         void onRotateEntryLog();
     }
 
-    public EntryLogger(ServerConfiguration conf) throws IOException {
+    public DefaultEntryLogger(ServerConfiguration conf) throws IOException {
         this(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
     }
@@ -331,14 +307,14 @@ public class EntryLogger {
     /**
      * Create an EntryLogger that stores it's log files in the given directories.
      */
-    public EntryLogger(ServerConfiguration conf,
-            LedgerDirsManager ledgerDirsManager) throws IOException {
+    public DefaultEntryLogger(ServerConfiguration conf,
+                              LedgerDirsManager ledgerDirsManager) throws IOException {
         this(conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
     }
 
-    public EntryLogger(ServerConfiguration conf,
-            LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger,
-            ByteBufAllocator allocator) throws IOException {
+    public DefaultEntryLogger(ServerConfiguration conf,
+                              LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger,
+                              ByteBufAllocator allocator) throws IOException {
         //We reserve 500 bytes as overhead for the protocol.  This is not 100% accurate
         // but the protocol varies so an exact value is difficult to determine
         this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500;
@@ -478,7 +454,8 @@ public class EntryLogger {
      *
      * @return least unflushed log id.
      */
-    long getLeastUnflushedLogId() {
+    @Override
+    public long getLeastUnflushedLogId() {
         return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
     }
 
@@ -489,7 +466,8 @@ public class EntryLogger {
      *
      * @return last entry log id created.
      */
-    long getLastLogId() {
+    @Override
+    public long getLastLogId() {
         return recentlyCreatedEntryLogsStatus.getLastLogId();
     }
 
@@ -499,7 +477,8 @@ public class EntryLogger {
      * @param entryLogId EntryLog id to check.
      * @return Whether the given entryLogId exists and has been rotated.
      */
-    boolean isFlushedEntryLog(Long entryLogId) {
+    @Override
+    public boolean isFlushedEntryLog(Long entryLogId) {
         return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId);
     }
 
@@ -510,7 +489,7 @@ public class EntryLogger {
     /**
      * Get the current log file for compaction.
      */
-    File getCurCompactionLogFile() {
+    private File getCurCompactionLogFile() {
         synchronized (compactionLogLock) {
             if (compactionLogChannel == null) {
                 return null;
@@ -544,7 +523,8 @@ public class EntryLogger {
      * @param entryLogId
      *          Entry Log File Id
      */
-    protected boolean removeEntryLog(long entryLogId) {
+    @Override
+    public boolean removeEntryLog(long entryLogId) {
         removeFromChannelsAndClose(entryLogId);
         File entryLogFile;
         try {
@@ -610,6 +590,7 @@ public class EntryLogger {
         entryLogManager.checkpoint();
     }
 
+    @Override
     public void flush() throws IOException {
         entryLogManager.flush();
     }
@@ -618,12 +599,13 @@ public class EntryLogger {
         return entryLogManager.addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
     }
 
-    long addEntry(long ledger, ByteBuf entry) throws IOException {
-        return entryLogManager.addEntry(ledger, entry, true);
+    long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+        return entryLogManager.addEntry(ledger, entry, rollLog);
     }
 
-    public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
-        return entryLogManager.addEntry(ledger, entry, rollLog);
+    @Override
+    public long addEntry(long ledger, ByteBuf entry) throws IOException {
+        return entryLogManager.addEntry(ledger, entry, true);
     }
 
     private final FastThreadLocal<ByteBuf> sizeBuffer = new FastThreadLocal<ByteBuf>() {
@@ -634,7 +616,7 @@ public class EntryLogger {
         }
     };
 
-    long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException {
+    private long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException {
         synchronized (compactionLogLock) {
             int entrySize = entry.readableBytes() + 4;
             if (compactionLogChannel == null) {
@@ -653,7 +635,7 @@ public class EntryLogger {
         }
     }
 
-    void flushCompactionLog() throws IOException {
+    private void flushCompactionLog() throws IOException {
         synchronized (compactionLogLock) {
             if (compactionLogChannel != null) {
                 compactionLogChannel.appendLedgersMap();
@@ -671,7 +653,7 @@ public class EntryLogger {
         }
     }
 
-    void createNewCompactionLog() throws IOException {
+    private void createNewCompactionLog() throws IOException {
         synchronized (compactionLogLock) {
             if (compactionLogChannel == null) {
                 compactionLogChannel = entryLogManager.createNewLogForCompaction();
@@ -683,7 +665,7 @@ public class EntryLogger {
      * Remove the current compaction log, usually invoked when compaction failed and
      * we need to do some clean up to remove the compaction log file.
      */
-    void removeCurCompactionLog() {
+    private void removeCurCompactionLog() {
         synchronized (compactionLogLock) {
             if (compactionLogChannel != null) {
                 if (!compactionLogChannel.getLogFile().delete()) {
@@ -834,8 +816,20 @@ public class EntryLogger {
         }
     }
 
-    public ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry)
-            throws IOException {
+    @Override
+    public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
+            throws IOException, Bookie.NoEntryException {
+        return internalReadEntry(ledgerId, entryId, entryLocation, true /* validateEntry */);
+    }
+
+    @Override
+    public ByteBuf readEntry(long location) throws IOException, Bookie.NoEntryException {
+        return internalReadEntry(location, -1L, -1L, false /* validateEntry */);
+    }
+
+
+    private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry)
+            throws IOException, Bookie.NoEntryException {
         long entryLogId = logIdForOffset(location);
         long pos = posForOffset(location);
 
@@ -877,10 +871,6 @@ public class EntryLogger {
         return data;
     }
 
-    public ByteBuf readEntry(long ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException {
-        return internalReadEntry(ledgerId, entryId, location, true /* validateEntry */);
-    }
-
     /**
      * Read the header of an entry log.
      */
@@ -932,7 +922,8 @@ public class EntryLogger {
     /**
      * Whether the log file exists or not.
      */
-    boolean logExists(long logId) {
+    @Override
+    public boolean logExists(long logId) {
         for (File d : ledgerDirsManager.getAllLedgerDirs()) {
             File f = new File(d, Long.toHexString(logId) + ".log");
             if (f.exists()) {
@@ -988,6 +979,7 @@ public class EntryLogger {
      * @param scanner Entry Log Scanner
      * @throws IOException
      */
+    @Override
     public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException {
         // Buffer where to read the entrySize (4 bytes) and the ledgerId (8 bytes)
         ByteBuf headerBuffer = Unpooled.buffer(4 + 8);
@@ -1054,19 +1046,6 @@ public class EntryLogger {
         }
     }
 
-    public EntryLogMetadata getEntryLogMetadata(long entryLogId) throws IOException {
-        // First try to extract the EntryLogMetadata from the index, if there's no index then fallback to scanning the
-        // entry log
-        try {
-            return extractEntryLogMetadataFromIndex(entryLogId);
-        } catch (Exception e) {
-            LOG.info("Failed to get ledgers map index from: {}.log : {}", entryLogId, e.getMessage());
-
-            // Fall-back to scanning
-            return extractEntryLogMetadataByScanning(entryLogId);
-        }
-    }
-
     public EntryLogMetadata getEntryLogMetadata(long entryLogId, AbstractLogCompactor.Throttler throttler)
         throws IOException {
         // First try to extract the EntryLogMetadata from the index, if there's no index then fallback to scanning the
@@ -1164,10 +1143,6 @@ public class EntryLogger {
         return meta;
     }
 
-    private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId) throws IOException {
-        return extractEntryLogMetadataByScanning(entryLogId, null);
-    }
-
     private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId,
                                                                AbstractLogCompactor.Throttler throttler)
         throws IOException {
@@ -1199,7 +1174,8 @@ public class EntryLogger {
     /**
      * Shutdown method to gracefully stop entry logger.
      */
-    public void shutdown() {
+    @Override
+    public void close() {
         // since logChannel is buffered channel, do flush when shutting down
         LOG.info("Stopping EntryLogger");
         try {
@@ -1303,4 +1279,177 @@ public class EntryLogger {
                     || entryLogId < leastUnflushedLogId;
         }
     }
+
+    @Override
+    public CompactionEntryLog newCompactionLog(long logToCompact) throws IOException {
+        createNewCompactionLog();
+
+        File compactingLogFile = getCurCompactionLogFile();
+        long compactionLogId = fileName2LogId(compactingLogFile.getName());
+        File compactedLogFile = compactedLogFileFromCompacting(compactingLogFile, logToCompact);
+        File finalLogFile = new File(compactingLogFile.getParentFile(),
+                                     compactingLogFile.getName().substring(0,
+                                             compactingLogFile.getName().indexOf(".log") + 4));
+        return new EntryLoggerCompactionEntryLog(
+                compactionLogId, logToCompact, compactingLogFile, compactedLogFile, finalLogFile);
+
+    }
+
+    private class EntryLoggerCompactionEntryLog implements CompactionEntryLog {
+        private final long compactionLogId;
+        private final long logIdToCompact;
+        private final File compactingLogFile;
+        private final File compactedLogFile;
+        private final File finalLogFile;
+
+        EntryLoggerCompactionEntryLog(long compactionLogId, long logIdToCompact,
+                                      File compactingLogFile,
+                                      File compactedLogFile,
+                                      File finalLogFile) {
+            this.compactionLogId = compactionLogId;
+            this.logIdToCompact = logIdToCompact;
+            this.compactingLogFile = compactingLogFile;
+            this.compactedLogFile = compactedLogFile;
+            this.finalLogFile = finalLogFile;
+        }
+
+        @Override
+        public long addEntry(long ledgerId, ByteBuf entry) throws IOException {
+            return addEntryForCompaction(ledgerId, entry);
+        }
+        @Override
+        public void scan(EntryLogScanner scanner) throws IOException {
+            scanEntryLog(compactionLogId, scanner);
+        }
+        @Override
+        public void flush() throws IOException {
+            flushCompactionLog();
+        }
+        @Override
+        public void abort() {
+            removeCurCompactionLog();
+            if (compactedLogFile.exists()) {
+                if (!compactedLogFile.delete()) {
+                    LOG.warn("Could not delete file: {}", compactedLogFile);
+                }
+            }
+        }
+
+        @Override
+        public void markCompacted() throws IOException {
+            if (compactingLogFile.exists()) {
+                if (!compactedLogFile.exists()) {
+                    HardLink.createHardLink(compactingLogFile, compactedLogFile);
+                }
+            } else {
+                throw new IOException("Compaction log doesn't exist any more after flush: " + compactingLogFile);
+            }
+            removeCurCompactionLog();
+        }
+
+        @Override
+        public void makeAvailable() throws IOException {
+            if (!finalLogFile.exists()) {
+                HardLink.createHardLink(compactedLogFile, finalLogFile);
+            }
+        }
+        @Override
+        public void cleanup() {
+            if (compactedLogFile.exists()) {
+                if (!compactedLogFile.delete()) {
+                    LOG.warn("Could not delete file: {}", compactedLogFile);
+                }
+            }
+            if (compactingLogFile.exists()) {
+                if (!compactingLogFile.delete()) {
+                    LOG.warn("Could not delete file: {}", compactingLogFile);
+                }
+            }
+        }
+
+        @Override
+        public long getLogId() {
+            return compactionLogId;
+        }
+        @Override
+        public long getCompactedLogId() {
+            return logIdToCompact;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("logId", compactionLogId)
+                .add("compactedLogId", logIdToCompact)
+                .add("compactingLogFile", compactingLogFile)
+                .add("compactedLogFile", compactedLogFile)
+                .add("finalLogFile", finalLogFile)
+                .toString();
+        }
+    }
+
+    @Override
+    public Collection<CompactionEntryLog> incompleteCompactionLogs() {
+        List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
+        List<CompactionEntryLog> compactionLogs = new ArrayList<>();
+
+        for (File dir : ledgerDirs) {
+            File[] compactingPhaseFiles = dir.listFiles(
+                    file -> file.getName().endsWith(TransactionalEntryLogCompactor.COMPACTING_SUFFIX));
+            if (compactingPhaseFiles != null) {
+                for (File file : compactingPhaseFiles) {
+                    if (file.delete()) {
+                        LOG.info("Deleted failed compaction file {}", file);
+                    }
+                }
+            }
+            File[] compactedPhaseFiles = dir.listFiles(
+                    file -> file.getName().endsWith(TransactionalEntryLogCompactor.COMPACTED_SUFFIX));
+            if (compactedPhaseFiles != null) {
+                for (File compactedFile : compactedPhaseFiles) {
+                    LOG.info("Found compacted log file {} has partially flushed index, recovering index.",
+                             compactedFile);
+
+                    File compactingLogFile = new File(compactedFile.getParentFile(), "doesntexist");
+                    long compactionLogId = -1L;
+                    long compactedLogId = -1L;
+                    String[] parts = compactedFile.getName().split(Pattern.quote("."));
+                    boolean valid = true;
+                    if (parts.length != 4) {
+                        valid = false;
+                    } else {
+                        try {
+                            compactionLogId = Long.parseLong(parts[0], 16);
+                            compactedLogId = Long.parseLong(parts[2], 16);
+                        } catch (NumberFormatException nfe) {
+                            valid = false;
+                        }
+                    }
+
+                    if (!valid) {
+                        LOG.info("Invalid compacted file found ({}), deleting", compactedFile);
+                        if (!compactedFile.delete()) {
+                            LOG.warn("Couldn't delete invalid compacted file ({})", compactedFile);
+                        }
+                        continue;
+                    }
+                    File finalLogFile = new File(compactedFile.getParentFile(), compactionLogId + ".log");
+
+                    compactionLogs.add(
+                            new EntryLoggerCompactionEntryLog(compactionLogId, compactedLogId,
+                                                              compactingLogFile, compactedFile, finalLogFile));
+                }
+            }
+        }
+        return compactionLogs;
+    }
+
+    private static File compactedLogFileFromCompacting(File compactionLogFile, long compactingLogId) {
+        File dir = compactionLogFile.getParentFile();
+        String filename = compactionLogFile.getName();
+        String newSuffix = ".log." + DefaultEntryLogger.logId2HexString(compactingLogId)
+            + TransactionalEntryLogCompactor.COMPACTED_SUFFIX;
+        String hardLinkFilename = filename.replace(TransactionalEntryLogCompactor.COMPACTING_SUFFIX, newSuffix);
+        return new File(dir, hardLinkFilename);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
index 98f4960549..743cc9392c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogCompactor.java
@@ -27,7 +27,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,9 +85,9 @@ public class EntryLogCompactor extends AbstractLogCompactor {
     class CompactionScannerFactory {
         List<EntryLocation> offsets = new ArrayList<EntryLocation>();
 
-        EntryLogger.EntryLogScanner newScanner(final EntryLogMetadata meta) {
+        EntryLogScanner newScanner(final EntryLogMetadata meta) {
 
-            return new EntryLogger.EntryLogScanner() {
+            return new EntryLogScanner() {
                 @Override
                 public boolean accept(long ledgerId) {
                     return meta.containsLedger(ledgerId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
index 340e9a18b6..106e6eedef 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManager.java
@@ -26,7 +26,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel;
 
 interface EntryLogManager {
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
index f9c6d97cfd..363189a72e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
@@ -21,7 +21,7 @@
 
 package org.apache.bookkeeper.bookie;
 
-import static org.apache.bookkeeper.bookie.EntryLogger.UNASSIGNED_LEDGERID;
+import static org.apache.bookkeeper.bookie.DefaultEntryLogger.UNASSIGNED_LEDGERID;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
@@ -31,8 +31,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger.EntryLogListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 
@@ -41,14 +41,14 @@ abstract class EntryLogManagerBase implements EntryLogManager {
     volatile List<BufferedLogChannel> rotatedLogChannels;
     final EntryLoggerAllocator entryLoggerAllocator;
     final LedgerDirsManager ledgerDirsManager;
-    private final List<EntryLogger.EntryLogListener> listeners;
+    private final List<DefaultEntryLogger.EntryLogListener> listeners;
     /**
      * The maximum size of a entry logger file.
      */
     final long logSizeLimit;
 
     EntryLogManagerBase(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
-            EntryLoggerAllocator entryLoggerAllocator, List<EntryLogger.EntryLogListener> listeners) {
+            EntryLoggerAllocator entryLoggerAllocator, List<DefaultEntryLogger.EntryLogListener> listeners) {
         this.ledgerDirsManager = ledgerDirsManager;
         this.entryLoggerAllocator = entryLoggerAllocator;
         this.listeners = listeners;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
index c04ccbe8da..c70bfde9bf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
@@ -57,7 +57,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.Counter;
@@ -259,7 +259,7 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
      */
     private final ConcurrentLongHashMap<BufferedLogChannelWithDirInfo> replicaOfCurrentLogChannels;
     private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader;
-    private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+    private final DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
     private final int entrylogMapAccessExpiryTimeInSeconds;
     private final int maximumNumberOfActiveEntryLogs;
     private final int entryLogPerLedgerCounterLimitsMultFactor;
@@ -269,9 +269,10 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
     final EntryLogsPerLedgerCounter entryLogsPerLedgerCounter;
 
     EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
-            EntryLoggerAllocator entryLoggerAllocator, List<EntryLogger.EntryLogListener> listeners,
-            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, StatsLogger statsLogger)
-            throws IOException {
+                                        EntryLoggerAllocator entryLoggerAllocator,
+                                        List<DefaultEntryLogger.EntryLogListener> listeners,
+                                        DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus,
+                                        StatsLogger statsLogger) throws IOException {
         super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
         this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
         this.rotatedLogChannels = new CopyOnWriteArrayList<BufferedLogChannel>();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
index f5b8823711..f387eeedcd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForSingleEntryLog.java
@@ -21,8 +21,8 @@
 
 package org.apache.bookkeeper.bookie;
 
-import static org.apache.bookkeeper.bookie.EntryLogger.INVALID_LID;
-import static org.apache.bookkeeper.bookie.EntryLogger.UNASSIGNED_LEDGERID;
+import static org.apache.bookkeeper.bookie.DefaultEntryLogger.INVALID_LID;
+import static org.apache.bookkeeper.bookie.DefaultEntryLogger.UNASSIGNED_LEDGERID;
 
 import io.netty.buffer.ByteBuf;
 import java.io.File;
@@ -33,7 +33,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
@@ -44,11 +44,11 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
     private volatile BufferedLogChannel activeLogChannel;
     private long logIdBeforeFlush = INVALID_LID;
     private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
-    private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+    private final DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
 
     EntryLogManagerForSingleEntryLog(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
-            EntryLoggerAllocator entryLoggerAllocator, List<EntryLogger.EntryLogListener> listeners,
-            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) {
+            EntryLoggerAllocator entryLoggerAllocator, List<DefaultEntryLogger.EntryLogListener> listeners,
+            DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) {
         super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
         this.rotatedLogChannels = new LinkedList<BufferedLogChannel>();
         this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
@@ -157,7 +157,7 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
         if (currentActiveLogChannel != null) {
             return currentActiveLogChannel.getLogId();
         } else {
-            return EntryLogger.UNINITIALIZED_LOG_ID;
+            return DefaultEntryLogger.UNINITIALIZED_LOG_ID;
         }
     }
 
@@ -260,7 +260,7 @@ class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
     }
 
     @Override
-    public EntryLogger.BufferedLogChannel createNewLogForCompaction() throws IOException {
+    public DefaultEntryLogger.BufferedLogChannel createNewLogForCompaction() throws IOException {
         return entryLoggerAllocator.createNewLogForCompaction(selectDirForNextEntryLog());
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
index 2066e6d4ef..a2f61fd6ed 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 
 /**
@@ -61,14 +61,14 @@ class EntryLoggerAllocator {
     private final LedgerDirsManager ledgerDirsManager;
     private final Object createEntryLogLock = new Object();
     private final Object createCompactionLogLock = new Object();
-    private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+    private final DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
     private final boolean entryLogPreAllocationEnabled;
     private final ByteBufAllocator byteBufAllocator;
-    final ByteBuf logfileHeader = Unpooled.buffer(EntryLogger.LOGFILE_HEADER_SIZE);
+    final ByteBuf logfileHeader = Unpooled.buffer(DefaultEntryLogger.LOGFILE_HEADER_SIZE);
 
     EntryLoggerAllocator(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
-            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId,
-            ByteBufAllocator byteBufAllocator) {
+                         DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, long logId,
+                         ByteBufAllocator byteBufAllocator) {
         this.conf = conf;
         this.byteBufAllocator = byteBufAllocator;
         this.ledgerDirsManager = ledgerDirsManager;
@@ -83,8 +83,8 @@ class EntryLoggerAllocator {
         // so there can be race conditions when entry logs are rolled over and
         // this header buffer is cleared before writing it into the new logChannel.
         logfileHeader.writeBytes("BKLO".getBytes(UTF_8));
-        logfileHeader.writeInt(EntryLogger.HEADER_CURRENT_VERSION);
-        logfileHeader.writerIndex(EntryLogger.LOGFILE_HEADER_SIZE);
+        logfileHeader.writeInt(DefaultEntryLogger.HEADER_CURRENT_VERSION);
+        logfileHeader.writerIndex(DefaultEntryLogger.LOGFILE_HEADER_SIZE);
 
     }
 
@@ -175,7 +175,7 @@ class EntryLoggerAllocator {
             setLastLogId(f, preallocatedLogId);
         }
 
-        if (suffix.equals(EntryLogger.LOG_FILE_SUFFIX)) {
+        if (suffix.equals(DefaultEntryLogger.LOG_FILE_SUFFIX)) {
             recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 87a947731a..fba0bec90f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -42,6 +42,7 @@ import lombok.Getter;
 import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
 import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
 import org.apache.bookkeeper.bookie.stats.GarbageCollectorStats;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.bookie.storage.ldb.PersistentEntryLogMetadataMap;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -213,7 +214,8 @@ public class GarbageCollectorThread extends SafeRunnable {
             }
         };
         if (conf.getUseTransactionalCompaction()) {
-            this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
+            this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage,
+                    ledgerDirsManager, remover);
         } else {
             this.compactor = new EntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
         }
@@ -755,4 +757,4 @@ public class GarbageCollectorThread extends SafeRunnable {
             .minorCompactionCounter(gcStats.getMinorCompactionCounter().get())
             .build();
     }
-}
\ No newline at end of file
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index a1dc14b2f6..a071fd5f40 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -57,8 +57,9 @@ import lombok.Getter;
 
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger.EntryLogListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -88,7 +89,7 @@ import org.slf4j.LoggerFactory;
 public class InterleavedLedgerStorage implements CompactableLedgerStorage, EntryLogListener {
     private static final Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class);
 
-    EntryLogger entryLogger;
+    DefaultEntryLogger entryLogger;
     @Getter
     LedgerCache ledgerCache;
     protected CheckpointSource checkpointSource = CheckpointSource.DEFAULT;
@@ -158,7 +159,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
                 ledgerManager,
                 ledgerDirsManager,
                 indexDirsManager,
-                new EntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE),
+                new DefaultEntryLogger(conf, ledgerDirsManager, entryLogListener, statsLogger.scope(ENTRYLOGGER_SCOPE),
                         allocator),
                 statsLogger);
     }
@@ -184,7 +185,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
                 StatsLogger statsLogger) throws IOException {
         checkNotNull(checkpointSource, "invalid null checkpoint source");
         checkNotNull(checkpointer, "invalid null checkpointer");
-        this.entryLogger = entryLogger;
+        this.entryLogger = (DefaultEntryLogger) entryLogger;
         this.entryLogger.addListener(this);
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
                 null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
@@ -284,7 +285,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         LOG.info("Shutting down GC thread");
         gcThread.shutdown();
         LOG.info("Shutting down entry logger");
-        entryLogger.shutdown();
+        entryLogger.close();
         try {
             ledgerCache.close();
         } catch (IOException e) {
@@ -602,7 +603,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
                             try {
                                 entryLogger.checkEntry(ledger, entry, offset);
                                 checkedEntries.increment();
-                            } catch (EntryLogger.EntryLookupException e) {
+                            } catch (DefaultEntryLogger.EntryLookupException e) {
                                 if (version != lep.getVersion()) {
                                     pageRetries.increment();
                                     if (lep.isDeleted()) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
index ad80d4ac1a..46509dc23b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedStorageRegenerateIndexOp.java
@@ -30,7 +30,7 @@ import java.util.PrimitiveIterator.OfLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
@@ -94,7 +94,7 @@ public class InterleavedStorageRegenerateIndexOp {
                 conf, diskChecker, NullStatsLogger.INSTANCE);
         LedgerDirsManager indexDirsManager = BookieResources.createIndexDirsManager(
                 conf, diskChecker,  NullStatsLogger.INSTANCE, ledgerDirsManager);
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         final LedgerCache ledgerCache;
         if (dryRun) {
             ledgerCache = new DryRunLedgerCache();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyDefaultEntryLogger.java
similarity index 86%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyDefaultEntryLogger.java
index 2a683dcefa..193a450d55 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyDefaultEntryLogger.java
@@ -29,14 +29,14 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 /**
  * Read Only Entry Logger.
  */
-public class ReadOnlyEntryLogger extends EntryLogger {
+public class ReadOnlyDefaultEntryLogger extends DefaultEntryLogger {
 
-    public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
+    public ReadOnlyDefaultEntryLogger(ServerConfiguration conf) throws IOException {
         super(conf);
     }
 
     @Override
-    protected boolean removeEntryLog(long entryLogId) {
+    public boolean removeEntryLog(long entryLogId) {
         // can't remove entry log in readonly mode
         return false;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 428a126271..89a429b762 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -54,7 +55,7 @@ import org.slf4j.LoggerFactory;
  */
 public class SortedLedgerStorage
         implements LedgerStorage, CacheCallback, SkipListFlusher,
-            CompactableLedgerStorage, EntryLogger.EntryLogListener {
+            CompactableLedgerStorage, DefaultEntryLogger.EntryLogListener {
     private static final Logger LOG = LoggerFactory.getLogger(SortedLedgerStorage.class);
 
     EntryMemTable memTable;
@@ -260,7 +261,8 @@ public class SortedLedgerStorage
     @Override
     public void checkpoint(final Checkpoint checkpoint) throws IOException {
         long numBytesFlushed = memTable.flush(this, checkpoint);
-        interleavedLedgerStorage.getEntryLogger().prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
+        ((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger())
+            .prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
         interleavedLedgerStorage.checkpoint(checkpoint);
     }
 
@@ -315,9 +317,9 @@ public class SortedLedgerStorage
             public void run() {
                 try {
                     LOG.info("Started flushing mem table.");
-                    interleavedLedgerStorage.getEntryLogger().prepareEntryMemTableFlush();
+                    ((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger()).prepareEntryMemTableFlush();
                     memTable.flush(SortedLedgerStorage.this);
-                    if (interleavedLedgerStorage.getEntryLogger().commitEntryMemTableFlush()) {
+                    if (((DefaultEntryLogger) interleavedLedgerStorage.getEntryLogger()).commitEntryMemTableFlush()) {
                         interleavedLedgerStorage.checkpointer.startCheckpoint(cp);
                     }
                 } catch (Exception e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
index aec934fa15..7c7307c371 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
@@ -23,14 +23,15 @@ package org.apache.bookkeeper.bookie;
 
 import io.netty.buffer.ByteBuf;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
+
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.util.HardLink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,7 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
 
     final EntryLogger entryLogger;
     final CompactableLedgerStorage ledgerStorage;
+    final LedgerDirsManager ledgerDirsManager;
     final List<EntryLocation> offsets = new ArrayList<>();
 
     // compaction log file suffix
@@ -58,10 +60,12 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
             ServerConfiguration conf,
             EntryLogger entryLogger,
             CompactableLedgerStorage ledgerStorage,
+            LedgerDirsManager ledgerDirsManager,
             LogRemovalListener logRemover) {
         super(conf, logRemover);
         this.entryLogger = entryLogger;
         this.ledgerStorage = ledgerStorage;
+        this.ledgerDirsManager = ledgerDirsManager;
     }
 
     /**
@@ -70,25 +74,10 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
     @Override
     public void cleanUpAndRecover() {
         // clean up compacting logs and recover index for already compacted logs
-        List<File> ledgerDirs = entryLogger.getLedgerDirsManager().getAllLedgerDirs();
-        for (File dir : ledgerDirs) {
-            File[] compactingPhaseFiles = dir.listFiles(file -> file.getName().endsWith(COMPACTING_SUFFIX));
-            if (compactingPhaseFiles != null) {
-                for (File file : compactingPhaseFiles) {
-                    if (file.delete()) {
-                        LOG.info("Deleted failed compaction file {}", file);
-                    }
-                }
-            }
-            File[] compactedPhaseFiles = dir.listFiles(file -> file.getName().endsWith(COMPACTED_SUFFIX));
-            if (compactedPhaseFiles != null) {
-                for (File compactedFile : compactedPhaseFiles) {
-                    LOG.info("Found compacted log file {} has partially flushed index, recovering index.",
-                            compactedFile);
-                    CompactionPhase updateIndex = new UpdateIndexPhase(compactedFile, true);
-                    updateIndex.run();
-                }
-            }
+        for (CompactionEntryLog log : entryLogger.incompleteCompactionLogs()) {
+            LOG.info("Found compacted log file {} has partially flushed index, recovering index.", log);
+            CompactionPhase updateIndex = new UpdateIndexPhase(log, true);
+            updateIndex.run();
         }
     }
 
@@ -97,19 +86,26 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
         if (metadata != null) {
             LOG.info("Compacting entry log {} with usage {}.",
                 metadata.getEntryLogId(), metadata.getUsage());
-            CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata);
+            CompactionEntryLog compactionLog;
+            try {
+                compactionLog = entryLogger.newCompactionLog(metadata.getEntryLogId());
+            } catch (IOException ioe) {
+                LOG.error("Exception creating new compaction entry log", ioe);
+                return false;
+            }
+            CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata, compactionLog);
             if (!scanEntryLog.run()) {
                 LOG.info("Compaction for entry log {} end in ScanEntryLogPhase.", metadata.getEntryLogId());
                 return false;
             }
-            File compactionLogFile = entryLogger.getCurCompactionLogFile();
-            CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(metadata.getEntryLogId());
+
+            CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(compactionLog);
             if (!flushCompactionLog.run()) {
                 LOG.info("Compaction for entry log {} end in FlushCompactionLogPhase.", metadata.getEntryLogId());
                 return false;
             }
-            File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId());
-            CompactionPhase updateIndex = new UpdateIndexPhase(compactedLogFile);
+
+            CompactionPhase updateIndex = new UpdateIndexPhase(compactionLog);
             if (!updateIndex.run()) {
                 LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", metadata.getEntryLogId());
                 return false;
@@ -161,16 +157,17 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
      */
     class ScanEntryLogPhase extends CompactionPhase {
         private final EntryLogMetadata metadata;
+        private final CompactionEntryLog compactionLog;
 
-        ScanEntryLogPhase(EntryLogMetadata metadata) {
+        ScanEntryLogPhase(EntryLogMetadata metadata, CompactionEntryLog compactionLog) {
             super("ScanEntryLogPhase");
             this.metadata = metadata;
+            this.compactionLog = compactionLog;
         }
 
         @Override
         void start() throws IOException {
             // scan entry log into compaction log and offset list
-            entryLogger.createNewCompactionLog();
             entryLogger.scanEntryLog(metadata.getEntryLogId(), new EntryLogScanner() {
                 @Override
                 public boolean accept(long ledgerId) {
@@ -189,7 +186,7 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
                                     ledgerId, lid, entryId, offset);
                             throw new IOException("Invalid entry found @ offset " + offset);
                         }
-                        long newOffset = entryLogger.addEntryForCompaction(ledgerId, entry);
+                        long newOffset = compactionLog.addEntry(ledgerId, entry);
                         offsets.add(new EntryLocation(ledgerId, entryId, newOffset));
 
                         if (LOG.isDebugEnabled()) {
@@ -207,7 +204,7 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
                 // no valid entries is compacted, delete entry log file
                 LOG.info("No valid entry is found in entry log after scan, removing entry log now.");
                 logRemovalListener.removeEntryLog(metadata.getEntryLogId());
-                entryLogger.removeCurCompactionLog();
+                compactionLog.abort();
                 return false;
             }
             return true;
@@ -217,9 +214,8 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
         void abort() {
             offsets.clear();
             // since we haven't flushed yet, we only need to delete the unflushed compaction file.
-            entryLogger.removeCurCompactionLog();
+            compactionLog.abort();
         }
-
     }
 
     /**
@@ -230,51 +226,35 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
      * a hardlink file "3.log.1.compacted" should be created, and "3.log.compacting" should be deleted.
      */
     class FlushCompactionLogPhase extends CompactionPhase {
-        private final long compactingLogId;
-        private File compactedLogFile;
+        final CompactionEntryLog compactionLog;
 
-        FlushCompactionLogPhase(long compactingLogId) {
+        FlushCompactionLogPhase(CompactionEntryLog compactionLog) {
             super("FlushCompactionLogPhase");
-            this.compactingLogId = compactingLogId;
+            this.compactionLog = compactionLog;
         }
 
         @Override
         void start() throws IOException {
             // flush the current compaction log.
-            File compactionLogFile = entryLogger.getCurCompactionLogFile();
-            if (compactionLogFile == null || !compactionLogFile.exists()) {
-                throw new IOException("Compaction log doesn't exist during flushing");
-            }
-            entryLogger.flushCompactionLog();
+            compactionLog.flush();
         }
 
         @Override
         boolean complete() throws IOException {
-            // create a hard link file named "x.log.y.compacted" for file "x.log.compacting".
-            // where x is compactionLogId and y is compactingLogId.
-            File compactionLogFile = entryLogger.getCurCompactionLogFile();
-            if (compactionLogFile == null || !compactionLogFile.exists()) {
-                LOG.warn("Compaction log doesn't exist any more after flush");
+            try {
+                compactionLog.markCompacted();
+                return true;
+            } catch (IOException ioe) {
+                LOG.warn("Error marking compaction as done", ioe);
                 return false;
             }
-            compactedLogFile = getCompactedLogFile(compactionLogFile, compactingLogId);
-            if (compactedLogFile != null && !compactedLogFile.exists()) {
-                HardLink.createHardLink(compactionLogFile, compactedLogFile);
-            }
-            entryLogger.removeCurCompactionLog();
-            return true;
         }
 
         @Override
         void abort() {
             offsets.clear();
             // remove compaction log file and its hardlink
-            entryLogger.removeCurCompactionLog();
-            if (compactedLogFile != null && compactedLogFile.exists()) {
-                if (!compactedLogFile.delete()) {
-                    LOG.warn("Could not delete compacted log file {}", compactedLogFile);
-                }
-            }
+            compactionLog.abort();
         }
     }
 
@@ -289,41 +269,29 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
      * <p>This phase can also used to recover partially flushed index when we pass isInRecovery=true
      */
     class UpdateIndexPhase extends CompactionPhase {
-        File compactedLogFile;
-        File newEntryLogFile;
+        final CompactionEntryLog compactionLog;
         private final boolean isInRecovery;
 
-        public UpdateIndexPhase(File compactedLogFile) {
-            this(compactedLogFile, false);
+        public UpdateIndexPhase(CompactionEntryLog compactionLog) {
+            this(compactionLog, false);
         }
 
-        public UpdateIndexPhase(File compactedLogFile, boolean isInRecovery) {
+        public UpdateIndexPhase(CompactionEntryLog compactionLog, boolean isInRecovery) {
             super("UpdateIndexPhase");
-            this.compactedLogFile = compactedLogFile;
+            this.compactionLog = compactionLog;
             this.isInRecovery = isInRecovery;
         }
 
         @Override
         void start() throws IOException {
-            if (compactedLogFile != null && compactedLogFile.exists()) {
-                File dir = compactedLogFile.getParentFile();
-                String compactedFilename = compactedLogFile.getName();
-                // create a hard link "x.log" for file "x.log.y.compacted"
-                this.newEntryLogFile = new File(dir, compactedFilename.substring(0,
-                            compactedFilename.indexOf(".log") + 4));
-                if (!newEntryLogFile.exists()) {
-                    HardLink.createHardLink(compactedLogFile, newEntryLogFile);
-                }
-                if (isInRecovery) {
-                    recoverEntryLocations(EntryLogger.fileName2LogId(newEntryLogFile.getName()));
-                }
-                if (!offsets.isEmpty()) {
-                    // update entry locations and flush index
-                    ledgerStorage.updateEntriesLocations(offsets);
-                    ledgerStorage.flushEntriesLocationsIndex();
-                }
-            } else {
-                throw new IOException("Failed to find compacted log file in UpdateIndexPhase");
+            compactionLog.makeAvailable();
+            if (isInRecovery) {
+                recoverEntryLocations(compactionLog);
+            }
+            if (!offsets.isEmpty()) {
+                // update entry locations and flush index
+                ledgerStorage.updateEntriesLocations(offsets);
+                ledgerStorage.flushEntriesLocationsIndex();
             }
         }
 
@@ -332,16 +300,8 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
             // When index is flushed, and entry log is removed,
             // delete the ".compacted" file to indicate this phase is completed.
             offsets.clear();
-            if (compactedLogFile != null) {
-                if (!compactedLogFile.delete()) {
-                    LOG.warn("Could not delete compacted log file {}", compactedLogFile);
-                }
-                // Now delete the old entry log file since it's compacted
-                String compactedFilename = compactedLogFile.getName();
-                String oldEntryLogFilename = compactedFilename.substring(compactedFilename.indexOf(".log") + 5);
-                long entryLogId = EntryLogger.fileName2LogId(oldEntryLogFilename);
-                logRemovalListener.removeEntryLog(entryLogId);
-            }
+            compactionLog.cleanup();
+            logRemovalListener.removeEntryLog(compactionLog.getCompactedLogId());
             return true;
         }
 
@@ -353,8 +313,8 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
         /**
          * Scan entry log to recover entry locations.
          */
-        private void recoverEntryLocations(long compactedLogId) throws IOException {
-            entryLogger.scanEntryLog(compactedLogId, new EntryLogScanner() {
+        private void recoverEntryLocations(CompactionEntryLog compactionLog) throws IOException {
+            compactionLog.scan(new EntryLogScanner() {
                 @Override
                 public boolean accept(long ledgerId) {
                     return true;
@@ -370,23 +330,11 @@ public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
                                 ledgerId, lid, entryId, offset);
                         throw new IOException("Invalid entry found @ offset " + offset);
                     }
-                    long location = (compactedLogId << 32L) | (offset + 4);
+                    long location = (compactionLog.getLogId() << 32L) | (offset + 4);
                     offsets.add(new EntryLocation(lid, entryId, location));
                 }
             });
-            LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactedLogId);
+            LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getLogId());
         }
     }
-
-    File getCompactedLogFile(File compactionLogFile, long compactingLogId) {
-        if (compactionLogFile == null) {
-            return null;
-        }
-        File dir = compactionLogFile.getParentFile();
-        String filename = compactionLogFile.getName();
-        String newSuffix = ".log." + EntryLogger.logId2HexString(compactingLogId) + COMPACTED_SUFFIX;
-        String hardLinkFilename = filename.replace(COMPACTING_SUFFIX, newSuffix);
-        return new File(dir, hardLinkFilename);
-    }
-
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java
new file mode 100644
index 0000000000..60806add86
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/CompactionEntryLog.java
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+
+/**
+ * An entrylog to received compacted entries.
+ * <p/>
+ * The expected lifecycle for a compaction entry log is:
+ * 1. Creation
+ * 2. Mark compacted
+ * 3. Make available
+ * 4. Cleanup
+ * <p/>
+ * Abort can happen at during any step.
+ */
+public interface CompactionEntryLog {
+    /**
+     * Add an entry to the log.
+     * @param ledgerId the ledger the entry belong to
+     * @param entry the payload of the entry
+     * @return the position to which the entry was written
+     */
+    long addEntry(long ledgerId, ByteBuf entry) throws IOException;
+
+    /**
+     * Scan the entry log, reading out all contained entries.
+     */
+    void scan(EntryLogScanner scanner) throws IOException;
+
+    /**
+     * Flush any unwritten entries to physical storage.
+     */
+    void flush() throws IOException;
+
+    /**
+     * Abort the compaction log. This should delete any resources held
+     * by this log.
+     */
+    void abort();
+
+    /**
+     * Mark the compaction log as compacted.
+     * From this point, the heavy work of copying entries from one log
+     * to another should be done. We don't want to repeat that work,
+     * so this method should take steps to ensure that if the bookie crashes
+     * we can resume the compaction from this point.
+     */
+    void markCompacted() throws IOException;
+
+    /**
+     * Make the log written by the compaction process available for reads.
+     */
+    void makeAvailable() throws IOException;
+
+    /**
+     * Clean up any temporary resources that were used by the compaction process.
+     * At this point, there
+     */
+    void cleanup();
+
+    /**
+     * Get the log ID of the entrylog to which compacted entries are being written.
+     */
+    long getLogId();
+
+    /**
+     * Get the log ID of the entrylog which is being compacted.
+     */
+    long getCompactedLogId();
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java
similarity index 52%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
copy to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java
index 2a683dcefa..bcca2b1988 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogScanner.java
@@ -1,4 +1,4 @@
-/*
+/**
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,31 +18,36 @@
  * under the License.
  *
  */
+package org.apache.bookkeeper.bookie.storage;
 
-package org.apache.bookkeeper.bookie;
+import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.bookkeeper.conf.ServerConfiguration;
 
 /**
- * Read Only Entry Logger.
+ * Scan entries in a entry log file.
  */
-public class ReadOnlyEntryLogger extends EntryLogger {
-
-    public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
-        super(conf);
-    }
-
-    @Override
-    protected boolean removeEntryLog(long entryLogId) {
-        // can't remove entry log in readonly mode
-        return false;
-    }
+public interface EntryLogScanner {
+    /**
+     * Tests whether or not the entries belongs to the specified ledger
+     * should be processed.
+     *
+     * @param ledgerId
+     *          Ledger ID.
+     * @return true if and only the entries of the ledger should be scanned.
+     */
+    boolean accept(long ledgerId);
 
-    @Override
-    public synchronized long addEntry(long ledgerId, ByteBuffer entry) throws IOException {
-        throw new IOException("Can't add entry to a readonly entry logger.");
-    }
+    /**
+     * Process an entry.
+     *
+     * @param ledgerId
+     *          Ledger ID.
+     * @param offset
+     *          File offset of this entry.
+     * @param entry
+     *          Entry ByteBuf
+     * @throws IOException
+     */
+    void process(long ledgerId, long offset, ByteBuf entry) throws IOException;
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java
new file mode 100644
index 0000000000..cab72cd587
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/EntryLogger.java
@@ -0,0 +1,149 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.bookkeeper.bookie.AbstractLogCompactor;
+import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
+import org.apache.bookkeeper.bookie.EntryLogMetadata;
+
+
+/**
+ * Entry logger. Sequentially writes entries for a large number of ledgers to
+ * a small number of log files, to avoid many random writes.
+ * When an entry is added, a location is returned, which consists of the ID of the
+ * log into which the entry was added, and the offset of that entry within the log.
+ * The location is a long, with 32 bits each for the log ID and the offset. This
+ * naturally limits the offset and thus the size of the log to Integer.MAX_VALUE.
+ */
+public interface EntryLogger extends AutoCloseable {
+    long UNASSIGNED_LEDGERID = -1L;
+    // log file suffix
+    String LOG_FILE_SUFFIX = ".log";
+
+    /**
+     * Add an entry for ledger ```ledgerId``` to the entrylog.
+     * @param ledgerId the ledger for which the entry is being added
+     * @param buf the contents of the entry (this method does not take ownership of the refcount)
+     * @return the location in the entry log of the added entry
+     */
+    long addEntry(long ledgerId, ByteBuf buf) throws IOException;
+
+    /**
+     * Read an entry from an entrylog location.
+     * @param entryLocation the location from which to read the entry
+     * @return the entry
+     */
+    ByteBuf readEntry(long entryLocation)
+            throws IOException, NoEntryException;
+    /**
+     * Read an entry from an entrylog location, and verify that is matches the
+     * expected ledger and entry ID.
+     * @param ledgerId the ledgerID to match
+     * @param entryId the entryID to match
+     * @param entryLocation the location from which to read the entry
+     * @return the entry
+     */
+    ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
+            throws IOException, NoEntryException;
+
+    /**
+     * Flush any outstanding writes to disk.
+     */
+    void flush() throws IOException;
+
+    @Override
+    void close() throws IOException;
+
+    /**
+     * Create a new entrylog into which compacted entries can be added.
+     * There is a 1-1 mapping between logs that are being compacted
+     * and the log the compacted entries are written to.
+     */
+    CompactionEntryLog newCompactionLog(long logToCompact) throws IOException;
+
+    /**
+     * Return a collection of all the compaction entry logs which have been
+     * compacted, but have not been cleaned up.
+     */
+    Collection<CompactionEntryLog> incompleteCompactionLogs();
+
+    /**
+     * Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector
+     * process needs to look beyond the least unflushed entry log file, as there may be entry logs
+     * ready to be garbage collected.
+     *
+     * @return last entry log id created.
+     */
+    long getLastLogId();
+
+    /**
+     * Get the lowest entrylog ID which has not had all its data persisted to
+     * disk.
+     * This is used by compaction. Any entrylog ID higher or equal to this value
+     * will not be considered for compaction.
+     */
+    long getLeastUnflushedLogId();
+
+    /**
+     * Scan the given entrylog, returning all entries contained therein.
+     */
+    void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOException;
+
+    /**
+     * Retrieve metadata for the given entrylog ID.
+     * The metadata contains the size of the log, the size of the data in the log which is still
+     * active, and a list of all the ledgers contained in the log and the size of the data stored
+     * for each ledger.
+     */
+    default EntryLogMetadata getEntryLogMetadata(long entryLogId) throws IOException {
+        return getEntryLogMetadata(entryLogId, null);
+    }
+
+    /**
+     * Retrieve metadata for the given entrylog ID.
+     * The metadata contains the size of the log, the size of the data in the log which is still
+     * active, and a list of all the ledgers contained in the log and the size of the data stored
+     * for each ledger.
+     */
+    EntryLogMetadata getEntryLogMetadata(long entryLogId, AbstractLogCompactor.Throttler throttler) throws IOException;
+
+    /**
+     * Check whether an entrylog with the given ID exists.
+     */
+    boolean logExists(long logId);
+
+    /**
+     * Returns whether the current log id exists and has been rotated already.
+     *
+     * @param entryLogId EntryLog id to check.
+     * @return Whether the given entryLogId exists and has been rotated.
+     */
+    boolean isFlushedEntryLog(Long entryLogId);
+
+    /**
+     * Delete the entrylog with the given ID.
+     * @return false if the entrylog doesn't exist.
+     */
+    boolean removeEntryLog(long entryLogId);
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
index 6e481ca569..1f5cf8de4c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LedgersIndexRebuildOp.java
@@ -37,10 +37,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.bookkeeper.bookie.BookieImpl;
-import org.apache.bookkeeper.bookie.EntryLogger;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger;
 import org.apache.bookkeeper.bookie.Journal;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -136,7 +136,7 @@ public class LedgersIndexRebuildOp {
     }
 
     private void scanEntryLogFiles(Set<Long> ledgers) throws IOException {
-        EntryLogger entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
         Set<Long> entryLogs = entryLogger.getEntryLogsSet();
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
index 55c5c90c31..5d53c66081 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/LocationsIndexRebuildOp.java
@@ -34,9 +34,9 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieImpl;
-import org.apache.bookkeeper.bookie.EntryLogger;
-import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -69,7 +69,7 @@ public class LocationsIndexRebuildOp {
 
         long startTime = System.nanoTime();
 
-        EntryLogger entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
         Set<Long> entryLogs = entryLogger.getEntryLogsSet();
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 753b88f7ec..63f2f5a1b7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -58,8 +58,8 @@ import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Checkpointer;
 import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger;
 import org.apache.bookkeeper.bookie.EntryLocation;
-import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.GarbageCollectionStatus;
 import org.apache.bookkeeper.bookie.GarbageCollectorThread;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
@@ -68,6 +68,7 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerEntryPage;
 import org.apache.bookkeeper.bookie.StateManager;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
 import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
 import org.apache.bookkeeper.common.util.Watcher;
@@ -185,7 +186,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
                 TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
                 TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
 
-        entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator);
+        entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger, allocator);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, ledgerDirsManager, this, statsLogger);
 
         dbLedgerStorageStats = new DbLedgerStorageStats(
@@ -265,7 +266,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
             flush();
 
             gcThread.shutdown();
-            entryLogger.shutdown();
+            entryLogger.close();
 
             cleanupExecutor.shutdown();
             cleanupExecutor.awaitTermination(1, TimeUnit.SECONDS);
@@ -589,8 +590,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
             while (count < readAheadCacheBatchSize
                     && size < maxReadAheadBytesSize
                     && currentEntryLogId == firstEntryLogId) {
-                ByteBuf entry = entryLogger.internalReadEntry(orginalLedgerId, firstEntryId, currentEntryLocation,
-                        false /* validateEntry */);
+                ByteBuf entry = entryLogger.readEntry(orginalLedgerId,
+                        firstEntryId, currentEntryLocation);
 
                 try {
                     long currentEntryLedgerId = entry.getLong(0);
@@ -722,7 +723,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
             Batch batch = entryLocationIndex.newBatch();
             writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
                 try {
-                    long location = entryLogger.addEntry(ledgerId, entry, true);
+                    long location = entryLogger.addEntry(ledgerId, entry);
                     entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
                 } catch (IOException e) {
                     throw new RuntimeException(e);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/package-info.java
similarity index 54%
rename from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
rename to bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/package-info.java
index 2a683dcefa..f874453270 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/package-info.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,33 +15,8 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-
-package org.apache.bookkeeper.bookie;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.bookkeeper.conf.ServerConfiguration;
-
 /**
- * Read Only Entry Logger.
+ * Provides a <i>Bookie</i> server that stores entries for clients.
  */
-public class ReadOnlyEntryLogger extends EntryLogger {
-
-    public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
-        super(conf);
-    }
-
-    @Override
-    protected boolean removeEntryLog(long entryLogId) {
-        // can't remove entry log in readonly mode
-        return false;
-    }
-
-    @Override
-    public synchronized long addEntry(long ledgerId, ByteBuffer entry) throws IOException {
-        throw new IOException("Can't add entry to a readonly entry logger.");
-    }
-}
+package org.apache.bookkeeper.bookie.storage;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListActiveLedgersCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListActiveLedgersCommand.java
index 5662d3d6d5..fbe230cbcd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListActiveLedgersCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListActiveLedgersCommand.java
@@ -35,9 +35,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Setter;
 import lombok.experimental.Accessors;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger;
 import org.apache.bookkeeper.bookie.EntryLogMetadata;
-import org.apache.bookkeeper.bookie.EntryLogger;
-import org.apache.bookkeeper.bookie.ReadOnlyEntryLogger;
+import org.apache.bookkeeper.bookie.ReadOnlyDefaultEntryLogger;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -133,7 +133,7 @@ public class ListActiveLedgersCommand extends BookieCommand<ActiveLedgerFlags>{
             BKException.Code.OK, BKException.Code.ReadException);
           if (done.await(cmdFlags.timeout, TimeUnit.MILLISECONDS)){
             if  (resultCode.get() == BKException.Code.OK) {
-              EntryLogger entryLogger = new ReadOnlyEntryLogger(bkConf);
+              DefaultEntryLogger entryLogger = new ReadOnlyDefaultEntryLogger(bkConf);
               EntryLogMetadata entryLogMetadata = entryLogger.getEntryLogMetadata(cmdFlags.logId);
               List<Long> ledgersOnEntryLog = entryLogMetadata.getLedgersMap().keys();
               if (ledgersOnEntryLog.size() == 0) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java
index 11311936ad..edef6609ff 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommand.java
@@ -25,8 +25,9 @@ import java.io.File;
 import java.io.IOException;
 import lombok.Setter;
 import lombok.experimental.Accessors;
-import org.apache.bookkeeper.bookie.EntryLogger;
-import org.apache.bookkeeper.bookie.ReadOnlyEntryLogger;
+import org.apache.bookkeeper.bookie.ReadOnlyDefaultEntryLogger;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
 import org.apache.bookkeeper.tools.framework.CliFlags;
@@ -174,7 +175,7 @@ public class ReadLogCommand extends BookieCommand<ReadLogCommand.ReadLogFlags> {
         LOG.info("Scan entry log {} ({}.log) for PositionRange: {} - {}",
             logId, Long.toHexString(logId), rangeStartPos, rangeEndPos);
         final MutableBoolean entryFound = new MutableBoolean(false);
-        scanEntryLog(conf, logId, new EntryLogger.EntryLogScanner() {
+        scanEntryLog(conf, logId, new EntryLogScanner() {
             private MutableBoolean stopScanning = new MutableBoolean(false);
 
             @Override
@@ -220,7 +221,7 @@ public class ReadLogCommand extends BookieCommand<ReadLogCommand.ReadLogFlags> {
      * @param logId   Entry Log Id
      * @param scanner Entry Log Scanner
      */
-    private void scanEntryLog(ServerConfiguration conf, long logId, EntryLogger.EntryLogScanner scanner)
+    private void scanEntryLog(ServerConfiguration conf, long logId, EntryLogScanner scanner)
         throws IOException {
         initEntryLogger(conf);
         entryLogger.scanEntryLog(logId, scanner);
@@ -229,7 +230,7 @@ public class ReadLogCommand extends BookieCommand<ReadLogCommand.ReadLogFlags> {
     private synchronized void initEntryLogger(ServerConfiguration conf) throws IOException {
         if (null == entryLogger) {
             // provide read only entry logger
-            entryLogger = new ReadOnlyEntryLogger(conf);
+            entryLogger = new ReadOnlyDefaultEntryLogger(conf);
         }
     }
 
@@ -248,7 +249,7 @@ public class ReadLogCommand extends BookieCommand<ReadLogCommand.ReadLogFlags> {
         LOG.info("Scan entry log {} ({}.log) for LedgerId {} {}", logId, Long.toHexString(logId), ledgerId,
             ((entryId == -1) ? "" : " for EntryId " + entryId));
         final MutableBoolean entryFound = new MutableBoolean(false);
-        scanEntryLog(conf, logId, new EntryLogger.EntryLogScanner() {
+        scanEntryLog(conf, logId, new EntryLogScanner() {
             @Override
             public boolean accept(long candidateLedgerId) {
                 return ((candidateLedgerId == ledgerId) && ((!entryFound.booleanValue()) || (entryId == -1)));
@@ -281,7 +282,7 @@ public class ReadLogCommand extends BookieCommand<ReadLogCommand.ReadLogFlags> {
      */
     private void scanEntryLog(ServerConfiguration conf, long logId, final boolean printMsg) throws Exception {
         LOG.info("Scan entry log {} ({}.log)", logId, Long.toHexString(logId));
-        scanEntryLog(conf, logId, new EntryLogger.EntryLogScanner() {
+        scanEntryLog(conf, logId, new EntryLogScanner() {
             @Override
             public boolean accept(long ledgerId) {
                 return true;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommand.java
index 987015cfca..b3a88af7e0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommand.java
@@ -26,8 +26,8 @@ import java.io.IOException;
 import lombok.Setter;
 import lombok.experimental.Accessors;
 import org.apache.bookkeeper.bookie.EntryLogMetadata;
-import org.apache.bookkeeper.bookie.EntryLogger;
-import org.apache.bookkeeper.bookie.ReadOnlyEntryLogger;
+import org.apache.bookkeeper.bookie.ReadOnlyDefaultEntryLogger;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.tools.cli.commands.bookie.ReadLogMetadataCommand.ReadLogMetadataFlags;
 import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
@@ -144,7 +144,7 @@ public class ReadLogMetadataCommand extends BookieCommand<ReadLogMetadataFlags>
     private synchronized void initEntryLogger(ServerConfiguration conf) throws IOException {
         // provide read only entry logger
         if (null == entryLogger) {
-            entryLogger = new ReadOnlyEntryLogger(conf);
+            entryLogger = new ReadOnlyDefaultEntryLogger(conf);
         }
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 765d8256a7..9005fef748 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -35,6 +35,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.util.concurrent.UncheckedExecutionException;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
@@ -57,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.bookie.storage.CompactionEntryLog;
 import org.apache.bookkeeper.bookie.storage.ldb.PersistentEntryLogMetadataMap;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -74,16 +76,18 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.util.DiskChecker;
-import org.apache.bookkeeper.util.HardLink;
 import org.apache.bookkeeper.util.TestUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.AsyncCallback;
+
 import org.junit.Before;
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * This class tests the entry log compaction functionality.
  */
@@ -742,7 +746,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
 
         // Now, let's mark E1 as flushed, as its ledger L1 has been deleted already. In this case, the GC algorithm
         // should consider it for deletion.
-        getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L);
+        ((DefaultEntryLogger) getGCThread().entryLogger).recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L);
         getGCThread().triggerGC(true, false, false).get();
         assertTrue("Found entry log file 1.log that should have been compacted in ledgerDirectory: "
                 + tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 1));
@@ -752,7 +756,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         getGCThread().triggerGC(true, false, false).get();
         assertTrue("Found entry log file 0.log that should not have been compacted in ledgerDirectory: "
                 + tmpDirs.getDirs().get(0), TestUtils.hasAllLogFiles(tmpDirs.getDirs().get(0), 0));
-        getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L);
+        ((DefaultEntryLogger) getGCThread().entryLogger).recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L);
         getGCThread().triggerGC(true, false, false).get();
         assertTrue("Found entry log file 0.log that should have been compacted in ledgerDirectory: "
                 + tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 0));
@@ -1285,7 +1289,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         BookieImpl.checkDirectoryStructure(curDir);
         conf.setLedgerDirNames(new String[] {tmpDir.toString()});
 
-        conf.setEntryLogSizeLimit(EntryLogger.LOGFILE_HEADER_SIZE + 3 * (4 + ENTRY_SIZE));
+        conf.setEntryLogSizeLimit(DefaultEntryLogger.LOGFILE_HEADER_SIZE + 3 * (4 + ENTRY_SIZE));
         conf.setGcWaitTime(100);
         conf.setMinorCompactionThreshold(0.7f);
         conf.setMajorCompactionThreshold(0.0f);
@@ -1881,30 +1885,31 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             super(gcThread.conf,
                   gcThread.entryLogger,
                   gcThread.ledgerStorage,
-                    (long entry) -> {
-                        try {
-                            gcThread.removeEntryLog(entry);
-                        } catch (EntryLogMetadataMapException e) {
-                            LOG.warn("Failed to remove entry-log metadata {}", entry, e);
-                        }
-                    });
+                  gcThread.ledgerDirsManager,
+                  (long entry) -> {
+                      try {
+                          gcThread.removeEntryLog(entry);
+                      } catch (EntryLogMetadataMapException e) {
+                          LOG.warn("Failed to remove entry-log metadata {}", entry, e);
+                      }
+                  });
         }
 
-        synchronized void compactWithIndexFlushFailure(EntryLogMetadata metadata) {
+        synchronized void compactWithIndexFlushFailure(EntryLogMetadata metadata) throws IOException {
             LOG.info("Compacting entry log {}.", metadata.getEntryLogId());
-            CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata);
+            CompactionEntryLog compactionLog = entryLogger.newCompactionLog(metadata.getEntryLogId());
+
+            CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata, compactionLog);
             if (!scanEntryLog.run()) {
                 LOG.info("Compaction for {} end in ScanEntryLogPhase.", metadata.getEntryLogId());
                 return;
             }
-            File compactionLogFile = entryLogger.getCurCompactionLogFile();
-            CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(metadata.getEntryLogId());
+            CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(compactionLog);
             if (!flushCompactionLog.run()) {
                 LOG.info("Compaction for {} end in FlushCompactionLogPhase.", metadata.getEntryLogId());
                 return;
             }
-            File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId());
-            CompactionPhase partialFlushIndexPhase = new PartialFlushIndexPhase(compactedLogFile);
+            CompactionPhase partialFlushIndexPhase = new PartialFlushIndexPhase(compactionLog);
             if (!partialFlushIndexPhase.run()) {
                 LOG.info("Compaction for {} end in PartialFlushIndexPhase.", metadata.getEntryLogId());
                 return;
@@ -1913,21 +1918,21 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             LOG.info("Compacted entry log : {}.", metadata.getEntryLogId());
         }
 
-        synchronized void compactWithLogFlushFailure(EntryLogMetadata metadata) {
+        synchronized void compactWithLogFlushFailure(EntryLogMetadata metadata) throws IOException {
             LOG.info("Compacting entry log {}", metadata.getEntryLogId());
-            CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata);
+            CompactionEntryLog compactionLog = entryLogger.newCompactionLog(metadata.getEntryLogId());
+
+            CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata, compactionLog);
             if (!scanEntryLog.run()) {
                 LOG.info("Compaction for {} end in ScanEntryLogPhase.", metadata.getEntryLogId());
                 return;
             }
-            File compactionLogFile = entryLogger.getCurCompactionLogFile();
-            CompactionPhase logFlushFailurePhase = new LogFlushFailurePhase(metadata.getEntryLogId());
+            CompactionPhase logFlushFailurePhase = new LogFlushFailurePhase(compactionLog);
             if (!logFlushFailurePhase.run()) {
                 LOG.info("Compaction for {} end in FlushCompactionLogPhase.", metadata.getEntryLogId());
                 return;
             }
-            File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId());
-            CompactionPhase updateIndex = new UpdateIndexPhase(compactedLogFile);
+            CompactionPhase updateIndex = new UpdateIndexPhase(compactionLog);
             if (!updateIndex.run()) {
                 LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", metadata.getEntryLogId());
                 return;
@@ -1938,45 +1943,35 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
 
         private class PartialFlushIndexPhase extends UpdateIndexPhase {
 
-            public PartialFlushIndexPhase(File compactedLogFile) {
-                super(compactedLogFile);
+            public PartialFlushIndexPhase(CompactionEntryLog compactionLog) {
+                super(compactionLog);
             }
 
             @Override
             void start() throws IOException {
-                if (compactedLogFile != null && compactedLogFile.exists()) {
-                    File dir = compactedLogFile.getParentFile();
-                    String compactedFilename = compactedLogFile.getName();
-                    // create a hard link "x.log" for file "x.log.y.compacted"
-                    this.newEntryLogFile = new File(dir, compactedFilename.substring(0,
-                                compactedFilename.indexOf(".log") + 4));
-                    File hardlinkFile = new File(dir, newEntryLogFile.getName());
-                    if (!hardlinkFile.exists()) {
-                        HardLink.createHardLink(compactedLogFile, hardlinkFile);
-                    }
-                    assertTrue(offsets.size() > 1);
-                    // only flush index for one entry location
-                    EntryLocation el = offsets.get(0);
-                    ledgerStorage.updateEntriesLocations(offsets);
-                    ledgerStorage.flushEntriesLocationsIndex();
-                    throw new IOException("Flush ledger index encounter exception");
-                }
+                compactionLog.makeAvailable();
+                assertTrue(offsets.size() > 1);
+                // only flush index for one entry location
+                EntryLocation el = offsets.get(0);
+                ledgerStorage.updateEntriesLocations(offsets);
+                ledgerStorage.flushEntriesLocationsIndex();
+                throw new IOException("Flush ledger index encounter exception");
             }
         }
 
         private class LogFlushFailurePhase extends FlushCompactionLogPhase {
 
-            LogFlushFailurePhase(long compactingLogId) {
-                super(compactingLogId);
+            LogFlushFailurePhase(CompactionEntryLog compactionEntryLog) {
+                super(compactionEntryLog);
             }
 
             @Override
             void start() throws IOException {
                 // flush the current compaction log
-                entryLogger.flushCompactionLog();
+                compactionLog.flush();
                 throw new IOException("Encounter IOException when trying to flush compaction log");
             }
         }
     }
 
-}
\ No newline at end of file
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 215e15942d..b2699e5e57 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -40,8 +40,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.stream.IntStream;
 
 import org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo;
-import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
-import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.stats.Counter;
@@ -122,7 +120,7 @@ public class CreateNewLogTest {
         File newLogFile = new File(dir, logFileName);
         newLogFile.createNewFile();
 
-        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager);
         // Calls createNewLog, and with the number of directories we
         // are using, if it picks one at random it will fail.
         EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) el.getEntryLogManager();
@@ -154,7 +152,7 @@ public class CreateNewLogTest {
             ledgerDirsManager.addToFilledDirs(tdir);
         }
 
-        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager);
         // Calls createNewLog, and with the number of directories we
         // are using, if it picks one at random it will fail.
         EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) el.getEntryLogManager();
@@ -191,7 +189,7 @@ public class CreateNewLogTest {
         conf.setEntryLogPerLedgerEnabled(true);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator;
         EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
@@ -244,7 +242,7 @@ public class CreateNewLogTest {
                 expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId());
         Assert.assertEquals("Number of current ", numOfLedgers,
                 entryLogManager.getCopyOfCurrentLogs().size());
-        List<BufferedLogChannel> rotatedLogChannels = entryLogManager.getRotatedLogChannels();
+        List<DefaultEntryLogger.BufferedLogChannel> rotatedLogChannels = entryLogManager.getRotatedLogChannels();
         Assert.assertEquals("Number of LogChannels rotated", 1, rotatedLogChannels.size());
         Assert.assertEquals("Rotated logchannel logid", rotatedLedger, rotatedLogChannels.iterator().next().getLogId());
         entryLogger.flush();
@@ -299,7 +297,7 @@ public class CreateNewLogTest {
         conf.setEntryLogPerLedgerEnabled(true);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator;
         EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger)
                 entryLogger.getEntryLogManager();
@@ -332,7 +330,7 @@ public class CreateNewLogTest {
         File nonFilledLedgerDir = BookieImpl.getCurrentDirectory(new File(ledgerDirs[numDirs - 1]));
 
         entryLogManager.createNewLog(ledgerId);
-        BufferedLogChannel newLogChannel = entryLogManager.getCurrentLogForLedger(ledgerId);
+        DefaultEntryLogger.BufferedLogChannel newLogChannel = entryLogManager.getCurrentLogForLedger(ledgerId);
         Assert.assertEquals("Directory of newly created BufferedLogChannel file", nonFilledLedgerDir.getAbsolutePath(),
                 newLogChannel.getLogFile().getParentFile().getAbsolutePath());
 
@@ -357,7 +355,7 @@ public class CreateNewLogTest {
         conf.setEntryLogPerLedgerEnabled(true);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger)
                 entryLogger.getEntryLogManager();
 
@@ -422,7 +420,7 @@ public class CreateNewLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerBase entryLogManager = (EntryLogManagerBase) el.getEntryLogManager();
         // set same thread executor for entryLoggerAllocator's allocatorExecutor
         setSameThreadExecutorForEntryLoggerAllocator(el.getEntryLoggerAllocator());
@@ -467,7 +465,7 @@ public class CreateNewLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) el.getEntryLogManager();
         entryLogManagerBase.createNewLog(0L);
 
@@ -506,7 +504,7 @@ public class CreateNewLogTest {
         conf.setEntryLogFilePreAllocationEnabled(true);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager);
         // set same thread executor for entryLoggerAllocator's allocatorExecutor
         setSameThreadExecutorForEntryLoggerAllocator(el.getEntryLoggerAllocator());
         AtomicBoolean receivedException = new AtomicBoolean(false);
@@ -516,7 +514,7 @@ public class CreateNewLogTest {
                 if (i % 2 == 0) {
                     ((EntryLogManagerBase) el.getEntryLogManager()).createNewLog((long) i);
                 } else {
-                    el.createNewCompactionLog();
+                    el.newCompactionLog(i);
                 }
             } catch (IOException e) {
                 LOG.error("Received exception while creating newLog", e);
@@ -544,7 +542,7 @@ public class CreateNewLogTest {
         conf.setEntryLogPerLedgerEnabled(true);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger)
                 entryLogger.getEntryLogManager();
 
@@ -609,7 +607,7 @@ public class CreateNewLogTest {
         conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger,
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger,
                 UnpooledByteBufAllocator.DEFAULT);
         EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
@@ -735,7 +733,7 @@ public class CreateNewLogTest {
         conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor);
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger,
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager, null, statsLogger,
                 UnpooledByteBufAllocator.DEFAULT);
         EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
@@ -814,7 +812,7 @@ public class CreateNewLogTest {
             }
         };
 
-        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger el = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) el
                 .getEntryLogManager();
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
similarity index 93%
rename from bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
rename to bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
index e3627f40ab..0898748b1e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java
@@ -55,7 +55,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.concurrent.locks.Lock;
 
-import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.DefaultEntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -78,8 +78,8 @@ import org.slf4j.LoggerFactory;
  * Tests for EntryLog.
  */
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class EntryLogTest {
-    private static final Logger LOG = LoggerFactory.getLogger(EntryLogTest.class);
+public class DefaultEntryLogTest {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultEntryLogTest.class);
 
     final List<File> tempDirs = new ArrayList<File>();
     final Random rand = new Random();
@@ -94,7 +94,7 @@ public class EntryLogTest {
     private File curDir;
     private ServerConfiguration conf;
     private LedgerDirsManager dirsMgr;
-    private EntryLogger entryLogger;
+    private DefaultEntryLogger entryLogger;
 
     @Before
     public void setUp() throws Exception {
@@ -108,13 +108,13 @@ public class EntryLogTest {
             new DiskChecker(
                 conf.getDiskUsageThreshold(),
                 conf.getDiskUsageWarnThreshold()));
-        this.entryLogger = new EntryLogger(conf, dirsMgr);
+        this.entryLogger = new DefaultEntryLogger(conf, dirsMgr);
     }
 
     @After
     public void tearDown() throws Exception {
         if (null != this.entryLogger) {
-            entryLogger.shutdown();
+            entryLogger.close();
         }
 
         for (File dir : tempDirs) {
@@ -125,7 +125,7 @@ public class EntryLogTest {
 
     @Test
     public void testDeferCreateNewLog() throws Exception {
-        entryLogger.shutdown();
+        entryLogger.close();
 
         // mark `curDir` as filled
         this.conf.setMinUsableSizeForEntryLogCreation(1);
@@ -137,10 +137,10 @@ public class EntryLogTest {
                 conf.getDiskUsageWarnThreshold()));
         this.dirsMgr.addToFilledDirs(curDir);
 
-        entryLogger = new EntryLogger(conf, dirsMgr);
+        entryLogger = new DefaultEntryLogger(conf, dirsMgr);
         EntryLogManagerForSingleEntryLog entryLogManager =
                 (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager();
-        assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
+        assertEquals(DefaultEntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
 
         // add the first entry will trigger file creation
         entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
@@ -149,7 +149,7 @@ public class EntryLogTest {
 
     @Test
     public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws Exception {
-        entryLogger.shutdown();
+        entryLogger.close();
 
         // mark `curDir` as filled
         this.conf.setMinUsableSizeForEntryLogCreation(Long.MAX_VALUE);
@@ -161,17 +161,17 @@ public class EntryLogTest {
                 conf.getDiskUsageWarnThreshold()));
         this.dirsMgr.addToFilledDirs(curDir);
 
-        entryLogger = new EntryLogger(conf, dirsMgr);
+        entryLogger = new DefaultEntryLogger(conf, dirsMgr);
         EntryLogManagerForSingleEntryLog entryLogManager =
                 (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager();
-        assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
+        assertEquals(DefaultEntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
 
         // add the first entry will trigger file creation
         try {
             entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
             fail("Should fail to append entry if there is no enough reserved space left");
         } catch (NoWritableLedgerDirException e) {
-            assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
+            assertEquals(DefaultEntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
         }
     }
 
@@ -182,14 +182,14 @@ public class EntryLogTest {
         entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
         entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
         entryLogger.flush();
-        entryLogger.shutdown();
+        entryLogger.close();
         // now lets truncate the file to corrupt the last entry, which simulates a partial write
         File f = new File(curDir, "0.log");
         RandomAccessFile raf = new RandomAccessFile(f, "rw");
         raf.setLength(raf.length() - 10);
         raf.close();
         // now see which ledgers are in the log
-        entryLogger = new EntryLogger(conf, dirsMgr);
+        entryLogger = new DefaultEntryLogger(conf, dirsMgr);
 
         EntryLogMetadata meta = entryLogger.getEntryLogMetadata(0L);
         LOG.info("Extracted Meta From Entry Log {}", meta);
@@ -230,12 +230,12 @@ public class EntryLogTest {
         for (int i = 0; i < numLogs; i++) {
             positions[i] = new long[numEntries];
 
-            EntryLogger logger = new EntryLogger(conf, dirsMgr);
+            DefaultEntryLogger logger = new DefaultEntryLogger(conf, dirsMgr);
             for (int j = 0; j < numEntries; j++) {
                 positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer());
             }
             logger.flush();
-            logger.shutdown();
+            logger.close();
         }
         // delete last log id
         File lastLogId = new File(curDir, "lastId");
@@ -245,15 +245,15 @@ public class EntryLogTest {
         for (int i = numLogs; i < 2 * numLogs; i++) {
             positions[i] = new long[numEntries];
 
-            EntryLogger logger = new EntryLogger(conf, dirsMgr);
+            DefaultEntryLogger logger = new DefaultEntryLogger(conf, dirsMgr);
             for (int j = 0; j < numEntries; j++) {
                 positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer());
             }
             logger.flush();
-            logger.shutdown();
+            logger.close();
         }
 
-        EntryLogger newLogger = new EntryLogger(conf, dirsMgr);
+        DefaultEntryLogger newLogger = new DefaultEntryLogger(conf, dirsMgr);
         for (int i = 0; i < (2 * numLogs + 1); i++) {
             File logFile = new File(curDir, Long.toHexString(i) + ".log");
             assertTrue(logFile.exists());
@@ -281,9 +281,9 @@ public class EntryLogTest {
     public void testEntryLoggerShouldThrowFNFEIfDirectoriesDoesNotExist()
             throws Exception {
         File tmpDir = createTempDir("bkTest", ".dir");
-        EntryLogger entryLogger = null;
+        DefaultEntryLogger entryLogger = null;
         try {
-            entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf, new File[] { tmpDir },
+            entryLogger = new DefaultEntryLogger(conf, new LedgerDirsManager(conf, new File[] { tmpDir },
                     new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
             fail("Expecting FileNotFoundException");
         } catch (FileNotFoundException e) {
@@ -291,7 +291,7 @@ public class EntryLogTest {
                     .getLocalizedMessage());
         } finally {
             if (entryLogger != null) {
-                entryLogger.shutdown();
+                entryLogger.close();
             }
         }
     }
@@ -309,7 +309,7 @@ public class EntryLogTest {
         conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(),
                 ledgerDir2.getAbsolutePath() });
         BookieImpl bookie = new TestBookieImpl(conf);
-        EntryLogger entryLogger = new EntryLogger(conf,
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf,
                 bookie.getLedgerDirsManager());
         InterleavedLedgerStorage ledgerStorage =
                 ((InterleavedLedgerStorage) bookie.ledgerStorage.getUnderlyingLedgerStorage());
@@ -323,7 +323,7 @@ public class EntryLogTest {
         ledgerStorage.addEntry(generateEntry(2, 1));
         // Add entry with disk full failure simulation
         bookie.getLedgerDirsManager().addToFilledDirs(((EntryLogManagerBase) entryLogger.getEntryLogManager())
-                .getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID).getLogFile().getParentFile());
+                .getCurrentLogForLedger(DefaultEntryLogger.UNASSIGNED_LEDGERID).getLogFile().getParentFile());
         ledgerStorage.addEntry(generateEntry(3, 1));
         // Verify written entries
         Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1)));
@@ -343,7 +343,7 @@ public class EntryLogTest {
         entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer());
 
         EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
-        entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        entryLogManager.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID);
         entryLogManager.flushRotatedLogs();
 
         EntryLogMetadata meta = entryLogger.extractEntryLogMetadataFromIndex(0L);
@@ -366,19 +366,19 @@ public class EntryLogTest {
         entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
         entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
         entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer());
-        ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
-        entryLogger.shutdown();
+        ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID);
+        entryLogger.close();
 
         // Rewrite the entry log header to be on V0 format
         File f = new File(curDir, "0.log");
         RandomAccessFile raf = new RandomAccessFile(f, "rw");
-        raf.seek(EntryLogger.HEADER_VERSION_POSITION);
+        raf.seek(DefaultEntryLogger.HEADER_VERSION_POSITION);
         // Write zeros to indicate V0 + no ledgers map info
         raf.write(new byte[4 + 8]);
         raf.close();
 
         // now see which ledgers are in the log
-        entryLogger = new EntryLogger(conf, dirsMgr);
+        entryLogger = new DefaultEntryLogger(conf, dirsMgr);
 
         try {
             entryLogger.extractEntryLogMetadataFromIndex(0L);
@@ -404,25 +404,25 @@ public class EntryLogTest {
      */
     @Test
     public void testPreAllocateLog() throws Exception {
-        entryLogger.shutdown();
+        entryLogger.close();
 
         // enable pre-allocation case
         conf.setEntryLogFilePreAllocationEnabled(true);
 
-        entryLogger = new EntryLogger(conf, dirsMgr);
+        entryLogger = new DefaultEntryLogger(conf, dirsMgr);
         // create a logger whose initialization phase allocating a new entry log
-        ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID);
         assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
 
         entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
         // the Future<BufferedLogChannel> is not null all the time
         assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
-        entryLogger.shutdown();
+        entryLogger.close();
 
         // disable pre-allocation case
         conf.setEntryLogFilePreAllocationEnabled(false);
         // create a logger
-        entryLogger = new EntryLogger(conf, dirsMgr);
+        entryLogger = new DefaultEntryLogger(conf, dirsMgr);
         assertNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
 
         entryLogger.addEntry(2L, generateEntry(1, 1).nioBuffer());
@@ -440,13 +440,13 @@ public class EntryLogTest {
         EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager());
         assertEquals(Sets.newHashSet(), entryLogger.getEntryLogsSet());
 
-        entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        entryLogManagerBase.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID);
         entryLogManagerBase.flushRotatedLogs();
 
         Thread.sleep(2000);
         assertEquals(Sets.newHashSet(0L, 1L), entryLogger.getEntryLogsSet());
 
-        entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        entryLogManagerBase.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID);
         entryLogManagerBase.flushRotatedLogs();
 
         assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet());
@@ -462,7 +462,7 @@ public class EntryLogTest {
      */
     @Test
     public void testFlushOrder() throws Exception {
-        entryLogger.shutdown();
+        entryLogger.close();
 
         int logSizeLimit = 256 * 1024;
         conf.setEntryLogPerLedgerEnabled(false);
@@ -470,7 +470,7 @@ public class EntryLogTest {
         conf.setFlushIntervalInBytes(0);
         conf.setEntryLogSizeLimit(logSizeLimit);
 
-        entryLogger = new EntryLogger(conf, dirsMgr);
+        entryLogger = new DefaultEntryLogger(conf, dirsMgr);
         EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
         AtomicBoolean exceptionHappened = new AtomicBoolean(false);
 
@@ -487,7 +487,7 @@ public class EntryLogTest {
         addEntriesAndRotateLogs(entryLogger, 30);
 
         rotatedLogChannels = new LinkedList<BufferedLogChannel>(entryLogManager.getRotatedLogChannels());
-        currentActiveChannel = entryLogManager.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID);
+        currentActiveChannel = entryLogManager.getCurrentLogForLedger(DefaultEntryLogger.UNASSIGNED_LEDGERID);
         long currentActiveChannelUnpersistedBytes = currentActiveChannel.getUnpersistedBytes();
 
         Thread flushThread = new Thread(new Runnable() {
@@ -512,7 +512,7 @@ public class EntryLogTest {
                      * here we are adding entry of size logSizeLimit with
                      * rolllog=true, so it would create a new entrylog.
                      */
-                    entryLogger.addEntry(123, generateEntry(123, 456, logSizeLimit), true);
+                    entryLogger.addEntry(123, generateEntry(123, 456, logSizeLimit));
                 } catch (InterruptedException | BrokenBarrierException | IOException e) {
                     LOG.error("Exception happened for entryLogManager.createNewLog", e);
                     exceptionHappened.set(true);
@@ -551,18 +551,18 @@ public class EntryLogTest {
         }
     }
 
-    void addEntriesAndRotateLogs(EntryLogger entryLogger, int numOfRotations)
+    void addEntriesAndRotateLogs(DefaultEntryLogger entryLogger, int numOfRotations)
             throws IOException {
         EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
-        entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID, null);
+        entryLogManager.setCurrentLogForLedgerAndAddToRotate(DefaultEntryLogger.UNASSIGNED_LEDGERID, null);
         for (int i = 0; i < numOfRotations; i++) {
             addEntries(entryLogger, 10);
-            entryLogManager.setCurrentLogForLedgerAndAddToRotate(EntryLogger.UNASSIGNED_LEDGERID, null);
+            entryLogManager.setCurrentLogForLedgerAndAddToRotate(DefaultEntryLogger.UNASSIGNED_LEDGERID, null);
         }
         addEntries(entryLogger, 10);
     }
 
-    void addEntries(EntryLogger entryLogger, int noOfEntries) throws IOException {
+    void addEntries(DefaultEntryLogger entryLogger, int noOfEntries) throws IOException {
         for (int j = 0; j < noOfEntries; j++) {
             int ledgerId = Math.abs(rand.nextInt());
             int entryId = Math.abs(rand.nextInt());
@@ -780,8 +780,8 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
-        EntryLogger.RecentEntryLogsStatus recentlyCreatedLogsStatus = entryLogger.recentlyCreatedEntryLogsStatus;
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger.RecentEntryLogsStatus recentlyCreatedLogsStatus = entryLogger.recentlyCreatedEntryLogsStatus;
 
         recentlyCreatedLogsStatus.createdEntryLog(0L);
         Assert.assertEquals("entryLogger's leastUnflushedLogId ", 0L, entryLogger.getLeastUnflushedLogId());
@@ -842,7 +842,7 @@ public class EntryLogTest {
         conf.setLedgerDirNames(createAndGetLedgerDirs(2));
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager());
 
         /*
@@ -852,7 +852,7 @@ public class EntryLogTest {
         int firstEntrySize = 1000;
         long entry0Position = entryLogger.addEntry(0L, generateEntry(ledgerId, 0L, firstEntrySize));
         // entrylogger writes length of the entry (4 bytes) before writing entry
-        long expectedUnpersistedBytes = EntryLogger.LOGFILE_HEADER_SIZE + firstEntrySize + 4;
+        long expectedUnpersistedBytes = DefaultEntryLogger.LOGFILE_HEADER_SIZE + firstEntrySize + 4;
         Assert.assertEquals("Unpersisted Bytes of entrylog", expectedUnpersistedBytes,
                 entryLogManagerBase.getCurrentLogForLedger(ledgerId).getUnpersistedBytes());
 
@@ -869,7 +869,7 @@ public class EntryLogTest {
          * newEntryLogger
          */
         conf.setEntryLogPerLedgerEnabled(false);
-        EntryLogger newEntryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger newEntryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager();
         Assert.assertEquals("EntryLogManager class type", EntryLogManagerForSingleEntryLog.class,
                 newEntryLogManager.getClass());
@@ -900,7 +900,7 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
 
@@ -969,12 +969,15 @@ public class EntryLogTest {
         }
     }
 
-    private EntryLogger.BufferedLogChannel createDummyBufferedLogChannel(EntryLogger entryLogger, long logid,
-            ServerConfiguration servConf) throws IOException {
+    private DefaultEntryLogger.BufferedLogChannel createDummyBufferedLogChannel(DefaultEntryLogger entryLogger,
+                                                                                long logid,
+                                                                                ServerConfiguration servConf)
+        throws IOException {
         File tmpFile = File.createTempFile("entrylog", logid + "");
         tmpFile.deleteOnExit();
         FileChannel fc = new RandomAccessFile(tmpFile, "rw").getChannel();
-        EntryLogger.BufferedLogChannel logChannel = new BufferedLogChannel(UnpooledByteBufAllocator.DEFAULT, fc, 10, 10,
+        DefaultEntryLogger.BufferedLogChannel logChannel =
+            new BufferedLogChannel(UnpooledByteBufAllocator.DEFAULT, fc, 10, 10,
                 logid, tmpFile, servConf.getFlushIntervalInBytes());
         return logChannel;
     }
@@ -1050,7 +1053,7 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entryLogManager =
                 (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();
 
@@ -1087,7 +1090,7 @@ public class EntryLogTest {
      */
     @Test
     public void testCacheMaximumSizeEvictionPolicy() throws Exception {
-        entryLogger.shutdown();
+        entryLogger.close();
         final int cacheMaximumSize = 20;
 
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
@@ -1098,7 +1101,7 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entryLogManager =
                 (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();
 
@@ -1121,7 +1124,7 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
 
@@ -1173,7 +1176,7 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
 
@@ -1235,7 +1238,7 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entryLogManager =
                 (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();
 
@@ -1289,7 +1292,7 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entryLogManager =
                 (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();
 
@@ -1357,7 +1360,7 @@ public class EntryLogTest {
         conf.setLedgerDirNames(createAndGetLedgerDirs(2));
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
         Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class,
                 entryLogManager.getClass());
@@ -1374,7 +1377,7 @@ public class EntryLogTest {
         for (long i = 0; i < numOfActiveLedgers; i++) {
             BufferedLogChannel logChannel =  entryLogManager.getCurrentLogForLedger(i);
             Assert.assertTrue("unpersistedBytes should be greater than LOGFILE_HEADER_SIZE",
-                    logChannel.getUnpersistedBytes() > EntryLogger.LOGFILE_HEADER_SIZE);
+                    logChannel.getUnpersistedBytes() > DefaultEntryLogger.LOGFILE_HEADER_SIZE);
         }
 
         for (long i = 0; i < numOfActiveLedgers; i++) {
@@ -1395,8 +1398,8 @@ public class EntryLogTest {
          */
         for (long i = 0; i < numOfActiveLedgers; i++) {
             BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(i);
-            Assert.assertEquals("unpersistedBytes should be LOGFILE_HEADER_SIZE", EntryLogger.LOGFILE_HEADER_SIZE,
-                    logChannel.getUnpersistedBytes());
+            Assert.assertEquals("unpersistedBytes should be LOGFILE_HEADER_SIZE",
+                DefaultEntryLogger.LOGFILE_HEADER_SIZE, logChannel.getUnpersistedBytes());
         }
 
         for (int j = numEntries; j < 2 * numEntries; j++) {
@@ -1408,7 +1411,7 @@ public class EntryLogTest {
         for (long i = 0; i < numOfActiveLedgers; i++) {
             BufferedLogChannel logChannel =  entryLogManager.getCurrentLogForLedger(i);
             Assert.assertTrue("unpersistedBytes should be greater than LOGFILE_HEADER_SIZE",
-                    logChannel.getUnpersistedBytes() > EntryLogger.LOGFILE_HEADER_SIZE);
+                    logChannel.getUnpersistedBytes() > DefaultEntryLogger.LOGFILE_HEADER_SIZE);
         }
 
         Assert.assertEquals("LeastUnflushedloggerID", 0, entryLogger.getLeastUnflushedLogId());
@@ -1444,7 +1447,7 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager());
 
         int numOfActiveLedgers = 10;
@@ -1513,9 +1516,9 @@ public class EntryLogTest {
         long ledgerId;
         int entryId;
         long position;
-        EntryLogger entryLogger;
+        DefaultEntryLogger entryLogger;
 
-        ReadTask(long ledgerId, int entryId, long position, EntryLogger entryLogger) {
+        ReadTask(long ledgerId, int entryId, long position, DefaultEntryLogger entryLogger) {
             this.ledgerId = ledgerId;
             this.entryId = entryId;
             this.position = position;
@@ -1554,7 +1557,7 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         int numOfActiveLedgers = 15;
         int numEntries = 2000;
         final AtomicLongArray positions = new AtomicLongArray(numOfActiveLedgers * numEntries);
@@ -1633,7 +1636,7 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
         EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger)
                 entryLogger.getEntryLogManager();
         Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class,
@@ -1704,9 +1707,9 @@ public class EntryLogTest {
      * in this method we add an entry and validate the ledgerdir of the
      * currentLogForLedger against the provided expected ledgerDirs.
      */
-    void addEntryAndValidateFolders(EntryLogger entryLogger, EntryLogManagerBase entryLogManager, int entryId,
-            File expectedDirForLedger0, boolean equalsForLedger0, File expectedDirForLedger1,
-            File expectedDirForLedger2) throws IOException {
+    void addEntryAndValidateFolders(DefaultEntryLogger entryLogger, EntryLogManagerBase entryLogManager, int entryId,
+                                    File expectedDirForLedger0, boolean equalsForLedger0, File expectedDirForLedger1,
+                                    File expectedDirForLedger2) throws IOException {
         entryLogger.addEntry(0L, generateEntry(0, entryId));
         entryLogger.addEntry(1L, generateEntry(1, entryId));
         entryLogger.addEntry(2L, generateEntry(2, entryId));
@@ -1752,8 +1755,8 @@ public class EntryLogTest {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
-        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+        DefaultEntryLogger defaultEntryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) defaultEntryLogger.getEntryLogManager();
         Assert.assertEquals(
                 "EntryLogManager class type", initialEntryLogPerLedgerEnabled
                         ? EntryLogManagerForEntryLogPerLedger.class : EntryLogManagerForSingleEntryLog.class,
@@ -1771,7 +1774,7 @@ public class EntryLogTest {
          */
         for (int j = 0; j < numEntries; j++) {
             for (int i = 0; i < numOfActiveLedgers; i++) {
-                positions[i][j] = entryLogger.addEntry((long) i, generateEntry(i, j));
+                positions[i][j] = defaultEntryLogger.addEntry((long) i, generateEntry(i, j));
                 long entryLogId = (positions[i][j] >> 32L);
                 if (initialEntryLogPerLedgerEnabled) {
                     Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId);
@@ -1799,7 +1802,7 @@ public class EntryLogTest {
         conf.setEntryLogPerLedgerEnabled(laterEntryLogPerLedgerEnabled);
         LedgerDirsManager newLedgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-        EntryLogger newEntryLogger = new EntryLogger(conf, newLedgerDirsManager);
+        DefaultEntryLogger newEntryLogger = new DefaultEntryLogger(conf, newLedgerDirsManager);
         EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager();
         Assert.assertEquals("EntryLogManager class type",
                 laterEntryLogPerLedgerEnabled ? EntryLogManagerForEntryLogPerLedger.class
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
index ddb0461300..9ed271ecfc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
@@ -64,7 +64,7 @@ public class GarbageCollectorThreadTest {
 
     @Before
     public void setUp() throws Exception {
-        when(ledgerStorage.getEntryLogger()).thenReturn(mock(EntryLogger.class));
+        when(ledgerStorage.getEntryLogger()).thenReturn(mock(DefaultEntryLogger.class));
         openMocks(this);
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
index e9a02c28fe..777ba5f47c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorageTest.java
@@ -104,7 +104,7 @@ public class InterleavedLedgerStorageTest {
         }
     };
 
-    static class TestableEntryLogger extends EntryLogger {
+    static class TestableDefaultEntryLogger extends DefaultEntryLogger {
         public interface CheckEntryListener {
             void accept(long ledgerId,
                         long entryId,
@@ -113,7 +113,7 @@ public class InterleavedLedgerStorageTest {
         }
         volatile CheckEntryListener testPoint;
 
-        public TestableEntryLogger(
+        public TestableDefaultEntryLogger(
                 ServerConfiguration conf,
                 LedgerDirsManager ledgerDirsManager,
                 EntryLogListener listener,
@@ -138,7 +138,7 @@ public class InterleavedLedgerStorageTest {
     TestStatsProvider statsProvider = new TestStatsProvider();
     ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
     LedgerDirsManager ledgerDirsManager;
-    TestableEntryLogger entryLogger;
+    TestableDefaultEntryLogger entryLogger;
     InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
     final long numWrites = 2000;
     final long moreNumOfWrites = 3000;
@@ -157,7 +157,7 @@ public class InterleavedLedgerStorageTest {
         ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
 
-        entryLogger = new TestableEntryLogger(
+        entryLogger = new TestableDefaultEntryLogger(
                 conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE);
         interleavedStorage.initializeWithEntryLogger(
                 conf, null, ledgerDirsManager, ledgerDirsManager,
@@ -444,7 +444,7 @@ public class InterleavedLedgerStorageTest {
 
 
         // Remove a logger
-        EntryLogger entryLogger = new EntryLogger(conf);
+        DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf);
         entryLogger.removeEntryLog(someEntryLogger.get());
 
         // Should fail consistency checker
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index 222e268078..7afa816af9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -43,7 +43,6 @@ import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
 import org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo;
-import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -372,7 +371,8 @@ public class LedgerStorageCheckpointTest {
         }
         handle.close();
         // simulate rolling entrylog
-        ((EntryLogManagerBase) ledgerStorage.getEntryLogger().getEntryLogManager()).createNewLog(ledgerId);
+        ((EntryLogManagerBase) ((DefaultEntryLogger) ledgerStorage.getEntryLogger()).getEntryLogManager())
+                .createNewLog(ledgerId);
         // sleep for a bit for checkpoint to do its task
         executorController.advance(Duration.ofMillis(500));
 
@@ -508,7 +508,7 @@ public class LedgerStorageCheckpointTest {
         clientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
         BookKeeper bkClient = new BookKeeper(clientConf);
         InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().getLedgerStorage();
-        EntryLogger entryLogger = ledgerStorage.entryLogger;
+        DefaultEntryLogger entryLogger = ledgerStorage.entryLogger;
         EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
                 .getEntryLogManager();
 
@@ -546,7 +546,7 @@ public class LedgerStorageCheckpointTest {
          * since checkpoint happenend, there shouldn't be any logChannelsToFlush
          * and bytesWrittenSinceLastFlush should be zero.
          */
-        List<BufferedLogChannel> copyOfRotatedLogChannels = entryLogManager.getRotatedLogChannels();
+        List<DefaultEntryLogger.BufferedLogChannel> copyOfRotatedLogChannels = entryLogManager.getRotatedLogChannels();
         Assert.assertTrue("There shouldn't be logChannelsToFlush",
                 ((copyOfRotatedLogChannels == null) || (copyOfRotatedLogChannels.size() == 0)));
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
index bdd7af354e..8f82dd88b2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
@@ -44,27 +44,29 @@ public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
     /**
      * Strictly for testing.
      */
-    public static class SlowEntryLogger extends EntryLogger {
+    public static class SlowDefaultEntryLogger extends DefaultEntryLogger {
         public volatile long getDelay = 0;
         public volatile long addDelay = 0;
         public volatile long flushDelay = 0;
 
-        public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener,
-                StatsLogger statsLogger) throws IOException {
+        public SlowDefaultEntryLogger(ServerConfiguration conf,
+                                      LedgerDirsManager ledgerDirsManager,
+                                      EntryLogListener listener,
+                                      StatsLogger statsLogger) throws IOException {
             super(conf, ledgerDirsManager, listener, statsLogger, UnpooledByteBufAllocator.DEFAULT);
         }
 
-        public SlowEntryLogger setAddDelay(long delay) {
+        public SlowDefaultEntryLogger setAddDelay(long delay) {
             addDelay = delay;
             return this;
         }
 
-        public SlowEntryLogger setGetDelay(long delay) {
+        public SlowDefaultEntryLogger setGetDelay(long delay) {
             getDelay = delay;
             return this;
         }
 
-        public SlowEntryLogger setFlushDelay(long delay) {
+        public SlowDefaultEntryLogger setFlushDelay(long delay) {
             flushDelay = delay;
             return this;
         }
@@ -76,9 +78,9 @@ public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
         }
 
         @Override
-        public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+        public long addEntry(long ledger, ByteBuf entry) throws IOException {
             delayMs(addDelay);
-            return super.addEntry(ledger, entry, rollLog);
+            return super.addEntry(ledger, entry);
         }
 
         @Override
@@ -120,22 +122,22 @@ public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
         long addDelay = conf.getLong(PROP_SLOW_STORAGE_ADD_DELAY, 0);
         long flushDelay = conf.getLong(PROP_SLOW_STORAGE_FLUSH_DELAY, 0);
 
-        entryLogger = new SlowEntryLogger(conf, ledgerDirsManager, this, statsLogger)
+        entryLogger = new SlowDefaultEntryLogger(conf, ledgerDirsManager, this, statsLogger)
                 .setAddDelay(addDelay)
                 .setGetDelay(getDelay)
                 .setFlushDelay(flushDelay);
     }
 
     public void setAddDelay(long delay) {
-        ((SlowEntryLogger) entryLogger).setAddDelay(delay);
+        ((SlowDefaultEntryLogger) entryLogger).setAddDelay(delay);
     }
 
     public void setGetDelay(long delay) {
-        ((SlowEntryLogger) entryLogger).setGetDelay(delay);
+        ((SlowDefaultEntryLogger) entryLogger).setGetDelay(delay);
     }
 
     public void setFlushDelay(long delay) {
-        ((SlowEntryLogger) entryLogger).setFlushDelay(delay);
+        ((SlowDefaultEntryLogger) entryLogger).setFlushDelay(delay);
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index a60ab7e34c..07b0018683 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -224,9 +224,9 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
         });
 
         // simulate entry log is rotated (due to compaction)
-        EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) storage.getEntryLogger()
-                .getEntryLogManager();
-        entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        EntryLogManagerForSingleEntryLog entryLogManager =
+            (EntryLogManagerForSingleEntryLog) ((DefaultEntryLogger) storage.getEntryLogger()).getEntryLogManager();
+        entryLogManager.createNewLog(DefaultEntryLogger.UNASSIGNED_LEDGERID);
         long leastUnflushedLogId = storage.getEntryLogger().getLeastUnflushedLogId();
         long currentLogId = entryLogManager.getCurrentLogId();
         log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index 00efee9e4b..2116ff5cf1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -36,10 +36,10 @@ import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.bookie.EntryLocation;
-import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.LedgerStorage;
 import org.apache.bookkeeper.bookie.TestBookieImpl;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -227,7 +227,7 @@ public class DbLedgerStorageTest {
         newEntry3.writeLong(4); // ledger id
         newEntry3.writeLong(3); // entry id
         newEntry3.writeBytes("new-entry-3".getBytes());
-        long location = entryLogger.addEntry(4L, newEntry3, false);
+        long location = entryLogger.addEntry(4L, newEntry3);
 
         List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
         singleDirStorage.updateEntriesLocations(locations);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index bedb76b990..d3e628fc0b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -57,12 +57,12 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Checkpointer;
 import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
 import org.apache.bookkeeper.bookie.EntryLocation;
-import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.GarbageCollector;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
 import org.apache.bookkeeper.bookie.StateManager;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadataBuilder;
 import org.apache.bookkeeper.client.api.DigestType;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 8740577dc3..362ff50b15 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -40,10 +40,10 @@ import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.Checkpointer;
 import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
 import org.apache.bookkeeper.bookie.EntryLocation;
-import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.StateManager;
+import org.apache.bookkeeper.bookie.storage.EntryLogger;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java
index 2b28bfc694..d6b8b73567 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java
@@ -59,7 +59,7 @@ public interface ZooKeeperCluster {
 
     void killCluster() throws Exception;
 
-    void sleepCluster(final int time, final TimeUnit timeUnit, final CountDownLatch l)
+    void sleepCluster(int time, final TimeUnit timeUnit, final CountDownLatch l)
             throws InterruptedException, IOException;
 
     default void expireSession(ZooKeeper zk) throws Exception {
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommandTest.java
index 1c579fd92b..ec12702aac 100644
--- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommandTest.java
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogCommandTest.java
@@ -22,8 +22,8 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doNothing;
 
-import org.apache.bookkeeper.bookie.EntryLogger;
-import org.apache.bookkeeper.bookie.ReadOnlyEntryLogger;
+import org.apache.bookkeeper.bookie.ReadOnlyDefaultEntryLogger;
+import org.apache.bookkeeper.bookie.storage.EntryLogScanner;
 import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
 import org.junit.Assert;
 import org.junit.Test;
@@ -42,8 +42,8 @@ public class ReadLogCommandTest extends BookieCommandTestBase {
         super.setup();
 
         mockServerConfigurationConstruction();
-        mockConstruction(ReadOnlyEntryLogger.class, (entryLogger, context) -> {
-            doNothing().when(entryLogger).scanEntryLog(anyLong(), any(EntryLogger.EntryLogScanner.class));
+        mockConstruction(ReadOnlyDefaultEntryLogger.class, (entryLogger, context) -> {
+            doNothing().when(entryLogger).scanEntryLog(anyLong(), any(EntryLogScanner.class));
         });
     }
 
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
index 4543168aa1..bc4d06c37c 100644
--- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLogMetadataCommandTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.bookkeeper.bookie.EntryLogMetadata;
-import org.apache.bookkeeper.bookie.ReadOnlyEntryLogger;
+import org.apache.bookkeeper.bookie.ReadOnlyDefaultEntryLogger;
 import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
 import org.junit.Assert;
@@ -49,7 +49,7 @@ public class ReadLogMetadataCommandTest extends BookieCommandTestBase {
         mockServerConfigurationConstruction();
         entryLogMetadata = mock(EntryLogMetadata.class);
 
-        mockConstruction(ReadOnlyEntryLogger.class, (entryLogger, context) -> {
+        mockConstruction(ReadOnlyDefaultEntryLogger.class, (entryLogger, context) -> {
             when(entryLogger.getEntryLogMetadata(anyLong())).thenReturn(entryLogMetadata);
         });
 
@@ -69,7 +69,7 @@ public class ReadLogMetadataCommandTest extends BookieCommandTestBase {
     public void commandTest() throws Exception {
         ReadLogMetadataCommand cmd = new ReadLogMetadataCommand();
         Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-l", "1" }));
-        verify(getMockedConstruction(ReadOnlyEntryLogger.class).constructed().get(0), times(1))
+        verify(getMockedConstruction(ReadOnlyDefaultEntryLogger.class).constructed().get(0), times(1))
                 .getEntryLogMetadata(anyLong());
         verify(entryLogMetadata, times(1)).getLedgersMap();
     }