You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/04/18 17:30:24 UTC

[bookkeeper] branch master updated: Issue #570: Introducing EntryLogManager.

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a71da1d  Issue #570: Introducing EntryLogManager.
a71da1d is described below

commit a71da1d1bae1790c913b773cdb14f61374687625
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Wed Apr 18 10:30:16 2018 -0700

    Issue #570: Introducing EntryLogManager.
    
    Descriptions of the changes in this PR:
    
    Introducing EntryLogManager interface, which abstracts out current activeLogChannel,
    rotatedLogChannels and corresponding lock for activeLogChannel. The current logic of
    handling logs is moved to EntryLogManagerForSingleEntryLog class, in the
    next sub-task EntryLogManagerForEntryLogPerLedger will be introduced. Also there
    are minor changes to createNewLog logic and leastUnflushedLogId logic.
    
    This is sub-task 5 of Issue #570
    
    Master Issue: #570
    
    Author: cguttapalem <cg...@salesforce.com>
    
    Reviewers: Sijie Guo <si...@apache.org>
    
    This closes #1281 from reddycharan/entrylogmanager, closes #570
---
 .../apache/bookkeeper/bookie/BufferedChannel.java  |  19 +-
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 916 ++++++++++++++-------
 .../bookkeeper/bookie/ReadOnlyEntryLogger.java     |  12 +-
 .../bookkeeper/bookie/SortedLedgerStorage.java     |  21 +-
 .../bookkeeper/conf/ServerConfiguration.java       |   4 +
 .../apache/bookkeeper/bookie/CreateNewLogTest.java | 150 +++-
 .../org/apache/bookkeeper/bookie/EntryLogTest.java | 205 ++++-
 .../bookie/LedgerStorageCheckpointTest.java        |  34 +-
 .../bookie/SortedLedgerStorageCheckpointTest.java  |  11 +-
 .../bookie/storage/ldb/DbLedgerStorageTest.java    |   2 +-
 .../apache/bookkeeper/test/ReadOnlyBookieTest.java |   2 +-
 11 files changed, 1011 insertions(+), 365 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 05a20e5..53628cf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -53,6 +53,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
      * calling fileChannel.force
      */
     protected final long unpersistedBytesBound;
+    private final boolean doRegularFlushes;
 
     /*
      * it tracks the number of bytes which are not persisted yet by force
@@ -81,6 +82,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
         this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
         this.unpersistedBytes = new AtomicLong(0);
         this.unpersistedBytesBound = unpersistedBytesBound;
+        this.doRegularFlushes = unpersistedBytesBound > 0;
     }
 
     @Override
@@ -114,7 +116,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
             }
             position.addAndGet(copied);
             unpersistedBytes.addAndGet(copied);
-            if (unpersistedBytesBound > 0) {
+            if (doRegularFlushes) {
                 if (unpersistedBytes.get() >= unpersistedBytesBound) {
                     flush();
                     shouldForceWrite = true;
@@ -157,6 +159,21 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
     }
 
     /**
+     * calls both flush and forceWrite methods if regular flush is enabled.
+     *
+     * @param forceMetadata
+     *            - If true then this method is required to force changes to
+     *            both the file's content and metadata to be written to storage;
+     *            otherwise, it need only force content changes to be written
+     * @throws IOException
+     */
+    public void flushAndForceWriteIfRegularFlush(boolean forceMetadata) throws IOException {
+        if (doRegularFlushes) {
+            flushAndForceWrite(forceMetadata);
+        }
+    }
+
+    /**
      * Write any data in the buffer to the file and advance the writeBufferPosition.
      * Callers are expected to synchronize appropriately
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 99b0f49..158525d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -51,12 +51,16 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -84,6 +88,9 @@ import org.slf4j.LoggerFactory;
  */
 public class EntryLogger {
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
+    static final long UNASSIGNED_LEDGERID = -1L;
+    // log file suffix
+    private static final String LOG_FILE_SUFFIX = ".log";
 
     @VisibleForTesting
     static final int UNINITIALIZED_LOG_ID = -0xDEAD;
@@ -92,13 +99,10 @@ public class EntryLogger {
         private final long logId;
         private final EntryLogMetadata entryLogMetadata;
         private final File logFile;
+        private long ledgerIdAssigned = UNASSIGNED_LEDGERID;
 
-        public BufferedLogChannel(FileChannel fc,
-                                  int writeCapacity,
-                                  int readCapacity,
-                                  long logId,
-                                  File logFile,
-                                  long unpersistedBytesBound) throws IOException {
+        public BufferedLogChannel(FileChannel fc, int writeCapacity, int readCapacity, long logId, File logFile,
+                long unpersistedBytesBound) throws IOException {
             super(fc, writeCapacity, readCapacity, unpersistedBytesBound);
             this.logId = logId;
             this.entryLogMetadata = new EntryLogMetadata(logId);
@@ -120,21 +124,104 @@ public class EntryLogger {
             return entryLogMetadata.getLedgersMap();
         }
 
+        public Long getLedgerIdAssigned() {
+            return ledgerIdAssigned;
+        }
+
+        public void setLedgerIdAssigned(Long ledgerId) {
+            this.ledgerIdAssigned = ledgerId;
+        }
+
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(BufferedChannel.class)
                 .add("logId", logId)
                 .add("logFile", logFile)
+                .add("ledgerIdAssigned", ledgerIdAssigned)
                 .toString();
         }
+
+        /**
+         * Append the ledger map at the end of the entry log.
+         * Updates the entry log file header with the offset and size of the map.
+         */
+        private void appendLedgersMap() throws IOException {
+
+            long ledgerMapOffset = this.position();
+
+            ConcurrentLongLongHashMap ledgersMap = this.getLedgersMap();
+            int numberOfLedgers = (int) ledgersMap.size();
+
+            // Write the ledgers map into several batches
+
+            final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+            final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
+
+            try {
+                ledgersMap.forEach(new BiConsumerLong() {
+                    int remainingLedgers = numberOfLedgers;
+                    boolean startNewBatch = true;
+                    int remainingInBatch = 0;
+
+                    @Override
+                    public void accept(long ledgerId, long size) {
+                        if (startNewBatch) {
+                            int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
+                            int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
+
+                            serializedMap.clear();
+                            serializedMap.writeInt(ledgerMapSize - 4);
+                            serializedMap.writeLong(INVALID_LID);
+                            serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
+                            serializedMap.writeInt(batchSize);
+
+                            startNewBatch = false;
+                            remainingInBatch = batchSize;
+                        }
+                        // Dump the ledger in the current batch
+                        serializedMap.writeLong(ledgerId);
+                        serializedMap.writeLong(size);
+                        --remainingLedgers;
+
+                        if (--remainingInBatch == 0) {
+                            // Close current batch
+                            try {
+                                write(serializedMap);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+
+                            startNewBatch = true;
+                        }
+                    }
+                });
+            } catch (RuntimeException e) {
+                if (e.getCause() instanceof IOException) {
+                    throw (IOException) e.getCause();
+                } else {
+                    throw e;
+                }
+            } finally {
+                serializedMap.release();
+            }
+            // Flush the ledger's map out before we write the header.
+            // Otherwise the header might point to something that is not fully
+            // written
+            super.flush();
+
+            // Update the headers with the map offset and count of ledgers
+            ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
+            mapInfo.putLong(ledgerMapOffset);
+            mapInfo.putInt(numberOfLedgers);
+            mapInfo.flip();
+            this.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
+        }
     }
 
-    volatile File currentDir;
     private final LedgerDirsManager ledgerDirsManager;
     private final boolean entryLogPerLedgerEnabled;
-    private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
 
-    private volatile long leastUnflushedLogId;
+    final RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
 
     /**
      * locks for compaction log.
@@ -145,11 +232,11 @@ public class EntryLogger {
      * The maximum size of a entry logger file.
      */
     final long logSizeLimit;
-    List<BufferedLogChannel> logChannelsToFlush;
-    volatile BufferedLogChannel logChannel;
     private volatile BufferedLogChannel compactionLogChannel;
 
-    private final EntryLoggerAllocator entryLoggerAllocator;
+    final EntryLoggerAllocator entryLoggerAllocator;
+    private final EntryLogManager entryLogManager;
+
     private final boolean entryLogPreAllocationEnabled;
     private final CopyOnWriteArrayList<EntryLogListener> listeners = new CopyOnWriteArrayList<EntryLogListener>();
 
@@ -268,6 +355,8 @@ public class EntryLogger {
         // but the protocol varies so an exact value is difficult to determine
         this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500;
         this.ledgerDirsManager = ledgerDirsManager;
+        this.conf = conf;
+        entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
         if (listener != null) {
             addListener(listener);
         }
@@ -296,12 +385,72 @@ public class EntryLogger {
                 logId = lastLogId;
             }
         }
-        this.leastUnflushedLogId = logId + 1;
+        this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1);
         this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
-        this.conf = conf;
-        this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
+        if (entryLogPerLedgerEnabled) {
+            this.entryLogManager = new EntryLogManagerForSingleEntryLog(ledgerDirsManager) {
+                @Override
+                public void checkpoint() throws IOException {
+                    /*
+                     * In the case of entryLogPerLedgerEnabled we need to flush
+                     * both rotatedlogs and currentlogs. This is needed because
+                     * syncThread periodically does checkpoint and at this time
+                     * all the logs should be flushed.
+                     *
+                     */
+                    super.flush();
+                }
+
+                @Override
+                public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException {
+                    // do nothing
+                    /*
+                     * prepareSortedLedgerStorageCheckpoint is required for
+                     * singleentrylog scenario, but it is not needed for
+                     * entrylogperledger scenario, since entries of a ledger go
+                     * to a entrylog (even during compaction) and SyncThread
+                     * drives periodic checkpoint logic.
+                     */
+
+                }
+
+                @Override
+                public void prepareEntryMemTableFlush() {
+                    // do nothing
+                }
+
+                @Override
+                public boolean commitEntryMemTableFlush() throws IOException {
+                    // lock it only if there is new data
+                    // so that cache accesstime is not changed
+                    Set<BufferedLogChannel> copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
+                            Arrays.asList(super.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
+                    for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+                        if (reachEntryLogLimit(currentLog, 0L)) {
+                            synchronized (this) {
+                                if (reachEntryLogLimit(currentLog, 0L)) {
+                                    LOG.info("Rolling entry logger since it reached size limitation");
+                                    createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+                                }
+                            }
+                        }
+                    }
+                    /*
+                     * in the case of entrylogperledger, SyncThread drives
+                     * checkpoint logic for every flushInterval. So
+                     * EntryMemtable doesn't need to call checkpoint in the case
+                     * of entrylogperledger.
+                     */
+                    return false;
+                }
+            };
+        } else {
+            this.entryLogManager = new EntryLogManagerForSingleEntryLog(ledgerDirsManager);
+        }
+    }
 
-        initialize();
+    EntryLogManager getEntryLogManager() {
+        return entryLogManager;
     }
 
     void addListener(EntryLogListener listener) {
@@ -324,13 +473,11 @@ public class EntryLogger {
      */
     private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos)
             throws IOException {
-        BufferedLogChannel bc = logChannel;
+        BufferedLogChannel bc = entryLogManager.getCurrentLogIfPresent(entryLogId);
         if (null != bc) {
-            if (entryLogId == bc.getLogId()) {
-                synchronized (bc) {
-                    if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) {
-                        return bc.read(buff, pos);
-                    }
+            synchronized (bc) {
+                if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) {
+                    return bc.read(buff, pos);
                 }
             }
         }
@@ -397,17 +544,12 @@ public class EntryLogger {
      *
      * @return least unflushed log id.
      */
-    synchronized long getLeastUnflushedLogId() {
-        return leastUnflushedLogId;
+    long getLeastUnflushedLogId() {
+        return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
     }
 
-    synchronized long getCurrentLogId() {
-        BufferedLogChannel channel = logChannel;
-        if (null == channel) {
-            return UNINITIALIZED_LOG_ID;
-        } else {
-            return channel.getLogId();
-        }
+    long getPreviousAllocatedEntryLogId() {
+        return entryLoggerAllocator.getPreallocatedLogId();
     }
 
     /**
@@ -422,75 +564,16 @@ public class EntryLogger {
         }
     }
 
-    protected void initialize() throws IOException {
-        // Register listener for disk full notifications.
-        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
-
-        if (ledgerDirsManager.hasWritableLedgerDirs()) {
-            createNewLog();
-        }
+    void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException {
+        entryLogManager.prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
     }
 
-    private LedgerDirsListener getLedgerDirsListener() {
-        return new LedgerDirsListener() {
-            @Override
-            public void diskFull(File disk) {
-                // If the current entry log disk is full, then create new entry
-                // log.
-                if (currentDir != null && currentDir.equals(disk)) {
-                    shouldCreateNewEntryLog.set(true);
-                }
-            }
-
-            @Override
-            public void diskAlmostFull(File disk) {
-                // If the current entry log disk is almost full, then create new entry
-                // log.
-                if (currentDir != null && currentDir.equals(disk)) {
-                    shouldCreateNewEntryLog.set(true);
-                }
-            }
-        };
+    void prepareEntryMemTableFlush() {
+        entryLogManager.prepareEntryMemTableFlush();
     }
 
-    /**
-     * Rolling a new log file to write.
-     */
-    synchronized void rollLog() throws IOException {
-        createNewLog();
-    }
-
-    /**
-     * Creates a new log file.
-     */
-    void createNewLog() throws IOException {
-        // first tried to create a new log channel. add current log channel to ToFlush list only when
-        // there is a new log channel. it would prevent that a log channel is referenced by both
-        // *logChannel* and *ToFlush* list.
-        if (null != logChannel) {
-            if (null == logChannelsToFlush) {
-                logChannelsToFlush = new LinkedList<BufferedLogChannel>();
-            }
-
-            // flush the internal buffer back to filesystem but not sync disk
-            // so the readers could access the data from filesystem.
-            logChannel.flush();
-
-            // Append ledgers map at the end of entry log
-            appendLedgersMap(logChannel);
-
-            BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
-            logChannelsToFlush.add(logChannel);
-            LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
-                    logChannel.getLogId(), logChannelsToFlush);
-            for (EntryLogListener listener : listeners) {
-                listener.onRotateEntryLog();
-            }
-            logChannel = newLogChannel;
-        } else {
-            logChannel = entryLoggerAllocator.createNewLog();
-        }
-        currentDir = logChannel.getLogFile().getParentFile();
+    boolean commitEntryMemTableFlush() throws IOException {
+        return entryLogManager.commitEntryMemTableFlush();
     }
 
     /**
@@ -499,79 +582,6 @@ public class EntryLogger {
     EntryLoggerAllocator getEntryLoggerAllocator() {
         return entryLoggerAllocator;
     }
-    /**
-     * Append the ledger map at the end of the entry log.
-     * Updates the entry log file header with the offset and size of the map.
-     */
-    private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException {
-        long ledgerMapOffset = entryLogChannel.position();
-
-        ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-        int numberOfLedgers = (int) ledgersMap.size();
-
-        // Write the ledgers map into several batches
-
-        final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-        final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-        try {
-            ledgersMap.forEach(new BiConsumerLong() {
-                int remainingLedgers = numberOfLedgers;
-                boolean startNewBatch = true;
-                int remainingInBatch = 0;
-
-                @Override
-                public void accept(long ledgerId, long size) {
-                    if (startNewBatch) {
-                        int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
-                        int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-                        serializedMap.clear();
-                        serializedMap.writeInt(ledgerMapSize - 4);
-                        serializedMap.writeLong(INVALID_LID);
-                        serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-                        serializedMap.writeInt(batchSize);
-
-                        startNewBatch = false;
-                        remainingInBatch = batchSize;
-                    }
-                    // Dump the ledger in the current batch
-                    serializedMap.writeLong(ledgerId);
-                    serializedMap.writeLong(size);
-                    --remainingLedgers;
-
-                    if (--remainingInBatch == 0) {
-                        // Close current batch
-                        try {
-                            entryLogChannel.write(serializedMap);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-
-                        startNewBatch = true;
-                    }
-                }
-            });
-        } catch (RuntimeException e) {
-            if (e.getCause() instanceof IOException) {
-                throw (IOException) e.getCause();
-            } else {
-                throw e;
-            }
-        } finally {
-            serializedMap.release();
-        }
-        // Flush the ledger's map out before we write the header.
-        // Otherwise the header might point to something that is not fully written
-        entryLogChannel.flush();
-
-        // Update the headers with the map offset and count of ledgers
-        ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-        mapInfo.putLong(ledgerMapOffset);
-        mapInfo.putInt(numberOfLedgers);
-        mapInfo.flip();
-        entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
-    }
 
     /**
      * An allocator pre-allocates entry log files.
@@ -589,6 +599,10 @@ public class EntryLogger {
             allocatorExecutor = Executors.newSingleThreadExecutor();
         }
 
+        synchronized long getPreallocatedLogId() {
+            return preallocatedLogId;
+        }
+
         BufferedLogChannel createNewLog() throws IOException {
             synchronized (createEntryLogLock) {
                 BufferedLogChannel bc;
@@ -630,37 +644,42 @@ public class EntryLogger {
             }
         }
 
-        private BufferedLogChannel allocateNewLog() throws IOException {
+        private synchronized BufferedLogChannel allocateNewLog() throws IOException {
             return allocateNewLog(".log");
         }
 
         /**
          * Allocate a new log file.
          */
-        private BufferedLogChannel allocateNewLog(String suffix) throws IOException {
+        private synchronized BufferedLogChannel allocateNewLog(String suffix) throws IOException {
             List<File> list = ledgerDirsManager.getWritableLedgerDirsForNewLog();
-            Collections.shuffle(list);
+            File dirForNextEntryLog = entryLogManager.getDirForNextEntryLog(list);
+
+            List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
+            String logFileName;
             // It would better not to overwrite existing entry log files
-            File newLogFile = null;
+            File testLogFile = null;
             do {
                 if (preallocatedLogId >= Integer.MAX_VALUE) {
                     preallocatedLogId = 0;
                 } else {
                     ++preallocatedLogId;
                 }
-                String logFileName = Long.toHexString(preallocatedLogId) + suffix;
-                for (File dir : list) {
-                    newLogFile = new File(dir, logFileName);
-                    if (newLogFile.exists()) {
-                        LOG.warn("Found existed entry log " + newLogFile
+                logFileName = Long.toHexString(preallocatedLogId) + suffix;
+                for (File dir : ledgersDirs) {
+                    testLogFile = new File(dir, logFileName);
+                    if (testLogFile.exists()) {
+                        LOG.warn("Found existed entry log " + testLogFile
                                + " when trying to create it as a new log.");
-                        newLogFile = null;
+                        testLogFile = null;
                         break;
                     }
                 }
-            } while (newLogFile == null);
+            } while (testLogFile == null);
 
+            File newLogFile = new File(dirForNextEntryLog, logFileName);
             FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
+
             BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
                     conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
             logfileHeader.readerIndex(0);
@@ -669,6 +688,11 @@ public class EntryLogger {
             for (File f : list) {
                 setLastLogId(f, preallocatedLogId);
             }
+
+            if (suffix.equals(LOG_FILE_SUFFIX)) {
+                recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
+            }
+
             LOG.info("Created new entry log file {} for logId {}.", newLogFile, preallocatedLogId);
             return logChannel;
         }
@@ -788,89 +812,418 @@ public class EntryLogger {
         }
     }
 
-    /**
-     * Flushes all rotated log channels. After log channels are flushed,
-     * move leastUnflushedLogId ptr to current logId.
-     */
-    void checkpoint() throws IOException {
-        flushRotatedLogs();
+    interface EntryLogManager {
+
+        /*
+         * add entry to the corresponding entrylog and return the position of
+         * the entry in the entrylog
+         */
+        long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+        /*
+         * Returns the active logChannel with the given entryLogId, or null if
+         * it does not exist.
+         */
+        BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
         /*
-         * In the case of entryLogPerLedgerEnabled we need to flush both
-         * rotatedlogs and currentlogs. This is needed because syncThread
-         * periodically does checkpoint and at this time all the logs should
-         * be flushed.
+         * Returns an eligible writable ledger dir for the creation of the next entrylog.
+         */
+        File getDirForNextEntryLog(List<File> writableLedgerDirs);
+
+        /*
+         * Do the operations required for checkpoint.
+         */
+        void checkpoint() throws IOException;
+
+        /*
+         * flush both current and rotated logs.
+         */
+        void flush() throws IOException;
+
+        /*
+         * close current logs.
+         */
+        void close() throws IOException;
+
+        /*
+         * force close current logs.
+         */
+        void forceClose();
+
+        /*
+         * this method should be called before SortedLedgerStorage does a
+         * checkpoint; if bytes were flushed since the previous flush, it rolls
+         * the current entrylog so the checkpoint does not cover the active log.
+         */
+        void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException;
+
+        /*
+         * this method should be called before doing entrymemtable flush, it
+         * would save the state of the entrylogger before entrymemtable flush
+         * and commitEntryMemTableFlush would take appropriate action after
+         * entrymemtable flush.
+         */
+        void prepareEntryMemTableFlush();
+
+        /*
+         * this method should be called after doing entrymemtable flush, it would
+         * take appropriate action after entrymemtable flush depending on the
+         * current state of the entrylogger and the state of the entrylogger
+         * during prepareEntryMemTableFlush.
          *
-         * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
-         * this Issue, I will move this logic to individual implamentations of
-         * EntryLogManager and it would be free of this booalen flag based logic.
+         * It is assumed that there would be corresponding
+         * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+         * would be called from the same thread.
          *
+         * returns boolean value indicating whether EntryMemTable should do checkpoint
+         * after this commit method.
          */
-        if (entryLogPerLedgerEnabled) {
-            flushCurrentLog();
+        boolean commitEntryMemTableFlush() throws IOException;
+    }
+
+    abstract class EntryLogManagerBase implements EntryLogManager {
+        volatile List<BufferedLogChannel> rotatedLogChannels;
+
+        private final FastThreadLocal<ByteBuf> sizeBufferForAdd = new FastThreadLocal<ByteBuf>() {
+            @Override
+            protected ByteBuf initialValue() throws Exception {
+                return Unpooled.buffer(4);
+            }
+        };
+
+        /*
+         * This method should be guarded by a lock, so callers of this method
+         * should be in the right scope of the lock.
+         */
+        @Override
+        public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+            int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+            BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+            ByteBuf sizeBuffer = sizeBufferForAdd.get();
+            sizeBuffer.clear();
+            sizeBuffer.writeInt(entry.readableBytes());
+            logChannel.write(sizeBuffer);
+
+            long pos = logChannel.position();
+            logChannel.write(entry);
+            logChannel.registerWrittenEntry(ledger, entrySize);
+
+            return (logChannel.getLogId() << 32L) | pos;
+        }
+
+        boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
+            if (logChannel == null) {
+                return false;
+            }
+            return logChannel.position() + size > logSizeLimit;
+        }
+
+        boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long size) {
+            if (logChannel == null) {
+                return false;
+            }
+            return logChannel.position() + size > Integer.MAX_VALUE;
+        }
+
+        abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
+
+        abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize, boolean rollLog)
+                throws IOException;
+
+        abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel);
+
+        /*
+         * flush current logs.
+         */
+        abstract void flushCurrentLogs() throws IOException;
+
+        /*
+         * flush rotated logs.
+         */
+        abstract void flushRotatedLogs() throws IOException;
+
+        List<BufferedLogChannel> getRotatedLogChannels() {
+            return rotatedLogChannels;
+        }
+
+        @Override
+        public void flush() throws IOException {
+            flushRotatedLogs();
+            flushCurrentLogs();
+        }
+
+        void flushLogChannel(BufferedLogChannel logChannel, boolean forceMetadata) throws IOException {
+            if (logChannel != null) {
+                logChannel.flushAndForceWrite(forceMetadata);
+                LOG.debug("Flush and sync current entry logger {}", logChannel.getLogId());
+            }
+        }
+
+        /*
+         * Creates a new log file. This method should be guarded by a lock,
+         * so callers of this method should be in right scope of the lock.
+         */
+        void createNewLog(long ledgerId) throws IOException {
+            BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
+            // first tried to create a new log channel. add current log channel to ToFlush list only when
+            // there is a new log channel. it would prevent that a log channel is referenced by both
+            // *logChannel* and *ToFlush* list.
+            if (null != logChannel) {
+
+                // flush the internal buffer back to filesystem but not sync disk
+                logChannel.flush();
+
+                // Append ledgers map at the end of entry log
+                logChannel.appendLedgersMap();
+
+                BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
+                setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
+                LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
+                        logChannel.getLogId(), rotatedLogChannels);
+                for (EntryLogListener listener : listeners) {
+                    listener.onRotateEntryLog();
+                }
+            } else {
+                setCurrentLogForLedgerAndAddToRotate(ledgerId, entryLoggerAllocator.createNewLog());
+            }
         }
     }
 
-    void flushRotatedLogs() throws IOException {
-        List<BufferedLogChannel> channels = null;
-        long flushedLogId = INVALID_LID;
-        synchronized (this) {
-            channels = logChannelsToFlush;
-            logChannelsToFlush = null;
+    class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
+
+        private volatile BufferedLogChannel activeLogChannel;
+        private long logIdBeforeFlush = INVALID_LID;
+        private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
+
+        EntryLogManagerForSingleEntryLog(LedgerDirsManager ledgerDirsManager) {
+            this.rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+            // Register listener for disk full notifications.
+            ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
         }
-        if (null == channels) {
-            return;
+
+        private LedgerDirsListener getLedgerDirsListener() {
+            return new LedgerDirsListener() {
+                @Override
+                public void diskFull(File disk) {
+                    // If the current entry log disk is full, then create new
+                    // entry log.
+                    BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+                    if (currentActiveLogChannel != null
+                            && currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
+                        shouldCreateNewEntryLog.set(true);
+                    }
+                }
+
+                @Override
+                public void diskAlmostFull(File disk) {
+                    // If the current entry log disk is almost full, then create new entry
+                    // log.
+                    BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+                    if (currentActiveLogChannel != null
+                            && currentActiveLogChannel.getLogFile().getParentFile().equals(disk)) {
+                        shouldCreateNewEntryLog.set(true);
+                    }
+                }
+            };
         }
-        Iterator<BufferedLogChannel> chIter = channels.iterator();
-        while (chIter.hasNext()) {
-            BufferedLogChannel channel = chIter.next();
-            try {
-                channel.flushAndForceWrite(false);
-            } catch (IOException ioe) {
-                // rescue from flush exception, add unflushed channels back
-                synchronized (this) {
-                    if (null == logChannelsToFlush) {
-                        logChannelsToFlush = channels;
-                    } else {
-                        logChannelsToFlush.addAll(0, channels);
+
+        @Override
+        public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+            return super.addEntry(ledger, entry, rollLog);
+        }
+
+        @Override
+        synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize,
+                boolean rollLog) throws IOException {
+            if (null == activeLogChannel) {
+                // log channel can be null because the file is deferred to be created
+                createNewLog(UNASSIGNED_LEDGERID);
+            }
+
+            boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(activeLogChannel, entrySize)
+                    : readEntryLogHardLimit(activeLogChannel, entrySize);
+            // Create new log if logSizeLimit reached or current disk is full
+            boolean createNewLog = shouldCreateNewEntryLog.get();
+            if (createNewLog || reachEntryLogLimit) {
+                if (activeLogChannel != null) {
+                    activeLogChannel.flushAndForceWriteIfRegularFlush(false);
+                }
+                createNewLog(UNASSIGNED_LEDGERID);
+                // Reset the flag
+                if (createNewLog) {
+                    shouldCreateNewEntryLog.set(false);
+                }
+            }
+            return activeLogChannel;
+        }
+
+        @Override
+        synchronized void createNewLog(long ledgerId) throws IOException {
+            super.createNewLog(ledgerId);
+        }
+
+        @Override
+        public synchronized void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel) {
+            BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+            activeLogChannel = logChannel;
+            if (hasToRotateLogChannel != null) {
+                rotatedLogChannels.add(hasToRotateLogChannel);
+            }
+        }
+
+        @Override
+        public BufferedLogChannel getCurrentLogForLedger(long ledgerId) {
+            return activeLogChannel;
+        }
+
+        @Override
+        public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+            BufferedLogChannel activeLogChannelTemp = activeLogChannel;
+            if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) {
+                return activeLogChannelTemp;
+            }
+            return null;
+        }
+
+        @Override
+        public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+            Collections.shuffle(writableLedgerDirs);
+            return writableLedgerDirs.get(0);
+        }
+
+        @Override
+        public void checkpoint() throws IOException {
+            flushRotatedLogs();
+        }
+
+        public long getCurrentLogId() {
+            BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+            if (currentActiveLogChannel != null) {
+                return currentActiveLogChannel.getLogId();
+            } else {
+                return EntryLogger.UNINITIALIZED_LOG_ID;
+            }
+        }
+
+        @Override
+        public void flushCurrentLogs() throws IOException {
+            BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+            if (currentActiveLogChannel != null) {
+                /**
+                 * flushCurrentLogs method is called during checkpoint, so
+                 * metadata of the file also should be force written.
+                 */
+                flushLogChannel(currentActiveLogChannel, true);
+            }
+        }
+
+        @Override
+        void flushRotatedLogs() throws IOException {
+            List<BufferedLogChannel> channels = null;
+            synchronized (this) {
+                channels = rotatedLogChannels;
+                rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+            }
+            if (null == channels) {
+                return;
+            }
+            Iterator<BufferedLogChannel> chIter = channels.iterator();
+            while (chIter.hasNext()) {
+                BufferedLogChannel channel = chIter.next();
+                try {
+                    channel.flushAndForceWrite(true);
+                } catch (IOException ioe) {
+                    // rescue from flush exception, add unflushed channels back
+                    synchronized (this) {
+                        if (null == rotatedLogChannels) {
+                            rotatedLogChannels = channels;
+                        } else {
+                            rotatedLogChannels.addAll(0, channels);
+                        }
                     }
+                    throw ioe;
                 }
-                throw ioe;
+                // remove the channel from the list after it is successfully flushed
+                chIter.remove();
+                // since this channel is only used for writing, after flushing the channel,
+                // we had to close the underlying file channel. Otherwise, we might end up
+                // leaking fds which cause the disk spaces could not be reclaimed.
+                closeFileChannel(channel);
+                recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+                LOG.info("Synced entry logger {} to disk.", channel.getLogId());
             }
-            // remove the channel from the list after it is successfully flushed
-            chIter.remove();
-            // since this channel is only used for writing, after flushing the channel,
-            // we had to close the underlying file channel. Otherwise, we might end up
-            // leaking fds which cause the disk spaces could not be reclaimed.
-            closeFileChannel(channel);
-            if (channel.getLogId() > flushedLogId) {
-                flushedLogId = channel.getLogId();
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (activeLogChannel != null) {
+                closeFileChannel(activeLogChannel);
             }
-            LOG.info("Synced entry logger {} to disk.", channel.getLogId());
         }
-        // move the leastUnflushedLogId ptr
-        leastUnflushedLogId = flushedLogId + 1;
-    }
 
-    public void flush() throws IOException {
-        flushRotatedLogs();
-        flushCurrentLog();
-    }
+        @Override
+        public void forceClose() {
+            if (activeLogChannel != null) {
+                forceCloseFileChannel(activeLogChannel);
+            }
+        }
+
+        @Override
+        public void prepareEntryMemTableFlush() {
+            logIdBeforeFlush = getCurrentLogId();
+        }
 
-    synchronized void flushCurrentLog() throws IOException {
-        if (logChannel != null) {
-            logChannel.flushAndForceWrite(false);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId());
+        @Override
+        public boolean commitEntryMemTableFlush() throws IOException {
+            long logIdAfterFlush = getCurrentLogId();
+            /*
+             * in any case that an entry log reaches the limit, we roll the log
+             * and start checkpointing. if a memory table is flushed spanning
+             * over two entry log files, we also roll log. this is for
+             * performance consideration: since we don't wanna checkpoint a new
+             * log file that ledger storage is writing to.
+             */
+            if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush != logIdBeforeFlush) {
+                LOG.info("Rolling entry logger since it reached size limitation");
+                createNewLog(UNASSIGNED_LEDGERID);
+                return true;
+            }
+            return false;
+        }
+
+        @Override
+        public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException{
+            if (numBytesFlushed > 0) {
+                // if bytes are added between previous flush and this checkpoint,
+                // it means bytes might live at current active entry log, we need
+                // roll current entry log and then issue checkpoint to underlying
+                // interleaved ledger storage.
+                createNewLog(UNASSIGNED_LEDGERID);
             }
         }
     }
 
+    /**
+     * Flushes all rotated log channels. After log channels are flushed,
+     * move leastUnflushedLogId ptr to current logId.
+     */
+    void checkpoint() throws IOException {
+        entryLogManager.checkpoint();
+    }
+
+    public void flush() throws IOException {
+        entryLogManager.flush();
+    }
+
     long addEntry(long ledger, ByteBuffer entry) throws IOException {
-        return addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
+        return entryLogManager.addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
     }
 
     long addEntry(long ledger, ByteBuf entry) throws IOException {
-        return addEntry(ledger, entry, true);
+        return entryLogManager.addEntry(ledger, entry, true);
+    }
+
+    public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+        return entryLogManager.addEntry(ledger, entry, rollLog);
     }
 
     private final FastThreadLocal<ByteBuf> sizeBuffer = new FastThreadLocal<ByteBuf>() {
@@ -880,39 +1233,6 @@ public class EntryLogger {
         }
     };
 
-    public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
-        if (null == logChannel) {
-            // log channel can be null because the file is deferred to be created when no writable ledger directory
-            // is available.
-            createNewLog();
-        }
-
-        int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
-        boolean reachEntryLogLimit =
-            rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize);
-        // Create new log if logSizeLimit reached or current disk is full
-        boolean createNewLog = shouldCreateNewEntryLog.get();
-        if (createNewLog || reachEntryLogLimit) {
-            createNewLog();
-            // Reset the flag
-            if (createNewLog) {
-                shouldCreateNewEntryLog.set(false);
-            }
-        }
-
-        // Get a buffer from thread local to store the size
-        ByteBuf sizeBuffer = this.sizeBuffer.get();
-        sizeBuffer.clear();
-        sizeBuffer.writeInt(entry.readableBytes());
-        logChannel.write(sizeBuffer);
-
-        long pos = logChannel.position();
-        logChannel.write(entry);
-        logChannel.registerWrittenEntry(ledger, entrySize);
-
-        return (logChannel.getLogId() << 32L) | pos;
-    }
-
     long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException {
         synchronized (compactionLogLock) {
             int entrySize = entry.readableBytes() + 4;
@@ -935,7 +1255,7 @@ public class EntryLogger {
     void flushCompactionLog() throws IOException {
         synchronized (compactionLogLock) {
             if (compactionLogChannel != null) {
-                appendLedgersMap(compactionLogChannel);
+                compactionLogChannel.appendLedgersMap();
                 compactionLogChannel.flushAndForceWrite(false);
                 LOG.info("Flushed compaction log file {} with logId.",
                     compactionLogChannel.getLogFile(),
@@ -982,13 +1302,7 @@ public class EntryLogger {
         return offset >> 32L;
     }
 
-    synchronized boolean reachEntryLogLimit(long size) {
-        return logChannel.position() + size > logSizeLimit;
-    }
 
-    synchronized boolean readEntryLogHardLimit(long size) {
-        return logChannel.position() + size > Integer.MAX_VALUE;
-    }
 
     public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
             throws IOException, Bookie.NoEntryException {
@@ -1366,8 +1680,7 @@ public class EntryLogger {
             }
             // clear the mapping, so we don't need to go through the channels again in finally block in normal case.
             logid2FileChannel.clear();
-            // close current writing log file
-            closeFileChannel(logChannel);
+            entryLogManager.close();
             synchronized (compactionLogLock) {
                 closeFileChannel(compactionLogChannel);
                 compactionLogChannel = null;
@@ -1379,7 +1692,8 @@ public class EntryLogger {
             for (FileChannel fc : logid2FileChannel.values()) {
                 IOUtils.close(LOG, fc);
             }
-            forceCloseFileChannel(logChannel);
+
+            entryLogManager.forceClose();
             synchronized (compactionLogLock) {
                 forceCloseFileChannel(compactionLogChannel);
             }
@@ -1434,4 +1748,40 @@ public class EntryLogger {
     static String logId2HexString(long logId) {
         return Long.toHexString(logId);
     }
-}
+
+    /**
+     * Datastructure which maintains the status of logchannels. When a
+     * logChannel is created, an entry < entryLogId, false > is added to this
+     * sortedmap, and when the logChannel is rotated and flushed the entry is
+     * updated to < entryLogId, true >. All the lowest entries with
+     * < entryLogId, true > status are then removed from the sortedmap, so
+     * that we can always determine the least unflushed LogId.
+     *
+     */
+    static class RecentEntryLogsStatus {
+        private final SortedMap<Long, Boolean> entryLogsStatusMap;
+        private long leastUnflushedLogId;
+
+        RecentEntryLogsStatus(long leastUnflushedLogId) {
+            entryLogsStatusMap = new TreeMap<Long, Boolean>();
+            this.leastUnflushedLogId = leastUnflushedLogId;
+        }
+
+        synchronized void createdEntryLog(Long entryLogId) {
+            entryLogsStatusMap.put(entryLogId, false);
+        }
+
+        synchronized void flushRotatedEntryLog(Long entryLogId) {
+            entryLogsStatusMap.replace(entryLogId, true);
+            while ((!entryLogsStatusMap.isEmpty()) && (entryLogsStatusMap.get(entryLogsStatusMap.firstKey()))) {
+                long leastFlushedLogId = entryLogsStatusMap.firstKey();
+                entryLogsStatusMap.remove(leastFlushedLogId);
+                leastUnflushedLogId = leastFlushedLogId + 1;
+            }
+        }
+
+        synchronized long getLeastUnflushedLogId() {
+            return leastUnflushedLogId;
+        }
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
index d661412..3a07ec4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
@@ -38,23 +38,13 @@ public class ReadOnlyEntryLogger extends EntryLogger {
     }
 
     @Override
-    protected void initialize() throws IOException {
-        // do nothing for read only entry logger
-    }
-
-    @Override
-    void createNewLog() throws IOException {
-        throw new IOException("Can't create new entry log using a readonly entry logger.");
-    }
-
-    @Override
     protected boolean removeEntryLog(long entryLogId) {
         // can't remove entry log in readonly mode
         return false;
     }
 
     @Override
-    public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+    public synchronized long addEntry(long ledgerId, ByteBuffer entry) throws IOException {
         throw new IOException("Can't add entry to a readonly entry logger.");
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 0e3e3b9..f2efa55 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -168,13 +168,7 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
     @Override
     public void checkpoint(final Checkpoint checkpoint) throws IOException {
         long numBytesFlushed = memTable.flush(this, checkpoint);
-        if (numBytesFlushed > 0) {
-            // if bytes are added between previous flush and this checkpoint,
-            // it means bytes might live at current active entry log, we need
-            // roll current entry log and then issue checkpoint to underlying
-            // interleaved ledger storage.
-            entryLogger.rollLog();
-        }
+        entryLogger.prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
         super.checkpoint(checkpoint);
     }
 
@@ -209,19 +203,12 @@ public class SortedLedgerStorage extends InterleavedLedgerStorage
             public void run() {
                 try {
                     LOG.info("Started flushing mem table.");
-                    long logIdBeforeFlush = entryLogger.getCurrentLogId();
+                    entryLogger.prepareEntryMemTableFlush();
                     memTable.flush(SortedLedgerStorage.this);
-                    long logIdAfterFlush = entryLogger.getCurrentLogId();
-                    // in any case that an entry log reaches the limit, we roll the log and start checkpointing.
-                    // if a memory table is flushed spanning over two entry log files, we also roll log. this is
-                    // for performance consideration: since we don't wanna checkpoint a new log file that ledger
-                    // storage is writing to.
-                    if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) {
-                        LOG.info("Rolling entry logger since it reached size limitation");
-                        entryLogger.rollLog();
+                    if (entryLogger.commitEntryMemTableFlush()) {
                         checkpointer.startCheckpoint(cp);
                     }
-                } catch (IOException e) {
+                } catch (Exception e) {
                     stateManager.transitionToReadOnlyMode();
                     LOG.error("Exception thrown while flushing skip list cache.", e);
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 904ca3f..785b35a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -2262,6 +2262,10 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
         if (0 == getBookiePort() && !getAllowEphemeralPorts()) {
             throw new ConfigurationException("Invalid port specified, using ephemeral ports accidentally?");
         }
+        if (isEntryLogPerLedgerEnabled() && getUseTransactionalCompaction()) {
+            throw new ConfigurationException(
+                    "When entryLogPerLedger is enabled, it is unnecessary to use transactional compaction");
+        }
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 33642c2..4257ccc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -21,12 +21,18 @@ package org.apache.bookkeeper.bookie;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
 
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -93,16 +99,17 @@ public class CreateNewLogTest {
         // Extracted from createNewLog()
         String logFileName = Long.toHexString(1) + ".log";
         File dir = ledgerDirsManager.pickRandomWritableDir();
-        LOG.info("Picked this directory: " + dir);
+        LOG.info("Picked this directory: {}", dir);
         File newLogFile = new File(dir, logFileName);
         newLogFile.createNewFile();
 
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
         // Calls createNewLog, and with the number of directories we
         // are using, if it picks one at random it will fail.
-        el.createNewLog();
-        LOG.info("This is the current log id: " + el.getCurrentLogId());
-        assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+        EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) el.getEntryLogManager();
+        entryLogManager.createNewLog(0L);
+        LOG.info("This is the current log id: {}", entryLogManager.getCurrentLogId());
+        assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
     }
 
     @Test
@@ -118,7 +125,7 @@ public class CreateNewLogTest {
         // Extracted from createNewLog()
         String logFileName = Long.toHexString(1) + ".log";
         File dir = ledgerDirsManager.pickRandomWritableDir();
-        LOG.info("Picked this directory: " + dir);
+        LOG.info("Picked this directory: {}", dir);
         File newLogFile = new File(dir, logFileName);
         newLogFile.createNewFile();
 
@@ -131,9 +138,136 @@ public class CreateNewLogTest {
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
         // Calls createNewLog, and with the number of directories we
         // are using, if it picks one at random it will fail.
-        el.createNewLog();
-        LOG.info("This is the current log id: " + el.getCurrentLogId());
-        assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+        EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) el.getEntryLogManager();
+        entryLogManager.createNewLog(0L);
+        LOG.info("This is the current log id: {}", entryLogManager.getCurrentLogId());
+        assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
     }
 
+    @Test
+    public void testConcurrentCreateNewLogWithEntryLogFilePreAllocationEnabled() throws Exception {
+        testConcurrentCreateNewLog(true);
+    }
+
+    @Test
+    public void testConcurrentCreateNewLogWithEntryLogFilePreAllocationDisabled() throws Exception {
+        testConcurrentCreateNewLog(false);
+    }
+
+    public void testConcurrentCreateNewLog(boolean entryLogFilePreAllocationEnabled) throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of
+        // ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setEntryLogFilePreAllocationEnabled(entryLogFilePreAllocationEnabled);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) el.getEntryLogManager();
+        Assert.assertEquals("previousAllocatedEntryLogId after initialization", -1,
+                el.getPreviousAllocatedEntryLogId());
+        Assert.assertEquals("leastUnflushedLogId after initialization", 0, el.getLeastUnflushedLogId());
+        int createNewLogNumOfTimes = 10;
+        AtomicBoolean receivedException = new AtomicBoolean(false);
+
+        IntStream.range(0, createNewLogNumOfTimes).parallel().forEach((i) -> {
+            try {
+                (entryLogManager).createNewLog((long) i);
+            } catch (IOException e) {
+                LOG.error("Received exception while creating newLog", e);
+                receivedException.set(true);
+            }
+        });
+        // wait for the pre-allocation to complete
+        Thread.sleep(1000);
+
+        Assert.assertFalse("There shouldn't be any exceptions while creating newlog", receivedException.get());
+        int expectedPreviousAllocatedEntryLogId = createNewLogNumOfTimes - 1;
+        if (entryLogFilePreAllocationEnabled) {
+            expectedPreviousAllocatedEntryLogId = createNewLogNumOfTimes;
+        }
+
+        Assert.assertEquals(
+                "previousAllocatedEntryLogId after " + createNewLogNumOfTimes
+                        + " number of times createNewLog is called",
+                expectedPreviousAllocatedEntryLogId, el.getPreviousAllocatedEntryLogId());
+        Assert.assertEquals("Number of RotatedLogChannels", createNewLogNumOfTimes - 1,
+                entryLogManager.getRotatedLogChannels().size());
+    }
+
+    @Test
+    public void testCreateNewLogWithGaps() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of
+        // ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) el.getEntryLogManager();
+        entryLogManagerBase.createNewLog(0L);
+
+        Assert.assertEquals("previousAllocatedEntryLogId after initialization", 0, el.getPreviousAllocatedEntryLogId());
+
+        // Extracted from createNewLog()
+        String logFileName = Long.toHexString(1) + ".log";
+        File dir = ledgerDirsManager.pickRandomWritableDir();
+        LOG.info("Picked this directory: {}", dir);
+        File newLogFile = new File(dir, logFileName);
+        newLogFile.createNewFile();
+
+        entryLogManagerBase.createNewLog(0L);
+        Assert.assertEquals("previousAllocatedEntryLogId since entrylogid 1 is already taken", 2,
+                el.getPreviousAllocatedEntryLogId());
+
+        // Extracted from createNewLog()
+        logFileName = Long.toHexString(3) + ".log";
+        dir = ledgerDirsManager.pickRandomWritableDir();
+        LOG.info("Picked this directory: {}", dir);
+        newLogFile = new File(dir, logFileName);
+        newLogFile.createNewFile();
+
+        entryLogManagerBase.createNewLog(0L);
+        Assert.assertEquals("previousAllocatedEntryLogId since entrylogid 3 is already taken", 4,
+                el.getPreviousAllocatedEntryLogId());
+    }
+
+    @Test
+    public void testCreateNewLogAndCompactionLog() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of
+        // ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        AtomicBoolean receivedException = new AtomicBoolean(false);
+
+        IntStream.range(0, 2).parallel().forEach((i) -> {
+            try {
+                if (i % 2 == 0) {
+                    ((EntryLogManagerBase) el.getEntryLogManager()).createNewLog((long) i);
+                } else {
+                    el.createNewCompactionLog();
+                }
+            } catch (IOException e) {
+                LOG.error("Received exception while creating newLog", e);
+                receivedException.set(true);
+            }
+        });
+        // wait for the pre-allocation to complete
+        Thread.sleep(1000);
+
+        Assert.assertFalse("There shouldn't be any exceptions while creating newlog", receivedException.get());
+        Assert.assertEquals(
+                "previousAllocatedEntryLogId after 2 times createNewLog is called", 2,
+                el.getPreviousAllocatedEntryLogId());
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index b7f286c..21da951 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -44,6 +44,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -64,6 +67,7 @@ public class EntryLogTest {
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogTest.class);
 
     final List<File> tempDirs = new ArrayList<File>();
+    final Random rand = new Random();
 
     File createTempDir(String prefix, String suffix) throws IOException {
         File dir = IOUtils.createTempDir(prefix, suffix);
@@ -119,11 +123,13 @@ public class EntryLogTest {
         this.dirsMgr.addToFilledDirs(curDir);
 
         entryLogger = new EntryLogger(conf, dirsMgr);
-        assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogger.getCurrentLogId());
+        EntryLogManagerForSingleEntryLog entryLogManager =
+                (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager();
+        assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
 
         // add the first entry will trigger file creation
-        entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        assertEquals(2L, entryLogger.getCurrentLogId());
+        entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        assertEquals(0L, entryLogManager.getCurrentLogId());
     }
 
     @Test
@@ -141,23 +147,25 @@ public class EntryLogTest {
         this.dirsMgr.addToFilledDirs(curDir);
 
         entryLogger = new EntryLogger(conf, dirsMgr);
-        assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogger.getCurrentLogId());
+        EntryLogManagerForSingleEntryLog entryLogManager =
+                (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager();
+        assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
 
         // add the first entry will trigger file creation
         try {
-            entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
+            entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
             fail("Should fail to append entry if there is no enough reserved space left");
         } catch (NoWritableLedgerDirException e) {
-            assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogger.getCurrentLogId());
+            assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
         }
     }
 
     @Test
     public void testCorruptEntryLog() throws Exception {
         // create some entries
-        entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        entryLogger.addEntry(3, generateEntry(3, 1).nioBuffer());
-        entryLogger.addEntry(2, generateEntry(2, 1).nioBuffer());
+        entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+        entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
         entryLogger.flush();
         entryLogger.shutdown();
         // now lets truncate the file to corrupt the last entry, which simulates a partial write
@@ -184,6 +192,16 @@ public class EntryLogTest {
         return bb;
     }
 
+    private ByteBuf generateEntry(long ledger, long entry, int length) {
+        ByteBuf bb = Unpooled.buffer(length);
+        bb.writeLong(ledger);
+        bb.writeLong(entry);
+        byte[] randbyteArray = new byte[length - 8 - 8];
+        rand.nextBytes(randbyteArray);
+        bb.writeBytes(randbyteArray);
+        return bb;
+    }
+
     private static String generateDataString(long ledger, long entry) {
         return ("ledger-" + ledger + "-" + entry);
     }
@@ -199,7 +217,7 @@ public class EntryLogTest {
 
             EntryLogger logger = new EntryLogger(conf, dirsMgr);
             for (int j = 0; j < numEntries; j++) {
-                positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
+                positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer());
             }
             logger.flush();
             logger.shutdown();
@@ -214,7 +232,7 @@ public class EntryLogTest {
 
             EntryLogger logger = new EntryLogger(conf, dirsMgr);
             for (int j = 0; j < numEntries; j++) {
-                positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
+                positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer());
             }
             logger.flush();
             logger.shutdown();
@@ -271,6 +289,7 @@ public class EntryLogTest {
         File ledgerDir1 = createTempDir("bkTest", ".dir");
         File ledgerDir2 = createTempDir("bkTest", ".dir");
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
         conf.setJournalDirName(ledgerDir1.toString());
         conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(),
                 ledgerDir2.getAbsolutePath() });
@@ -287,7 +306,8 @@ public class EntryLogTest {
         ledgerStorage.addEntry(generateEntry(1, 1));
         ledgerStorage.addEntry(generateEntry(2, 1));
         // Add entry with disk full failure simulation
-        bookie.getLedgerDirsManager().addToFilledDirs(entryLogger.currentDir);
+        bookie.getLedgerDirsManager().addToFilledDirs(((EntryLogManagerBase) entryLogger.getEntryLogManager())
+                .getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID).getLogFile().getParentFile());
         ledgerStorage.addEntry(generateEntry(3, 1));
         // Verify written entries
         Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1)));
@@ -301,12 +321,14 @@ public class EntryLogTest {
     @Test
     public void testRecoverFromLedgersMap() throws Exception {
         // create some entries
-        entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        entryLogger.addEntry(3, generateEntry(3, 1).nioBuffer());
-        entryLogger.addEntry(2, generateEntry(2, 1).nioBuffer());
-        entryLogger.addEntry(1, generateEntry(1, 2).nioBuffer());
-        entryLogger.rollLog();
-        entryLogger.flushRotatedLogs();
+        entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+        entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
+        entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer());
+
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+        entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        entryLogManager.flushRotatedLogs();
 
         EntryLogMetadata meta = entryLogger.extractEntryLogMetadataFromIndex(0L);
         LOG.info("Extracted Meta From Entry Log {}", meta);
@@ -324,11 +346,11 @@ public class EntryLogTest {
     @Test
     public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception {
         // create some entries
-        entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        entryLogger.addEntry(3, generateEntry(3, 1).nioBuffer());
-        entryLogger.addEntry(2, generateEntry(2, 1).nioBuffer());
-        entryLogger.addEntry(1, generateEntry(1, 2).nioBuffer());
-        entryLogger.rollLog();
+        entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+        entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
+        entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer());
+        ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
         entryLogger.shutdown();
 
         // Rewrite the entry log header to be on V0 format
@@ -373,9 +395,10 @@ public class EntryLogTest {
 
         entryLogger = new EntryLogger(conf, dirsMgr);
         // create a logger whose initialization phase allocating a new entry log
+        ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
         assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
 
-        entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
+        entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
         // the Future<BufferedLogChannel> is not null all the time
         assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
         entryLogger.shutdown();
@@ -386,7 +409,7 @@ public class EntryLogTest {
         entryLogger = new EntryLogger(conf, dirsMgr);
         assertNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
 
-        entryLogger.addEntry(2, generateEntry(1, 1).nioBuffer());
+        entryLogger.addEntry(2L, generateEntry(1, 1).nioBuffer());
 
         // the Future<BufferedLogChannel> is null all the time
         assertNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
@@ -398,17 +421,18 @@ public class EntryLogTest {
     @Test
     public void testGetEntryLogsSet() throws Exception {
         // create some entries
-        assertEquals(Sets.newHashSet(0L, 1L), entryLogger.getEntryLogsSet());
+        EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager());
+        assertEquals(Sets.newHashSet(), entryLogger.getEntryLogsSet());
 
-        entryLogger.rollLog();
-        entryLogger.flushRotatedLogs();
+        entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        entryLogManagerBase.flushRotatedLogs();
 
-        assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet());
+        assertEquals(Sets.newHashSet(0L, 1L), entryLogger.getEntryLogsSet());
 
-        entryLogger.rollLog();
-        entryLogger.flushRotatedLogs();
+        entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        entryLogManagerBase.flushRotatedLogs();
 
-        assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), entryLogger.getEntryLogsSet());
+        assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet());
     }
 
     static class LedgerStorageWriteTask implements Callable<Boolean> {
@@ -566,4 +590,121 @@ public class EntryLogTest {
 
         executor.shutdownNow();
     }
+
+    /**
+     * Test to verify the leastUnflushedLogId logic in EntryLogsStatus.
+     */
+    @Test
+    public void testEntryLoggersRecentEntryLogsStatus() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogger.RecentEntryLogsStatus recentlyCreatedLogsStatus = entryLogger.recentlyCreatedEntryLogsStatus;
+
+        recentlyCreatedLogsStatus.createdEntryLog(0L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 0L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(0L);
+        // since we marked entrylog-0 as rotated, LeastUnflushedLogId would be previous rotatedlog+1
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.createdEntryLog(1L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.createdEntryLog(2L);
+        recentlyCreatedLogsStatus.createdEntryLog(3L);
+        recentlyCreatedLogsStatus.createdEntryLog(4L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(1L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 2L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(3L);
+        // here though we rotated entrylog-3, entrylog-2 is not yet rotated so
+        // LeastUnflushedLogId should still be 2
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 2L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(2L);
+        // entrylog-3 is already rotated, so leastUnflushedLogId should be 4
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 4L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(4L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 5L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.createdEntryLog(5L);
+        recentlyCreatedLogsStatus.createdEntryLog(7L);
+        recentlyCreatedLogsStatus.createdEntryLog(9L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 5L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(5L);
+        // since we marked entrylog-5 as rotated, LeastUnflushedLogId would be previous rotatedlog+1
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 6L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(7L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 8L, entryLogger.getLeastUnflushedLogId());
+    }
+
+    String[] createAndGetLedgerDirs(int numOfLedgerDirs) throws IOException {
+        File ledgerDir;
+        File curDir;
+        String[] ledgerDirsPath = new String[numOfLedgerDirs];
+        for (int i = 0; i < numOfLedgerDirs; i++) {
+            ledgerDir = createTempDir("bkTest", ".dir");
+            curDir = Bookie.getCurrentDirectory(ledgerDir);
+            Bookie.checkDirectoryStructure(curDir);
+            ledgerDirsPath[i] = ledgerDir.getAbsolutePath();
+        }
+        return ledgerDirsPath;
+    }
+
+    /*
+     * Test validating that the EntryLog/BufferedChannel flushes/force-writes when the bytes written to it reach
+     * flushIntervalInBytes.
+     */
+    @Test(timeout = 60000)
+    public void testFlushIntervalInBytes() throws Exception {
+        long flushIntervalInBytes = 5000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(flushIntervalInBytes);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager());
+
+        /*
+         * when entryLogger is created Header of length EntryLogger.LOGFILE_HEADER_SIZE is created
+         */
+        long ledgerId = 0L;
+        int firstEntrySize = 1000;
+        long entry0Position = entryLogger.addEntry(0L, generateEntry(ledgerId, 0L, firstEntrySize));
+        // entrylogger writes length of the entry (4 bytes) before writing entry
+        long expectedUnpersistedBytes = EntryLogger.LOGFILE_HEADER_SIZE + firstEntrySize + 4;
+        Assert.assertEquals("Unpersisted Bytes of entrylog", expectedUnpersistedBytes,
+                entryLogManagerBase.getCurrentLogForLedger(ledgerId).getUnpersistedBytes());
+
+        /*
+         * once 'flushIntervalInBytes' bytes have been written, the BufferedChannel should be force-written
+         */
+        int secondEntrySize = (int) (flushIntervalInBytes - expectedUnpersistedBytes);
+        long entry1Position = entryLogger.addEntry(0L, generateEntry(ledgerId, 1L, secondEntrySize));
+        Assert.assertEquals("Unpersisted Bytes of entrylog", 0,
+                entryLogManagerBase.getCurrentLogForLedger(ledgerId).getUnpersistedBytes());
+
+        /*
+         * since entrylog/Bufferedchannel is persisted (forcewritten), we should be able to read the entrylog using
+         * newEntryLogger
+         */
+        conf.setEntryLogPerLedgerEnabled(false);
+        EntryLogger newEntryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager();
+        Assert.assertEquals("EntryLogManager class type", EntryLogger.EntryLogManagerForSingleEntryLog.class,
+                newEntryLogManager.getClass());
+
+        ByteBuf buf = newEntryLogger.readEntry(ledgerId, 0L, entry0Position);
+        long readLedgerId = buf.readLong();
+        long readEntryId = buf.readLong();
+        Assert.assertEquals("LedgerId", ledgerId, readLedgerId);
+        Assert.assertEquals("EntryId", 0L, readEntryId);
+
+        buf = newEntryLogger.readEntry(ledgerId, 1L, entry1Position);
+        readLedgerId = buf.readLong();
+        readEntryId = buf.readLong();
+        Assert.assertEquals("LedgerId", ledgerId, readLedgerId);
+        Assert.assertEquals("EntryId", 1L, readEntryId);
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index f55249c..6cbdcde 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -28,10 +28,13 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -359,7 +364,7 @@ public class LedgerStorageCheckpointTest {
         }
         handle.close();
         // simulate rolling entrylog
-        ledgerStorage.entryLogger.rollLog();
+        ((EntryLogManagerBase) ledgerStorage.getEntryLogger().getEntryLogManager()).createNewLog(ledgerId);
         // sleep for a bit for checkpoint to do its task
         executorController.advance(Duration.ofMillis(500));
 
@@ -490,6 +495,7 @@ public class LedgerStorageCheckpointTest {
         BookKeeper bkClient = new BookKeeper(clientConf);
         InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
         EntryLogger entryLogger = ledgerStorage.entryLogger;
+        EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) entryLogger.getEntryLogManager();
 
         int numOfEntries = 5;
         byte[] dataBytes = "data".getBytes();
@@ -501,7 +507,7 @@ public class LedgerStorageCheckpointTest {
         }
         handle.close();
         // simulate rolling entrylog
-        ledgerStorage.entryLogger.rollLog();
+        entryLogManagerBase.createNewLog(ledgerId);
 
         ledgerId = 20;
         handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
@@ -510,7 +516,7 @@ public class LedgerStorageCheckpointTest {
         }
         handle.close();
         // simulate rolling entrylog
-        ledgerStorage.entryLogger.rollLog();
+        entryLogManagerBase.createNewLog(ledgerId);
 
         ledgerId = 30;
         handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
@@ -519,9 +525,14 @@ public class LedgerStorageCheckpointTest {
         }
         handle.close();
 
-        Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0,
-                entryLogger.logChannel.getUnpersistedBytes());
-        Assert.assertNotEquals("There should be logChannelsToFlush", 0, entryLogger.logChannelsToFlush.size());
+        Set<BufferedLogChannel> copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
+                Arrays.asList(entryLogManagerBase.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
+        for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+            Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0,
+                    currentLog.getUnpersistedBytes());
+        }
+        Assert.assertNotEquals("There should be logChannelsToFlush", 0,
+                entryLogManagerBase.getRotatedLogChannels().size());
 
         /*
          * wait for atleast flushInterval period, so that checkpoint can happen.
@@ -532,11 +543,16 @@ public class LedgerStorageCheckpointTest {
          * since checkpoint happenend, there shouldn't be any logChannelsToFlush
          * and bytesWrittenSinceLastFlush should be zero.
          */
+        List<BufferedLogChannel> copyOfRotatedLogChannels = entryLogManagerBase.getRotatedLogChannels();
         Assert.assertTrue("There shouldn't be logChannelsToFlush",
-                ((entryLogger.logChannelsToFlush == null) || (entryLogger.logChannelsToFlush.size() == 0)));
+                ((copyOfRotatedLogChannels == null) || (copyOfRotatedLogChannels.size() == 0)));
 
-        Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
-                entryLogger.logChannel.getUnpersistedBytes());
+        copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
+                Arrays.asList(entryLogManagerBase.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
+        for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+            Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
+                    currentLog.getUnpersistedBytes());
+        }
     }
 
     static class MockInterleavedLedgerStorage extends InterleavedLedgerStorage {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index c183fbf..9642c18 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -36,9 +36,11 @@ import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -99,6 +101,7 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
     public SortedLedgerStorageCheckpointTest() {
         super();
         conf.setEntryLogSizeLimit(1);
+        conf.setEntryLogFilePreAllocationEnabled(false);
         this.checkpoints = new LinkedBlockingQueue<>();
     }
 
@@ -182,6 +185,7 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
         assertEquals(new TestCheckpoint(0), memtableCp);
 
         // trigger a memtable flush
+        Assert.assertNotNull("snapshot shouldn't have returned null", storage.memTable.snapshot());
         storage.onSizeLimitReached(checkpointSrc.newCheckpoint());
         // wait for checkpoint to complete
         checkpoints.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
@@ -219,9 +223,11 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
         });
 
         // simulate entry log is rotated (due to compaction)
-        storage.entryLogger.rollLog();
+        EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) storage.getEntryLogger()
+                .getEntryLogManager();
+        entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
         long leastUnflushedLogId = storage.entryLogger.getLeastUnflushedLogId();
-        long currentLogId = storage.entryLogger.getCurrentLogId();
+        long currentLogId = entryLogManager.getCurrentLogId();
         log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);
 
         readyLatch.countDown();
@@ -230,6 +236,7 @@ public class SortedLedgerStorageCheckpointTest extends LedgerStorageTestBase {
         assertEquals(20, storage.memTable.kvmap.size());
 
         // trigger a memtable flush
+        Assert.assertNotNull("snapshot shouldn't have returned null", storage.memTable.snapshot());
         storage.onSizeLimitReached(checkpointSrc.newCheckpoint());
         assertEquals(new TestCheckpoint(100), checkpoints.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index 67c69fe..7b44c94 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -227,7 +227,7 @@ public class DbLedgerStorageTest {
         newEntry3.writeLong(4); // ledger id
         newEntry3.writeLong(3); // entry id
         newEntry3.writeBytes("new-entry-3".getBytes());
-        long location = entryLogger.addEntry(4, newEntry3, false);
+        long location = entryLogger.addEntry(4L, newEntry3, false);
 
         List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
         singleDirStorage.updateEntriesLocations(locations);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index bdc8901..35b2fd3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -306,4 +306,4 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
             assertEquals("Entry should contain correct data", "data", new String(entry.getEntry()));
         }
     }
-}
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.