You are viewing a plain-text version of this content. The canonical link for it was attached to the word "here" in the original page and is not preserved in this plain-text rendering.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/04/18 17:30:27 UTC

[GitHub] sijie closed pull request #1281: Issue #570: Introducing EntryLogManager.

sijie closed pull request #1281: Issue #570: Introducing EntryLogManager.
URL: https://github.com/apache/bookkeeper/pull/1281
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 05a20e549..53628cfd1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -53,6 +53,7 @@
      * calling fileChannel.force
      */
     protected final long unpersistedBytesBound;
+    private final boolean doRegularFlushes;
 
     /*
      * it tracks the number of bytes which are not persisted yet by force
@@ -81,6 +82,7 @@ public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity, long
         this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
         this.unpersistedBytes = new AtomicLong(0);
         this.unpersistedBytesBound = unpersistedBytesBound;
+        this.doRegularFlushes = unpersistedBytesBound > 0;
     }
 
     @Override
@@ -114,7 +116,7 @@ public void write(ByteBuf src) throws IOException {
             }
             position.addAndGet(copied);
             unpersistedBytes.addAndGet(copied);
-            if (unpersistedBytesBound > 0) {
+            if (doRegularFlushes) {
                 if (unpersistedBytes.get() >= unpersistedBytesBound) {
                     flush();
                     shouldForceWrite = true;
@@ -156,6 +158,21 @@ public void flushAndForceWrite(boolean forceMetadata) throws IOException {
         forceWrite(forceMetadata);
     }
 
+    /**
+     * calls both flush and forceWrite methods if regular flush is enabled.
+     *
+     * @param forceMetadata
+     *            - If true then this method is required to force changes to
+     *            both the file's content and metadata to be written to storage;
+     *            otherwise, it need only force content changes to be written
+     * @throws IOException
+     */
+    public void flushAndForceWriteIfRegularFlush(boolean forceMetadata) throws IOException {
+        if (doRegularFlushes) {
+            flushAndForceWrite(forceMetadata);
+        }
+    }
+
     /**
      * Write any data in the buffer to the file and advance the writeBufferPosition.
      * Callers are expected to synchronize appropriately
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 99b0f49ad..158525d5f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -51,12 +51,16 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -84,6 +88,9 @@
  */
 public class EntryLogger {
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
+    static final long UNASSIGNED_LEDGERID = -1L;
+    // log file suffix
+    private static final String LOG_FILE_SUFFIX = ".log";
 
     @VisibleForTesting
     static final int UNINITIALIZED_LOG_ID = -0xDEAD;
@@ -92,13 +99,10 @@
         private final long logId;
         private final EntryLogMetadata entryLogMetadata;
         private final File logFile;
+        private long ledgerIdAssigned = UNASSIGNED_LEDGERID;
 
-        public BufferedLogChannel(FileChannel fc,
-                                  int writeCapacity,
-                                  int readCapacity,
-                                  long logId,
-                                  File logFile,
-                                  long unpersistedBytesBound) throws IOException {
+        public BufferedLogChannel(FileChannel fc, int writeCapacity, int readCapacity, long logId, File logFile,
+                long unpersistedBytesBound) throws IOException {
             super(fc, writeCapacity, readCapacity, unpersistedBytesBound);
             this.logId = logId;
             this.entryLogMetadata = new EntryLogMetadata(logId);
@@ -120,21 +124,104 @@ public ConcurrentLongLongHashMap getLedgersMap() {
             return entryLogMetadata.getLedgersMap();
         }
 
+        public Long getLedgerIdAssigned() {
+            return ledgerIdAssigned;
+        }
+
+        public void setLedgerIdAssigned(Long ledgerId) {
+            this.ledgerIdAssigned = ledgerId;
+        }
+
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(BufferedChannel.class)
                 .add("logId", logId)
                 .add("logFile", logFile)
+                .add("ledgerIdAssigned", ledgerIdAssigned)
                 .toString();
         }
+
+        /**
+         * Append the ledger map at the end of the entry log.
+         * Updates the entry log file header with the offset and size of the map.
+         */
+        private void appendLedgersMap() throws IOException {
+
+            long ledgerMapOffset = this.position();
+
+            ConcurrentLongLongHashMap ledgersMap = this.getLedgersMap();
+            int numberOfLedgers = (int) ledgersMap.size();
+
+            // Write the ledgers map into several batches
+
+            final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+            final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
+
+            try {
+                ledgersMap.forEach(new BiConsumerLong() {
+                    int remainingLedgers = numberOfLedgers;
+                    boolean startNewBatch = true;
+                    int remainingInBatch = 0;
+
+                    @Override
+                    public void accept(long ledgerId, long size) {
+                        if (startNewBatch) {
+                            int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
+                            int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
+
+                            serializedMap.clear();
+                            serializedMap.writeInt(ledgerMapSize - 4);
+                            serializedMap.writeLong(INVALID_LID);
+                            serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
+                            serializedMap.writeInt(batchSize);
+
+                            startNewBatch = false;
+                            remainingInBatch = batchSize;
+                        }
+                        // Dump the ledger in the current batch
+                        serializedMap.writeLong(ledgerId);
+                        serializedMap.writeLong(size);
+                        --remainingLedgers;
+
+                        if (--remainingInBatch == 0) {
+                            // Close current batch
+                            try {
+                                write(serializedMap);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+
+                            startNewBatch = true;
+                        }
+                    }
+                });
+            } catch (RuntimeException e) {
+                if (e.getCause() instanceof IOException) {
+                    throw (IOException) e.getCause();
+                } else {
+                    throw e;
+                }
+            } finally {
+                serializedMap.release();
+            }
+            // Flush the ledger's map out before we write the header.
+            // Otherwise the header might point to something that is not fully
+            // written
+            super.flush();
+
+            // Update the headers with the map offset and count of ledgers
+            ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
+            mapInfo.putLong(ledgerMapOffset);
+            mapInfo.putInt(numberOfLedgers);
+            mapInfo.flip();
+            this.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
+        }
     }
 
-    volatile File currentDir;
     private final LedgerDirsManager ledgerDirsManager;
     private final boolean entryLogPerLedgerEnabled;
-    private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
 
-    private volatile long leastUnflushedLogId;
+    final RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
 
     /**
      * locks for compaction log.
@@ -145,11 +232,11 @@ public String toString() {
      * The maximum size of a entry logger file.
      */
     final long logSizeLimit;
-    List<BufferedLogChannel> logChannelsToFlush;
-    volatile BufferedLogChannel logChannel;
     private volatile BufferedLogChannel compactionLogChannel;
 
-    private final EntryLoggerAllocator entryLoggerAllocator;
+    final EntryLoggerAllocator entryLoggerAllocator;
+    private final EntryLogManager entryLogManager;
+
     private final boolean entryLogPreAllocationEnabled;
     private final CopyOnWriteArrayList<EntryLogListener> listeners = new CopyOnWriteArrayList<EntryLogListener>();
 
@@ -268,6 +355,8 @@ public EntryLogger(ServerConfiguration conf,
         // but the protocol varies so an exact value is difficult to determine
         this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500;
         this.ledgerDirsManager = ledgerDirsManager;
+        this.conf = conf;
+        entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
         if (listener != null) {
             addListener(listener);
         }
@@ -296,12 +385,72 @@ public EntryLogger(ServerConfiguration conf,
                 logId = lastLogId;
             }
         }
-        this.leastUnflushedLogId = logId + 1;
+        this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1);
         this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
-        this.conf = conf;
-        this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
+        if (entryLogPerLedgerEnabled) {
+            this.entryLogManager = new EntryLogManagerForSingleEntryLog(ledgerDirsManager) {
+                @Override
+                public void checkpoint() throws IOException {
+                    /*
+                     * In the case of entryLogPerLedgerEnabled we need to flush
+                     * both rotatedlogs and currentlogs. This is needed because
+                     * syncThread periodically does checkpoint and at this time
+                     * all the logs should be flushed.
+                     *
+                     */
+                    super.flush();
+                }
+
+                @Override
+                public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException {
+                    // do nothing
+                    /*
+                     * prepareSortedLedgerStorageCheckpoint is required for
+                     * singleentrylog scenario, but it is not needed for
+                     * entrylogperledger scenario, since entries of a ledger go
+                     * to a entrylog (even during compaction) and SyncThread
+                     * drives periodic checkpoint logic.
+                     */
+
+                }
+
+                @Override
+                public void prepareEntryMemTableFlush() {
+                    // do nothing
+                }
+
+                @Override
+                public boolean commitEntryMemTableFlush() throws IOException {
+                    // lock it only if there is new data
+                    // so that cache accesstime is not changed
+                    Set<BufferedLogChannel> copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
+                            Arrays.asList(super.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
+                    for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+                        if (reachEntryLogLimit(currentLog, 0L)) {
+                            synchronized (this) {
+                                if (reachEntryLogLimit(currentLog, 0L)) {
+                                    LOG.info("Rolling entry logger since it reached size limitation");
+                                    createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+                                }
+                            }
+                        }
+                    }
+                    /*
+                     * in the case of entrylogperledger, SyncThread drives
+                     * checkpoint logic for every flushInterval. So
+                     * EntryMemtable doesn't need to call checkpoint in the case
+                     * of entrylogperledger.
+                     */
+                    return false;
+                }
+            };
+        } else {
+            this.entryLogManager = new EntryLogManagerForSingleEntryLog(ledgerDirsManager);
+        }
+    }
 
-        initialize();
+    EntryLogManager getEntryLogManager() {
+        return entryLogManager;
     }
 
     void addListener(EntryLogListener listener) {
@@ -324,13 +473,11 @@ void addListener(EntryLogListener listener) {
      */
     private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos)
             throws IOException {
-        BufferedLogChannel bc = logChannel;
+        BufferedLogChannel bc = entryLogManager.getCurrentLogIfPresent(entryLogId);
         if (null != bc) {
-            if (entryLogId == bc.getLogId()) {
-                synchronized (bc) {
-                    if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) {
-                        return bc.read(buff, pos);
-                    }
+            synchronized (bc) {
+                if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) {
+                    return bc.read(buff, pos);
                 }
             }
         }
@@ -397,17 +544,12 @@ public BufferedReadChannel getFromChannels(long logId) {
      *
      * @return least unflushed log id.
      */
-    synchronized long getLeastUnflushedLogId() {
-        return leastUnflushedLogId;
+    long getLeastUnflushedLogId() {
+        return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
     }
 
-    synchronized long getCurrentLogId() {
-        BufferedLogChannel channel = logChannel;
-        if (null == channel) {
-            return UNINITIALIZED_LOG_ID;
-        } else {
-            return channel.getLogId();
-        }
+    long getPreviousAllocatedEntryLogId() {
+        return entryLoggerAllocator.getPreallocatedLogId();
     }
 
     /**
@@ -422,75 +564,16 @@ File getCurCompactionLogFile() {
         }
     }
 
-    protected void initialize() throws IOException {
-        // Register listener for disk full notifications.
-        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
-
-        if (ledgerDirsManager.hasWritableLedgerDirs()) {
-            createNewLog();
-        }
+    void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException {
+        entryLogManager.prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
     }
 
-    private LedgerDirsListener getLedgerDirsListener() {
-        return new LedgerDirsListener() {
-            @Override
-            public void diskFull(File disk) {
-                // If the current entry log disk is full, then create new entry
-                // log.
-                if (currentDir != null && currentDir.equals(disk)) {
-                    shouldCreateNewEntryLog.set(true);
-                }
-            }
-
-            @Override
-            public void diskAlmostFull(File disk) {
-                // If the current entry log disk is almost full, then create new entry
-                // log.
-                if (currentDir != null && currentDir.equals(disk)) {
-                    shouldCreateNewEntryLog.set(true);
-                }
-            }
-        };
+    void prepareEntryMemTableFlush() {
+        entryLogManager.prepareEntryMemTableFlush();
     }
 
-    /**
-     * Rolling a new log file to write.
-     */
-    synchronized void rollLog() throws IOException {
-        createNewLog();
-    }
-
-    /**
-     * Creates a new log file.
-     */
-    void createNewLog() throws IOException {
-        // first tried to create a new log channel. add current log channel to ToFlush list only when
-        // there is a new log channel. it would prevent that a log channel is referenced by both
-        // *logChannel* and *ToFlush* list.
-        if (null != logChannel) {
-            if (null == logChannelsToFlush) {
-                logChannelsToFlush = new LinkedList<BufferedLogChannel>();
-            }
-
-            // flush the internal buffer back to filesystem but not sync disk
-            // so the readers could access the data from filesystem.
-            logChannel.flush();
-
-            // Append ledgers map at the end of entry log
-            appendLedgersMap(logChannel);
-
-            BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
-            logChannelsToFlush.add(logChannel);
-            LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
-                    logChannel.getLogId(), logChannelsToFlush);
-            for (EntryLogListener listener : listeners) {
-                listener.onRotateEntryLog();
-            }
-            logChannel = newLogChannel;
-        } else {
-            logChannel = entryLoggerAllocator.createNewLog();
-        }
-        currentDir = logChannel.getLogFile().getParentFile();
+    boolean commitEntryMemTableFlush() throws IOException {
+        return entryLogManager.commitEntryMemTableFlush();
     }
 
     /**
@@ -499,79 +582,6 @@ void createNewLog() throws IOException {
     EntryLoggerAllocator getEntryLoggerAllocator() {
         return entryLoggerAllocator;
     }
-    /**
-     * Append the ledger map at the end of the entry log.
-     * Updates the entry log file header with the offset and size of the map.
-     */
-    private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException {
-        long ledgerMapOffset = entryLogChannel.position();
-
-        ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-        int numberOfLedgers = (int) ledgersMap.size();
-
-        // Write the ledgers map into several batches
-
-        final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-        final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
-
-        try {
-            ledgersMap.forEach(new BiConsumerLong() {
-                int remainingLedgers = numberOfLedgers;
-                boolean startNewBatch = true;
-                int remainingInBatch = 0;
-
-                @Override
-                public void accept(long ledgerId, long size) {
-                    if (startNewBatch) {
-                        int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
-                        int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-                        serializedMap.clear();
-                        serializedMap.writeInt(ledgerMapSize - 4);
-                        serializedMap.writeLong(INVALID_LID);
-                        serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-                        serializedMap.writeInt(batchSize);
-
-                        startNewBatch = false;
-                        remainingInBatch = batchSize;
-                    }
-                    // Dump the ledger in the current batch
-                    serializedMap.writeLong(ledgerId);
-                    serializedMap.writeLong(size);
-                    --remainingLedgers;
-
-                    if (--remainingInBatch == 0) {
-                        // Close current batch
-                        try {
-                            entryLogChannel.write(serializedMap);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-
-                        startNewBatch = true;
-                    }
-                }
-            });
-        } catch (RuntimeException e) {
-            if (e.getCause() instanceof IOException) {
-                throw (IOException) e.getCause();
-            } else {
-                throw e;
-            }
-        } finally {
-            serializedMap.release();
-        }
-        // Flush the ledger's map out before we write the header.
-        // Otherwise the header might point to something that is not fully written
-        entryLogChannel.flush();
-
-        // Update the headers with the map offset and count of ledgers
-        ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-        mapInfo.putLong(ledgerMapOffset);
-        mapInfo.putInt(numberOfLedgers);
-        mapInfo.flip();
-        entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
-    }
 
     /**
      * An allocator pre-allocates entry log files.
@@ -589,6 +599,10 @@ public void accept(long ledgerId, long size) {
             allocatorExecutor = Executors.newSingleThreadExecutor();
         }
 
+        synchronized long getPreallocatedLogId() {
+            return preallocatedLogId;
+        }
+
         BufferedLogChannel createNewLog() throws IOException {
             synchronized (createEntryLogLock) {
                 BufferedLogChannel bc;
@@ -630,37 +644,42 @@ BufferedLogChannel createNewLogForCompaction() throws IOException {
             }
         }
 
-        private BufferedLogChannel allocateNewLog() throws IOException {
+        private synchronized BufferedLogChannel allocateNewLog() throws IOException {
             return allocateNewLog(".log");
         }
 
         /**
          * Allocate a new log file.
          */
-        private BufferedLogChannel allocateNewLog(String suffix) throws IOException {
+        private synchronized BufferedLogChannel allocateNewLog(String suffix) throws IOException {
             List<File> list = ledgerDirsManager.getWritableLedgerDirsForNewLog();
-            Collections.shuffle(list);
+            File dirForNextEntryLog = entryLogManager.getDirForNextEntryLog(list);
+
+            List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
+            String logFileName;
             // It would better not to overwrite existing entry log files
-            File newLogFile = null;
+            File testLogFile = null;
             do {
                 if (preallocatedLogId >= Integer.MAX_VALUE) {
                     preallocatedLogId = 0;
                 } else {
                     ++preallocatedLogId;
                 }
-                String logFileName = Long.toHexString(preallocatedLogId) + suffix;
-                for (File dir : list) {
-                    newLogFile = new File(dir, logFileName);
-                    if (newLogFile.exists()) {
-                        LOG.warn("Found existed entry log " + newLogFile
+                logFileName = Long.toHexString(preallocatedLogId) + suffix;
+                for (File dir : ledgersDirs) {
+                    testLogFile = new File(dir, logFileName);
+                    if (testLogFile.exists()) {
+                        LOG.warn("Found existed entry log " + testLogFile
                                + " when trying to create it as a new log.");
-                        newLogFile = null;
+                        testLogFile = null;
                         break;
                     }
                 }
-            } while (newLogFile == null);
+            } while (testLogFile == null);
 
+            File newLogFile = new File(dirForNextEntryLog, logFileName);
             FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
+
             BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
                     conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
             logfileHeader.readerIndex(0);
@@ -669,6 +688,11 @@ private BufferedLogChannel allocateNewLog(String suffix) throws IOException {
             for (File f : list) {
                 setLastLogId(f, preallocatedLogId);
             }
+
+            if (suffix.equals(LOG_FILE_SUFFIX)) {
+                recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
+            }
+
             LOG.info("Created new entry log file {} for logId {}.", newLogFile, preallocatedLogId);
             return logChannel;
         }
@@ -788,89 +812,418 @@ private long readLastLogId(File f) {
         }
     }
 
-    /**
-     * Flushes all rotated log channels. After log channels are flushed,
-     * move leastUnflushedLogId ptr to current logId.
-     */
-    void checkpoint() throws IOException {
-        flushRotatedLogs();
+    interface EntryLogManager {
+
+        /*
+         * add entry to the corresponding entrylog and return the position of
+         * the entry in the entrylog
+         */
+        long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException;
+
+        /*
+         * gets the active logChannel with the given entryLogId. null if it is
+         * not existing.
+         */
+        BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
         /*
-         * In the case of entryLogPerLedgerEnabled we need to flush both
-         * rotatedlogs and currentlogs. This is needed because syncThread
-         * periodically does checkpoint and at this time all the logs should
-         * be flushed.
+         * Returns eligible writable ledger dir for the creation next entrylog
+         */
+        File getDirForNextEntryLog(List<File> writableLedgerDirs);
+
+        /*
+         * Do the operations required for checkpoint.
+         */
+        void checkpoint() throws IOException;
+
+        /*
+         * flush both current and rotated logs.
+         */
+        void flush() throws IOException;
+
+        /*
+         * close current logs.
+         */
+        void close() throws IOException;
+
+        /*
+         * force close current logs.
+         */
+        void forceClose();
+
+        /*
+         *
+         */
+        void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException;
+
+        /*
+         * this method should be called before doing entrymemtable flush, it
+         * would save the state of the entrylogger before entrymemtable flush
+         * and commitEntryMemTableFlush would take appropriate action after
+         * entrymemtable flush.
+         */
+        void prepareEntryMemTableFlush();
+
+        /*
+         * this method should be called after doing entrymemtable flush,it would
+         * take appropriate action after entrymemtable flush depending on the
+         * current state of the entrylogger and the state of the entrylogger
+         * during prepareEntryMemTableFlush.
          *
-         * TODO: When EntryLogManager is introduced in the subsequent sub-tasks of
-         * this Issue, I will move this logic to individual implamentations of
-         * EntryLogManager and it would be free of this booalen flag based logic.
+         * It is assumed that there would be corresponding
+         * prepareEntryMemTableFlush for every commitEntryMemTableFlush and both
+         * would be called from the same thread.
          *
+         * returns boolean value indicating whether EntryMemTable should do checkpoint
+         * after this commit method.
          */
-        if (entryLogPerLedgerEnabled) {
-            flushCurrentLog();
+        boolean commitEntryMemTableFlush() throws IOException;
+    }
+
+    abstract class EntryLogManagerBase implements EntryLogManager {
+        volatile List<BufferedLogChannel> rotatedLogChannels;
+
+        private final FastThreadLocal<ByteBuf> sizeBufferForAdd = new FastThreadLocal<ByteBuf>() {
+            @Override
+            protected ByteBuf initialValue() throws Exception {
+                return Unpooled.buffer(4);
+            }
+        };
+
+        /*
+         * This method should be guarded by a lock, so callers of this method
+         * should be in the right scope of the lock.
+         */
+        @Override
+        public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+            int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+            BufferedLogChannel logChannel = getCurrentLogForLedgerForAddEntry(ledger, entrySize, rollLog);
+            ByteBuf sizeBuffer = sizeBufferForAdd.get();
+            sizeBuffer.clear();
+            sizeBuffer.writeInt(entry.readableBytes());
+            logChannel.write(sizeBuffer);
+
+            long pos = logChannel.position();
+            logChannel.write(entry);
+            logChannel.registerWrittenEntry(ledger, entrySize);
+
+            return (logChannel.getLogId() << 32L) | pos;
+        }
+
+        boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
+            if (logChannel == null) {
+                return false;
+            }
+            return logChannel.position() + size > logSizeLimit;
+        }
+
+        boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long size) {
+            if (logChannel == null) {
+                return false;
+            }
+            return logChannel.position() + size > Integer.MAX_VALUE;
+        }
+
+        abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
+
+        abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize, boolean rollLog)
+                throws IOException;
+
+        abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel);
+
+        /*
+         * flush current logs.
+         */
+        abstract void flushCurrentLogs() throws IOException;
+
+        /*
+         * flush rotated logs.
+         */
+        abstract void flushRotatedLogs() throws IOException;
+
+        List<BufferedLogChannel> getRotatedLogChannels() {
+            return rotatedLogChannels;
+        }
+
+        @Override
+        public void flush() throws IOException {
+            flushRotatedLogs();
+            flushCurrentLogs();
+        }
+
+        void flushLogChannel(BufferedLogChannel logChannel, boolean forceMetadata) throws IOException {
+            if (logChannel != null) {
+                logChannel.flushAndForceWrite(forceMetadata);
+                LOG.debug("Flush and sync current entry logger {}", logChannel.getLogId());
+            }
+        }
+
+        /*
+         * Creates a new log file and makes it the current log for ledgerId. If a
+         * current log exists, it is flushed (not synced), its ledgers map is
+         * appended, and it is handed over to rotatedLogChannels.
+         * This method is not synchronized itself, so callers of this method should be
+         * in right scope of the lock (EntryLogManagerForSingleEntryLog overrides it
+         * as a synchronized method).
+         */
+        void createNewLog(long ledgerId) throws IOException {
+            BufferedLogChannel logChannel = getCurrentLogForLedger(ledgerId);
+            // The current channel is handed to rotatedLogChannels (through
+            // setCurrentLogForLedgerAndAddToRotate) only after the allocator has produced
+            // a replacement, so a channel is never both the current log and queued in
+            // rotatedLogChannels. If allocation throws, the current channel stays current.
+            if (null != logChannel) {
+
+                // flush the internal buffer back to filesystem but not sync disk
+                logChannel.flush();
+
+                // Append ledgers map at the end of entry log
+                logChannel.appendLedgersMap();
+
+                BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
+                setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
+                LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
+                        logChannel.getLogId(), rotatedLogChannels);
+                // listeners are notified only on an actual rotation, not on first creation
+                for (EntryLogListener listener : listeners) {
+                    listener.onRotateEntryLog();
+                }
+            } else {
+                // no current log yet: just install a freshly allocated one
+                setCurrentLogForLedgerAndAddToRotate(ledgerId, entryLoggerAllocator.createNewLog());
+            }
         }
     }
 
-    void flushRotatedLogs() throws IOException {
-        List<BufferedLogChannel> channels = null;
-        long flushedLogId = INVALID_LID;
-        synchronized (this) {
-            channels = logChannelsToFlush;
-            logChannelsToFlush = null;
+    class EntryLogManagerForSingleEntryLog extends EntryLogManagerBase {
+
+        // The single log that every ledger appends to; null until the first log is created.
+        private volatile BufferedLogChannel activeLogChannel;
+        // Log id captured by prepareEntryMemTableFlush() and compared in commitEntryMemTableFlush().
+        private long logIdBeforeFlush = INVALID_LID;
+        // Set by the ledger dirs listener when the active log's directory is (almost) full;
+        // checked and reset on the next add.
+        private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
+
+        EntryLogManagerForSingleEntryLog(LedgerDirsManager ledgerDirsManager) {
+            this.rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+            // Register listener for disk full notifications.
+            ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
         }
-        if (null == channels) {
-            return;
+
+        /*
+         * Builds the listener that requests a new entry log whenever the directory
+         * holding the active log is reported full or almost full.
+         */
+        private LedgerDirsListener getLedgerDirsListener() {
+            return new LedgerDirsListener() {
+                @Override
+                public void diskFull(File disk) {
+                    requestNewLogIfActiveLogOn(disk);
+                }
+
+                @Override
+                public void diskAlmostFull(File disk) {
+                    requestNewLogIfActiveLogOn(disk);
+                }
+
+                // Flags a roll when the active log file lives directly in the given directory.
+                private void requestNewLogIfActiveLogOn(File disk) {
+                    BufferedLogChannel current = activeLogChannel;
+                    if (current != null && current.getLogFile().getParentFile().equals(disk)) {
+                        shouldCreateNewEntryLog.set(true);
+                    }
+                }
+            };
         }
-        Iterator<BufferedLogChannel> chIter = channels.iterator();
-        while (chIter.hasNext()) {
-            BufferedLogChannel channel = chIter.next();
-            try {
-                channel.flushAndForceWrite(false);
-            } catch (IOException ioe) {
-                // rescue from flush exception, add unflushed channels back
-                synchronized (this) {
-                    if (null == logChannelsToFlush) {
-                        logChannelsToFlush = channels;
-                    } else {
-                        logChannelsToFlush.addAll(0, channels);
+
+        /*
+         * Serialises the whole append (choosing/rolling the active log plus writing the
+         * entry, done by the base implementation) on this manager's monitor.
+         */
+        @Override
+        public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+            return super.addEntry(ledger, entry, rollLog);
+        }
+
+        /*
+         * Returns the active log to append entrySize bytes to. It lazily creates the
+         * first log, and rolls to a new log when the relevant size limit would be
+         * exceeded (soft limit if rollLog, hard limit otherwise) or when the disk
+         * listener flagged the current directory. ledgerId is ignored because all
+         * ledgers share one log.
+         */
+        @Override
+        synchronized BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize,
+                boolean rollLog) throws IOException {
+            if (null == activeLogChannel) {
+                // log channel can be null because the file is deferred to be created
+                createNewLog(UNASSIGNED_LEDGERID);
+            }
+
+            boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(activeLogChannel, entrySize)
+                    : readEntryLogHardLimit(activeLogChannel, entrySize);
+            // Create new log if logSizeLimit reached or current disk is full
+            boolean createNewLog = shouldCreateNewEntryLog.get();
+            if (createNewLog || reachEntryLogLimit) {
+                // defensive: activeLogChannel was made non-null above (createNewLog throws on failure)
+                if (activeLogChannel != null) {
+                    // NOTE(review): presumably only syncs when regular flushes are enabled
+                    // (BufferedChannel.doRegularFlushes); confirm in BufferedLogChannel.
+                    activeLogChannel.flushAndForceWriteIfRegularFlush(false);
+                }
+                createNewLog(UNASSIGNED_LEDGERID);
+                // Reset the flag
+                if (createNewLog) {
+                    shouldCreateNewEntryLog.set(false);
+                }
+            }
+            return activeLogChannel;
+        }
+
+        /*
+         * Same as the base implementation, but run under this manager's monitor, as the
+         * base method expects its caller to hold a lock.
+         */
+        @Override
+        synchronized void createNewLog(long ledgerId) throws IOException {
+            super.createNewLog(ledgerId);
+        }
+
+        /*
+         * Installs logChannel as the single active log (ledgerId is irrelevant here)
+         * and queues the previously active channel, if any, for a later sync.
+         */
+        @Override
+        public synchronized void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel) {
+            BufferedLogChannel previous = activeLogChannel;
+            activeLogChannel = logChannel;
+            if (previous == null) {
+                return;
+            }
+            rotatedLogChannels.add(previous);
+        }
+
+        /*
+         * All ledgers share the single active log, so ledgerId is ignored.
+         * May return null before the first log is created.
+         */
+        @Override
+        public BufferedLogChannel getCurrentLogForLedger(long ledgerId) {
+            return activeLogChannel;
+        }
+
+        /*
+         * Returns the active channel only if it is the log with the given id,
+         * otherwise null.
+         */
+        @Override
+        public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+            BufferedLogChannel current = activeLogChannel;
+            boolean isRequestedLog = current != null && current.getLogId() == entryLogId;
+            return isRequestedLog ? current : null;
+        }
+
+        /*
+         * Picks a random writable directory for the next entry log.
+         * NOTE: shuffles writableLedgerDirs in place (the caller's list is reordered)
+         * and throws IndexOutOfBoundsException if the list is empty.
+         */
+        @Override
+        public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+            Collections.shuffle(writableLedgerDirs);
+            return writableLedgerDirs.get(0);
+        }
+
+        /*
+         * Checkpoint syncs only the rotated logs; the active log is left alone.
+         */
+        @Override
+        public void checkpoint() throws IOException {
+            flushRotatedLogs();
+        }
+
+        /*
+         * Id of the active log, or EntryLogger.UNINITIALIZED_LOG_ID when no log exists yet.
+         */
+        public long getCurrentLogId() {
+            BufferedLogChannel current = activeLogChannel;
+            return current == null ? EntryLogger.UNINITIALIZED_LOG_ID : current.getLogId();
+        }
+
+        /*
+         * Flushes and syncs the active log, including file metadata.
+         */
+        @Override
+        public void flushCurrentLogs() throws IOException {
+            // flushLogChannel() ignores a null channel, so the active log is passed straight through.
+            // NOTE(review): metadata is forced because this is described as part of checkpointing;
+            // the caller visible here is flush().
+            flushLogChannel(activeLogChannel, true);
+        }
+
+        /*
+         * Syncs every rotated channel (data and metadata), then closes it and marks it
+         * flushed in recentlyCreatedEntryLogsStatus. The pending list is swapped for a
+         * fresh one under this manager's monitor, so the syncing itself runs unlocked.
+         * If a sync fails, the channels not yet synced (including the failing one) are
+         * put back at the front of rotatedLogChannels and the IOException is rethrown.
+         */
+        @Override
+        void flushRotatedLogs() throws IOException {
+            List<BufferedLogChannel> channels = null;
+            synchronized (this) {
+                channels = rotatedLogChannels;
+                rotatedLogChannels = new LinkedList<BufferedLogChannel>();
+            }
+            // rotatedLogChannels is never null in this class; this check is purely defensive.
+            if (null == channels) {
+                return;
+            }
+            Iterator<BufferedLogChannel> chIter = channels.iterator();
+            while (chIter.hasNext()) {
+                BufferedLogChannel channel = chIter.next();
+                try {
+                    channel.flushAndForceWrite(true);
+                } catch (IOException ioe) {
+                    // rescue from flush exception, add unflushed channels back
+                    synchronized (this) {
+                        if (null == rotatedLogChannels) {
+                            rotatedLogChannels = channels;
+                        } else {
+                            rotatedLogChannels.addAll(0, channels);
+                        }
                     }
+                    throw ioe;
                 }
-                throw ioe;
+                // remove the channel from the list after it is successfully flushed
+                chIter.remove();
+                // since this channel is only used for writing, after flushing the channel,
+                // we had to close the underlying file channel. Otherwise, we might end up
+                // leaking fds which cause the disk spaces could not be reclaimed.
+                closeFileChannel(channel);
+                // lets the least-unflushed log id advance past this log (once lower ids are flushed too)
+                recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+                LOG.info("Synced entry logger {} to disk.", channel.getLogId());
             }
-            // remove the channel from the list after it is successfully flushed
-            chIter.remove();
-            // since this channel is only used for writing, after flushing the channel,
-            // we had to close the underlying file channel. Otherwise, we might end up
-            // leaking fds which cause the disk spaces could not be reclaimed.
-            closeFileChannel(channel);
-            if (channel.getLogId() > flushedLogId) {
-                flushedLogId = channel.getLogId();
+        }
+
+        /*
+         * Closes the file channel of the active log, if one exists. Rotated channels
+         * are not touched here; flushRotatedLogs() closes them after syncing.
+         */
+        @Override
+        public void close() throws IOException {
+            // Read the volatile field once so the null check and the close act on the same channel.
+            BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+            if (currentActiveLogChannel != null) {
+                closeFileChannel(currentActiveLogChannel);
             }
-            LOG.info("Synced entry logger {} to disk.", channel.getLogId());
         }
-        // move the leastUnflushedLogId ptr
-        leastUnflushedLogId = flushedLogId + 1;
-    }
 
-    public void flush() throws IOException {
-        flushRotatedLogs();
-        flushCurrentLog();
-    }
+        /*
+         * Force-closes the file channel of the active log, if one exists.
+         */
+        @Override
+        public void forceClose() {
+            // Read the volatile field once so the null check and the close act on the same channel.
+            BufferedLogChannel currentActiveLogChannel = activeLogChannel;
+            if (currentActiveLogChannel != null) {
+                forceCloseFileChannel(currentActiveLogChannel);
+            }
+        }
+
+        /*
+         * Records the active log id before a memtable flush so that
+         * commitEntryMemTableFlush() can detect a flush spanning several logs.
+         */
+        @Override
+        public void prepareEntryMemTableFlush() {
+            logIdBeforeFlush = getCurrentLogId();
+        }
 
-    synchronized void flushCurrentLog() throws IOException {
-        if (logChannel != null) {
-            logChannel.flushAndForceWrite(false);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId());
+        /*
+         * Called after a memtable flush, paired with prepareEntryMemTableFlush().
+         * Rolls the entry log when the active log is past logSizeLimit or when the
+         * flush spanned more than one log. Returns true when it rolled, so the caller
+         * can start a checkpoint.
+         * NOTE(review): logIdBeforeFlush is a plain field; this assumes prepare and commit
+         * run on the same thread, as in SortedLedgerStorage's flush task.
+         */
+        @Override
+        public boolean commitEntryMemTableFlush() throws IOException {
+            long logIdAfterFlush = getCurrentLogId();
+            /*
+             * in any case that an entry log reaches the limit, we roll the log
+             * and start checkpointing. if a memory table is flushed spanning
+             * over two entry log files, we also roll log. this is for
+             * performance consideration: since we don't wanna checkpoint a new
+             * log file that ledger storage is writing to.
+             */
+            if (reachEntryLogLimit(activeLogChannel, 0L) || logIdAfterFlush != logIdBeforeFlush) {
+                // the same message is logged for both roll reasons
+                LOG.info("Rolling entry logger since it reached size limitation");
+                createNewLog(UNASSIGNED_LEDGERID);
+                return true;
+            }
+            return false;
+        }
+
+        /*
+         * Rolls the active log before a SortedLedgerStorage checkpoint if the memtable
+         * flush wrote any bytes (numBytesFlushed > 0).
+         */
+        @Override
+        public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException{
+            if (numBytesFlushed > 0) {
+                // if bytes are added between previous flush and this checkpoint,
+                // it means bytes might live at current active entry log, we need
+                // roll current entry log and then issue checkpoint to underlying
+                // interleaved ledger storage.
+                createNewLog(UNASSIGNED_LEDGERID);
             }
         }
     }
 
+    /**
+     * Delegates to the entry log manager's checkpoint. For the single-entry-log
+     * manager this syncs and closes all rotated log channels. The least unflushed
+     * log id then advances past the lowest contiguous run of flushed logs (tracked
+     * by RecentEntryLogsStatus), which is not necessarily the current log id.
+     */
+    void checkpoint() throws IOException {
+        entryLogManager.checkpoint();
+    }
+
+    /**
+     * Flushes and syncs all entry logs held by the entry log manager.
+     */
+    public void flush() throws IOException {
+        entryLogManager.flush();
+    }
+
     long addEntry(long ledger, ByteBuffer entry) throws IOException {
-        return addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
+        return entryLogManager.addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
     }
 
     long addEntry(long ledger, ByteBuf entry) throws IOException {
-        return addEntry(ledger, entry, true);
+        return entryLogManager.addEntry(ledger, entry, true);
+    }
+
+    public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+        return entryLogManager.addEntry(ledger, entry, true);
     }
 
     private final FastThreadLocal<ByteBuf> sizeBuffer = new FastThreadLocal<ByteBuf>() {
@@ -880,39 +1233,6 @@ protected ByteBuf initialValue() throws Exception {
         }
     };
 
-    public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
-        if (null == logChannel) {
-            // log channel can be null because the file is deferred to be created when no writable ledger directory
-            // is available.
-            createNewLog();
-        }
-
-        int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
-        boolean reachEntryLogLimit =
-            rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize);
-        // Create new log if logSizeLimit reached or current disk is full
-        boolean createNewLog = shouldCreateNewEntryLog.get();
-        if (createNewLog || reachEntryLogLimit) {
-            createNewLog();
-            // Reset the flag
-            if (createNewLog) {
-                shouldCreateNewEntryLog.set(false);
-            }
-        }
-
-        // Get a buffer from thread local to store the size
-        ByteBuf sizeBuffer = this.sizeBuffer.get();
-        sizeBuffer.clear();
-        sizeBuffer.writeInt(entry.readableBytes());
-        logChannel.write(sizeBuffer);
-
-        long pos = logChannel.position();
-        logChannel.write(entry);
-        logChannel.registerWrittenEntry(ledger, entrySize);
-
-        return (logChannel.getLogId() << 32L) | pos;
-    }
-
     long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException {
         synchronized (compactionLogLock) {
             int entrySize = entry.readableBytes() + 4;
@@ -935,7 +1255,7 @@ long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException {
     void flushCompactionLog() throws IOException {
         synchronized (compactionLogLock) {
             if (compactionLogChannel != null) {
-                appendLedgersMap(compactionLogChannel);
+                compactionLogChannel.appendLedgersMap();
                 compactionLogChannel.flushAndForceWrite(false);
                 LOG.info("Flushed compaction log file {} with logId.",
                     compactionLogChannel.getLogFile(),
@@ -982,13 +1302,7 @@ static long logIdForOffset(long offset) {
         return offset >> 32L;
     }
 
-    synchronized boolean reachEntryLogLimit(long size) {
-        return logChannel.position() + size > logSizeLimit;
-    }
 
-    synchronized boolean readEntryLogHardLimit(long size) {
-        return logChannel.position() + size > Integer.MAX_VALUE;
-    }
 
     public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
             throws IOException, Bookie.NoEntryException {
@@ -1366,8 +1680,7 @@ public void shutdown() {
             }
             // clear the mapping, so we don't need to go through the channels again in finally block in normal case.
             logid2FileChannel.clear();
-            // close current writing log file
-            closeFileChannel(logChannel);
+            entryLogManager.close();
             synchronized (compactionLogLock) {
                 closeFileChannel(compactionLogChannel);
                 compactionLogChannel = null;
@@ -1379,7 +1692,8 @@ public void shutdown() {
             for (FileChannel fc : logid2FileChannel.values()) {
                 IOUtils.close(LOG, fc);
             }
-            forceCloseFileChannel(logChannel);
+
+            entryLogManager.forceClose();
             synchronized (compactionLogLock) {
                 forceCloseFileChannel(compactionLogChannel);
             }
@@ -1434,4 +1748,40 @@ static long fileName2LogId(String fileName) {
     static String logId2HexString(long logId) {
         return Long.toHexString(logId);
     }
-}
+
+    /**
+     * Tracks the flush status of recently created entry logs in order to derive the
+     * least unflushed entry log id.
+     *
+     * <p>A newly created log is recorded as {@code <entryLogId, false>}. When a rotated
+     * log has been flushed its entry flips to {@code true}. The contiguous run of
+     * flushed logs at the low end of the map is then dropped, which moves
+     * leastUnflushedLogId just past the highest id removed.
+     */
+    static class RecentEntryLogsStatus {
+        // entryLogId -> whether that log has been rotated and flushed, ordered by id
+        private final SortedMap<Long, Boolean> entryLogsStatusMap = new TreeMap<Long, Boolean>();
+        private long leastUnflushedLogId;
+
+        RecentEntryLogsStatus(long leastUnflushedLogId) {
+            this.leastUnflushedLogId = leastUnflushedLogId;
+        }
+
+        synchronized void createdEntryLog(Long entryLogId) {
+            entryLogsStatusMap.put(entryLogId, false);
+        }
+
+        synchronized void flushRotatedEntryLog(Long entryLogId) {
+            // only ids previously registered through createdEntryLog() are updated
+            entryLogsStatusMap.replace(entryLogId, true);
+            while (!entryLogsStatusMap.isEmpty()) {
+                long lowestLogId = entryLogsStatusMap.firstKey();
+                if (!entryLogsStatusMap.get(lowestLogId)) {
+                    // the lowest tracked log is still unflushed: nothing more can be released
+                    break;
+                }
+                entryLogsStatusMap.remove(lowestLogId);
+                leastUnflushedLogId = lowestLogId + 1;
+            }
+        }
+
+        synchronized long getLeastUnflushedLogId() {
+            return leastUnflushedLogId;
+        }
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
index d661412be..3a07ec4e8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
@@ -37,16 +37,6 @@ public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
     }
 
-    @Override
-    protected void initialize() throws IOException {
-        // do nothing for read only entry logger
-    }
-
-    @Override
-    void createNewLog() throws IOException {
-        throw new IOException("Can't create new entry log using a readonly entry logger.");
-    }
-
     @Override
     protected boolean removeEntryLog(long entryLogId) {
         // can't remove entry log in readonly mode
@@ -54,7 +44,7 @@ protected boolean removeEntryLog(long entryLogId) {
     }
 
+    /**
+     * Always throws: a read-only entry logger never accepts entries.
+     */
     @Override
-    public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+    public synchronized long addEntry(long ledgerId, ByteBuffer entry) throws IOException {
         throw new IOException("Can't add entry to a readonly entry logger.");
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 0e3e3b946..f2efa551a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -168,13 +168,7 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
     @Override
     public void checkpoint(final Checkpoint checkpoint) throws IOException {
         long numBytesFlushed = memTable.flush(this, checkpoint);
-        if (numBytesFlushed > 0) {
-            // if bytes are added between previous flush and this checkpoint,
-            // it means bytes might live at current active entry log, we need
-            // roll current entry log and then issue checkpoint to underlying
-            // interleaved ledger storage.
-            entryLogger.rollLog();
-        }
+        // The entry logger decides whether to roll before checkpointing. Presumably it
+        // delegates to the entry log manager, which rolls when numBytesFlushed > 0.
+        entryLogger.prepareSortedLedgerStorageCheckpoint(numBytesFlushed);
         super.checkpoint(checkpoint);
     }
 
@@ -209,19 +203,12 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException {
             public void run() {
                 try {
                     LOG.info("Started flushing mem table.");
-                    long logIdBeforeFlush = entryLogger.getCurrentLogId();
+                    entryLogger.prepareEntryMemTableFlush();
                     memTable.flush(SortedLedgerStorage.this);
-                    long logIdAfterFlush = entryLogger.getCurrentLogId();
-                    // in any case that an entry log reaches the limit, we roll the log and start checkpointing.
-                    // if a memory table is flushed spanning over two entry log files, we also roll log. this is
-                    // for performance consideration: since we don't wanna checkpoint a new log file that ledger
-                    // storage is writing to.
-                    if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) {
-                        LOG.info("Rolling entry logger since it reached size limitation");
-                        entryLogger.rollLog();
+                    if (entryLogger.commitEntryMemTableFlush()) {
                         checkpointer.startCheckpoint(cp);
                     }
-                } catch (IOException e) {
+                } catch (Exception e) {
                     stateManager.transitionToReadOnlyMode();
                     LOG.error("Exception thrown while flushing skip list cache.", e);
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 904ca3ffe..785b35a75 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -2262,6 +2262,10 @@ public void validate() throws ConfigurationException {
         if (0 == getBookiePort() && !getAllowEphemeralPorts()) {
             throw new ConfigurationException("Invalid port specified, using ephemeral ports accidentally?");
         }
+        if (isEntryLogPerLedgerEnabled() && getUseTransactionalCompaction()) {
+            throw new ConfigurationException(
+                    "When entryLogPerLedger is enabled , it is unnecessary to use transactional compaction");
+        }
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 33642c279..4257cccd3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -21,12 +21,18 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
 
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -93,16 +99,17 @@ public void testCreateNewLog() throws Exception {
         // Extracted from createNewLog()
         String logFileName = Long.toHexString(1) + ".log";
         File dir = ledgerDirsManager.pickRandomWritableDir();
-        LOG.info("Picked this directory: " + dir);
+        LOG.info("Picked this directory: {}", dir);
         File newLogFile = new File(dir, logFileName);
         newLogFile.createNewFile();
 
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
         // Calls createNewLog, and with the number of directories we
         // are using, if it picks one at random it will fail.
-        el.createNewLog();
-        LOG.info("This is the current log id: " + el.getCurrentLogId());
-        assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+        EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) el.getEntryLogManager();
+        entryLogManager.createNewLog(0L);
+        LOG.info("This is the current log id: {}", entryLogManager.getCurrentLogId());
+        assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
     }
 
     @Test
@@ -118,7 +125,7 @@ public void testCreateNewLogWithNoWritableLedgerDirs() throws Exception {
         // Extracted from createNewLog()
         String logFileName = Long.toHexString(1) + ".log";
         File dir = ledgerDirsManager.pickRandomWritableDir();
-        LOG.info("Picked this directory: " + dir);
+        LOG.info("Picked this directory: {}", dir);
         File newLogFile = new File(dir, logFileName);
         newLogFile.createNewFile();
 
@@ -131,9 +138,136 @@ public void testCreateNewLogWithNoWritableLedgerDirs() throws Exception {
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
         // Calls createNewLog, and with the number of directories we
         // are using, if it picks one at random it will fail.
-        el.createNewLog();
-        LOG.info("This is the current log id: " + el.getCurrentLogId());
-        assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+        EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) el.getEntryLogManager();
+        entryLogManager.createNewLog(0L);
+        LOG.info("This is the current log id: {}", entryLogManager.getCurrentLogId());
+        assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
     }
 
+    @Test
+    public void testConcurrentCreateNewLogWithEntryLogFilePreAllocationEnabled() throws Exception {
+        testConcurrentCreateNewLog(true);
+    }
+
+    @Test
+    public void testConcurrentCreateNewLogWithEntryLogFilePreAllocationDisabled() throws Exception {
+        testConcurrentCreateNewLog(false);
+    }
+
+    /*
+     * Shared body of the two tests above (not a @Test itself): calls createNewLog
+     * concurrently and checks the resulting allocation state.
+     */
+    public void testConcurrentCreateNewLog(boolean entryLogFilePreAllocationEnabled) throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of
+        // ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setEntryLogFilePreAllocationEnabled(entryLogFilePreAllocationEnabled);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) el.getEntryLogManager();
+        Assert.assertEquals("previousAllocatedEntryLogId after initialization", -1,
+                el.getPreviousAllocatedEntryLogId());
+        Assert.assertEquals("leastUnflushedLogId after initialization", 0, el.getLeastUnflushedLogId());
+        int createNewLogNumOfTimes = 10;
+        AtomicBoolean receivedException = new AtomicBoolean(false);
+
+        // The single-log manager's createNewLog is synchronized and ignores the ledgerId
+        // argument, so these parallel calls are serialised.
+        IntStream.range(0, createNewLogNumOfTimes).parallel().forEach((i) -> {
+            try {
+                (entryLogManager).createNewLog((long) i);
+            } catch (IOException e) {
+                LOG.error("Received exception while creating newLog", e);
+                receivedException.set(true);
+            }
+        });
+        // wait for the pre-allocation to complete
+        // NOTE(review): a fixed sleep is a timing assumption and may be flaky on slow machines.
+        Thread.sleep(1000);
+
+        Assert.assertFalse("There shouldn't be any exceptions while creating newlog", receivedException.get());
+        // N logs use ids 0..N-1. With pre-allocation, presumably one more id is allocated
+        // ahead of use (confirm against EntryLoggerAllocator).
+        int expectedPreviousAllocatedEntryLogId = createNewLogNumOfTimes - 1;
+        if (entryLogFilePreAllocationEnabled) {
+            expectedPreviousAllocatedEntryLogId = createNewLogNumOfTimes;
+        }
+
+        // Every call after the first rotates the previous active log, and nothing flushes
+        // the rotated logs here, so N - 1 channels remain queued.
+        Assert.assertEquals(
+                "previousAllocatedEntryLogId after " + createNewLogNumOfTimes
+                        + " number of times createNewLog is called",
+                expectedPreviousAllocatedEntryLogId, el.getPreviousAllocatedEntryLogId());
+        Assert.assertEquals("Number of RotatedLogChannels", createNewLogNumOfTimes - 1,
+                entryLogManager.getRotatedLogChannels().size());
+    }
+
+    /*
+     * Pre-creates log files with ids 1 and 3 and checks that the allocator skips
+     * ids whose files already exist.
+     */
+    @Test
+    public void testCreateNewLogWithGaps() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of
+        // ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) el.getEntryLogManager();
+        entryLogManagerBase.createNewLog(0L);
+
+        // NOTE: despite the message, this is checked after the first createNewLog() call.
+        Assert.assertEquals("previousAllocatedEntryLogId after initialization", 0, el.getPreviousAllocatedEntryLogId());
+
+        // Extracted from createNewLog()
+        String logFileName = Long.toHexString(1) + ".log";
+        File dir = ledgerDirsManager.pickRandomWritableDir();
+        LOG.info("Picked this directory: {}", dir);
+        File newLogFile = new File(dir, logFileName);
+        newLogFile.createNewFile();
+
+        entryLogManagerBase.createNewLog(0L);
+        Assert.assertEquals("previousAllocatedEntryLogId since entrylogid 1 is already taken", 2,
+                el.getPreviousAllocatedEntryLogId());
+
+        // Extracted from createNewLog()
+        logFileName = Long.toHexString(3) + ".log";
+        dir = ledgerDirsManager.pickRandomWritableDir();
+        LOG.info("Picked this directory: {}", dir);
+        newLogFile = new File(dir, logFileName);
+        newLogFile.createNewFile();
+
+        entryLogManagerBase.createNewLog(0L);
+        Assert.assertEquals("previousAllocatedEntryLogId since entrylogid 3 is already taken", 4,
+                el.getPreviousAllocatedEntryLogId());
+    }
+
+    /*
+     * Races one regular log creation against one compaction log creation (with
+     * pre-allocation enabled) and checks the resulting allocation state.
+     */
+    @Test
+    public void testCreateNewLogAndCompactionLog() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of
+        // ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        AtomicBoolean receivedException = new AtomicBoolean(false);
+
+        // i == 0 creates a regular entry log, i == 1 creates a compaction log
+        IntStream.range(0, 2).parallel().forEach((i) -> {
+            try {
+                if (i % 2 == 0) {
+                    ((EntryLogManagerBase) el.getEntryLogManager()).createNewLog((long) i);
+                } else {
+                    el.createNewCompactionLog();
+                }
+            } catch (IOException e) {
+                LOG.error("Received exception while creating newLog", e);
+                receivedException.set(true);
+            }
+        });
+        // wait for the pre-allocation to complete
+        // NOTE(review): a fixed sleep is a timing assumption and may be flaky on slow machines.
+        Thread.sleep(1000);
+
+        Assert.assertFalse("There shouldn't be any exceptions while creating newlog", receivedException.get());
+        // presumably ids 0 and 1 are used and id 2 is pre-allocated; confirm against EntryLoggerAllocator
+        Assert.assertEquals(
+                "previousAllocatedEntryLogId after 2 times createNewLog is called", 2,
+                el.getPreviousAllocatedEntryLogId());
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index b7f286cf8..21da95118 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -44,6 +44,9 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -64,6 +67,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogTest.class);
 
     final List<File> tempDirs = new ArrayList<File>();
+    final Random rand = new Random();
 
     File createTempDir(String prefix, String suffix) throws IOException {
         File dir = IOUtils.createTempDir(prefix, suffix);
@@ -119,11 +123,13 @@ public void testDeferCreateNewLog() throws Exception {
         this.dirsMgr.addToFilledDirs(curDir);
 
         entryLogger = new EntryLogger(conf, dirsMgr);
-        assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogger.getCurrentLogId());
+        EntryLogManagerForSingleEntryLog entryLogManager =
+                (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager();
+        assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
 
         // add the first entry will trigger file creation
-        entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        assertEquals(2L, entryLogger.getCurrentLogId());
+        entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        assertEquals(0L, entryLogManager.getCurrentLogId());
     }
 
     @Test
@@ -141,23 +147,25 @@ public void testDeferCreateNewLogWithoutEnoughDiskSpaces() throws Exception {
         this.dirsMgr.addToFilledDirs(curDir);
 
         entryLogger = new EntryLogger(conf, dirsMgr);
-        assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogger.getCurrentLogId());
+        EntryLogManagerForSingleEntryLog entryLogManager =
+                (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager();
+        assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
 
         // add the first entry will trigger file creation
         try {
-            entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
+            entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
             fail("Should fail to append entry if there is no enough reserved space left");
         } catch (NoWritableLedgerDirException e) {
-            assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogger.getCurrentLogId());
+            assertEquals(EntryLogger.UNINITIALIZED_LOG_ID, entryLogManager.getCurrentLogId());
         }
     }
 
     @Test
     public void testCorruptEntryLog() throws Exception {
         // create some entries
-        entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        entryLogger.addEntry(3, generateEntry(3, 1).nioBuffer());
-        entryLogger.addEntry(2, generateEntry(2, 1).nioBuffer());
+        entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+        entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
         entryLogger.flush();
         entryLogger.shutdown();
         // now lets truncate the file to corrupt the last entry, which simulates a partial write
@@ -184,6 +192,16 @@ private static ByteBuf generateEntry(long ledger, long entry) {
         return bb;
     }
 
+    /**
+     * Builds an entry buffer of {@code length} bytes: the ledger id, the entry id, then random filler.
+     */
+    private ByteBuf generateEntry(long ledger, long entry, int length) {
+        ByteBuf entryBuf = Unpooled.buffer(length);
+        byte[] filler = new byte[length - 2 * Long.BYTES];
+        rand.nextBytes(filler);
+        entryBuf.writeLong(ledger).writeLong(entry).writeBytes(filler);
+        return entryBuf;
+    }
+
     private static String generateDataString(long ledger, long entry) {
         return ("ledger-" + ledger + "-" + entry);
     }
@@ -199,7 +217,7 @@ public void testMissingLogId() throws Exception {
 
             EntryLogger logger = new EntryLogger(conf, dirsMgr);
             for (int j = 0; j < numEntries; j++) {
-                positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
+                positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer());
             }
             logger.flush();
             logger.shutdown();
@@ -214,7 +232,7 @@ public void testMissingLogId() throws Exception {
 
             EntryLogger logger = new EntryLogger(conf, dirsMgr);
             for (int j = 0; j < numEntries; j++) {
-                positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
+                positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer());
             }
             logger.flush();
             logger.shutdown();
@@ -271,6 +289,7 @@ public void testAddEntryFailureOnDiskFull() throws Exception {
         File ledgerDir1 = createTempDir("bkTest", ".dir");
         File ledgerDir2 = createTempDir("bkTest", ".dir");
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
         conf.setJournalDirName(ledgerDir1.toString());
         conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(),
                 ledgerDir2.getAbsolutePath() });
@@ -287,7 +306,8 @@ public void testAddEntryFailureOnDiskFull() throws Exception {
         ledgerStorage.addEntry(generateEntry(1, 1));
         ledgerStorage.addEntry(generateEntry(2, 1));
         // Add entry with disk full failure simulation
-        bookie.getLedgerDirsManager().addToFilledDirs(entryLogger.currentDir);
+        bookie.getLedgerDirsManager().addToFilledDirs(((EntryLogManagerBase) entryLogger.getEntryLogManager())
+                .getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID).getLogFile().getParentFile());
         ledgerStorage.addEntry(generateEntry(3, 1));
         // Verify written entries
         Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1)));
@@ -301,12 +321,14 @@ public void testAddEntryFailureOnDiskFull() throws Exception {
     @Test
     public void testRecoverFromLedgersMap() throws Exception {
         // create some entries
-        entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        entryLogger.addEntry(3, generateEntry(3, 1).nioBuffer());
-        entryLogger.addEntry(2, generateEntry(2, 1).nioBuffer());
-        entryLogger.addEntry(1, generateEntry(1, 2).nioBuffer());
-        entryLogger.rollLog();
-        entryLogger.flushRotatedLogs();
+        entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+        entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
+        entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer());
+
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+        entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        entryLogManager.flushRotatedLogs();
 
         EntryLogMetadata meta = entryLogger.extractEntryLogMetadataFromIndex(0L);
         LOG.info("Extracted Meta From Entry Log {}", meta);
@@ -324,11 +346,11 @@ public void testRecoverFromLedgersMap() throws Exception {
     @Test
     public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception {
         // create some entries
-        entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        entryLogger.addEntry(3, generateEntry(3, 1).nioBuffer());
-        entryLogger.addEntry(2, generateEntry(2, 1).nioBuffer());
-        entryLogger.addEntry(1, generateEntry(1, 2).nioBuffer());
-        entryLogger.rollLog();
+        entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        entryLogger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+        entryLogger.addEntry(2L, generateEntry(2, 1).nioBuffer());
+        entryLogger.addEntry(1L, generateEntry(1, 2).nioBuffer());
+        ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
         entryLogger.shutdown();
 
         // Rewrite the entry log header to be on V0 format
@@ -373,9 +395,10 @@ public void testPreAllocateLog() throws Exception {
 
         entryLogger = new EntryLogger(conf, dirsMgr);
         // create a logger whose initialization phase allocating a new entry log
+        ((EntryLogManagerBase) entryLogger.getEntryLogManager()).createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
         assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
 
-        entryLogger.addEntry(1, generateEntry(1, 1).nioBuffer());
+        entryLogger.addEntry(1L, generateEntry(1, 1).nioBuffer());
         // the Future<BufferedLogChannel> is not null all the time
         assertNotNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
         entryLogger.shutdown();
@@ -386,7 +409,7 @@ public void testPreAllocateLog() throws Exception {
         entryLogger = new EntryLogger(conf, dirsMgr);
         assertNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
 
-        entryLogger.addEntry(2, generateEntry(1, 1).nioBuffer());
+        entryLogger.addEntry(2L, generateEntry(1, 1).nioBuffer());
 
         // the Future<BufferedLogChannel> is null all the time
         assertNull(entryLogger.getEntryLoggerAllocator().getPreallocationFuture());
@@ -398,17 +421,18 @@ public void testPreAllocateLog() throws Exception {
     @Test
     public void testGetEntryLogsSet() throws Exception {
         // create some entries
-        assertEquals(Sets.newHashSet(0L, 1L), entryLogger.getEntryLogsSet());
+        EntryLogManagerBase entryLogManagerBase = ((EntryLogManagerBase) entryLogger.getEntryLogManager());
+        assertEquals(Sets.newHashSet(), entryLogger.getEntryLogsSet());
 
-        entryLogger.rollLog();
-        entryLogger.flushRotatedLogs();
+        entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        entryLogManagerBase.flushRotatedLogs();
 
-        assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet());
+        assertEquals(Sets.newHashSet(0L, 1L), entryLogger.getEntryLogsSet());
 
-        entryLogger.rollLog();
-        entryLogger.flushRotatedLogs();
+        entryLogManagerBase.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
+        entryLogManagerBase.flushRotatedLogs();
 
-        assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), entryLogger.getEntryLogsSet());
+        assertEquals(Sets.newHashSet(0L, 1L, 2L), entryLogger.getEntryLogsSet());
     }
 
     static class LedgerStorageWriteTask implements Callable<Boolean> {
@@ -566,4 +590,121 @@ public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws E
 
         executor.shutdownNow();
     }
+
+    /**
+     * Test to verify the leastUnflushedLogId logic in EntryLogsStatus.
+     *
+     * <p>Drives {@code RecentEntryLogsStatus} directly (no entries are written) and checks
+     * {@code getLeastUnflushedLogId()} as logs are marked created and flushed, including out-of-order
+     * flushes and gaps in the log ids.
+     */
+    @Test
+    public void testEntryLoggersRecentEntryLogsStatus() throws Exception {
+        final String msg = "entryLogger's leastUnflushedLogId ";
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        try {
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedLogsStatus =
+                    entryLogger.recentlyCreatedEntryLogsStatus;
+
+            recentlyCreatedLogsStatus.createdEntryLog(0L);
+            Assert.assertEquals(msg, 0L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(0L);
+            // since we marked entrylog-0 as rotated, LeastUnflushedLogId would be previous rotatedlog+1
+            Assert.assertEquals(msg, 1L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.createdEntryLog(1L);
+            Assert.assertEquals(msg, 1L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.createdEntryLog(2L);
+            recentlyCreatedLogsStatus.createdEntryLog(3L);
+            recentlyCreatedLogsStatus.createdEntryLog(4L);
+            Assert.assertEquals(msg, 1L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(1L);
+            Assert.assertEquals(msg, 2L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(3L);
+            // here though we rotated entrylog-3, entrylog-2 is not yet rotated so
+            // LeastUnflushedLogId should be still 2
+            Assert.assertEquals(msg, 2L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(2L);
+            // entrylog-3 is already rotated, so leastUnflushedLogId should be 4
+            Assert.assertEquals(msg, 4L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(4L);
+            Assert.assertEquals(msg, 5L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.createdEntryLog(5L);
+            recentlyCreatedLogsStatus.createdEntryLog(7L);
+            recentlyCreatedLogsStatus.createdEntryLog(9L);
+            Assert.assertEquals(msg, 5L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(5L);
+            // since we marked entrylog-5 as rotated, LeastUnflushedLogId would be previous rotatedlog+1
+            Assert.assertEquals(msg, 6L, entryLogger.getLeastUnflushedLogId());
+            recentlyCreatedLogsStatus.flushRotatedEntryLog(7L);
+            // entrylog-6 was never created, so it does not hold back leastUnflushedLogId
+            Assert.assertEquals(msg, 8L, entryLogger.getLeastUnflushedLogId());
+        } finally {
+            // this logger is local to the test, so the test is responsible for shutting it down
+            entryLogger.shutdown();
+        }
+    }
+
+    /**
+     * Creates {@code numOfLedgerDirs} temporary ledger directories, runs
+     * {@code Bookie.checkDirectoryStructure} on each one's {@code Bookie.getCurrentDirectory} path,
+     * and returns their absolute paths in creation order.
+     */
+    String[] createAndGetLedgerDirs(int numOfLedgerDirs) throws IOException {
+        String[] ledgerDirsPath = new String[numOfLedgerDirs];
+        int idx = 0;
+        while (idx < numOfLedgerDirs) {
+            File tmpLedgerDir = createTempDir("bkTest", ".dir");
+            Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(tmpLedgerDir));
+            ledgerDirsPath[idx] = tmpLedgerDir.getAbsolutePath();
+            idx++;
+        }
+        return ledgerDirsPath;
+    }
+
+    /*
+     * Verifies that the entry log's BufferedChannel is force-written once the bytes written to it reach
+     * flushIntervalInBytes, so that an independent EntryLogger can read the entries back.
+     */
+    @Test(timeout = 60000)
+    public void testFlushIntervalInBytes() throws Exception {
+        long flushIntervalInBytes = 5000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(flushIntervalInBytes);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogger newEntryLogger = null;
+        try {
+            EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+
+            /*
+             * The entry log header (EntryLogger.LOGFILE_HEADER_SIZE bytes) is counted as unpersisted
+             * together with the first entry.
+             */
+            long ledgerId = 0L;
+            int firstEntrySize = 1000;
+            long entry0Position = entryLogger.addEntry(ledgerId, generateEntry(ledgerId, 0L, firstEntrySize));
+            // entrylogger writes length of the entry (4 bytes) before writing entry
+            long expectedUnpersistedBytes = EntryLogger.LOGFILE_HEADER_SIZE + firstEntrySize + 4;
+            Assert.assertEquals("Unpersisted Bytes of entrylog", expectedUnpersistedBytes,
+                    entryLogManagerBase.getCurrentLogForLedger(ledgerId).getUnpersistedBytes());
+
+            /*
+             * The second entry brings the bytes written to at least flushIntervalInBytes, so the
+             * BufferedChannel should be force-written and its unpersisted byte count reset to zero.
+             */
+            int secondEntrySize = (int) (flushIntervalInBytes - expectedUnpersistedBytes);
+            long entry1Position = entryLogger.addEntry(ledgerId, generateEntry(ledgerId, 1L, secondEntrySize));
+            Assert.assertEquals("Unpersisted Bytes of entrylog", 0,
+                    entryLogManagerBase.getCurrentLogForLedger(ledgerId).getUnpersistedBytes());
+
+            /*
+             * Since the entry log was force-written, a new EntryLogger (with the single entry log manager)
+             * should be able to read both entries while the first logger is still open. The first logger
+             * is deliberately not shut down before these reads, because shutdown would flush on its own.
+             */
+            conf.setEntryLogPerLedgerEnabled(false);
+            newEntryLogger = new EntryLogger(conf, ledgerDirsManager);
+            EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager();
+            Assert.assertEquals("EntryLogManager class type", EntryLogger.EntryLogManagerForSingleEntryLog.class,
+                    newEntryLogManager.getClass());
+
+            ByteBuf buf = newEntryLogger.readEntry(ledgerId, 0L, entry0Position);
+            try {
+                Assert.assertEquals("LedgerId", ledgerId, buf.readLong());
+                Assert.assertEquals("EntryId", 0L, buf.readLong());
+            } finally {
+                buf.release();
+            }
+
+            buf = newEntryLogger.readEntry(ledgerId, 1L, entry1Position);
+            try {
+                Assert.assertEquals("LedgerId", ledgerId, buf.readLong());
+                Assert.assertEquals("EntryId", 1L, buf.readLong());
+            } finally {
+                buf.release();
+            }
+        } finally {
+            // both loggers are local to this test, so shut them down here
+            if (newEntryLogger != null) {
+                newEntryLogger.shutdown();
+            }
+            entryLogger.shutdown();
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index 1a795ad0d..869295809 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -28,10 +28,13 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +42,8 @@
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerBase;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -359,7 +364,7 @@ public void testCheckpointofILSWhenEntryLogIsRotated(boolean entryLogPerLedgerEn
         }
         handle.close();
         // simulate rolling entrylog
-        ledgerStorage.entryLogger.rollLog();
+        ((EntryLogManagerBase) ledgerStorage.getEntryLogger().getEntryLogManager()).createNewLog(ledgerId);
         // sleep for a bit for checkpoint to do its task
         executorController.advance(Duration.ofMillis(500));
 
@@ -491,6 +496,7 @@ public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exce
         BookKeeper bkClient = new BookKeeper(clientConf);
         InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
         EntryLogger entryLogger = ledgerStorage.entryLogger;
+        EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) entryLogger.getEntryLogManager();
 
         int numOfEntries = 5;
         byte[] dataBytes = "data".getBytes();
@@ -502,7 +508,7 @@ public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exce
         }
         handle.close();
         // simulate rolling entrylog
-        ledgerStorage.entryLogger.rollLog();
+        entryLogManagerBase.createNewLog(ledgerId);
 
         ledgerId = 20;
         handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
@@ -511,7 +517,7 @@ public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exce
         }
         handle.close();
         // simulate rolling entrylog
-        ledgerStorage.entryLogger.rollLog();
+        entryLogManagerBase.createNewLog(ledgerId);
 
         ledgerId = 30;
         handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
@@ -520,9 +526,14 @@ public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exce
         }
         handle.close();
 
-        Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0,
-                entryLogger.logChannel.getUnpersistedBytes());
-        Assert.assertNotEquals("There should be logChannelsToFlush", 0, entryLogger.logChannelsToFlush.size());
+        Set<BufferedLogChannel> copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
+                Arrays.asList(entryLogManagerBase.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
+        for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+            Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0,
+                    currentLog.getUnpersistedBytes());
+        }
+        Assert.assertNotEquals("There should be logChannelsToFlush", 0,
+                entryLogManagerBase.getRotatedLogChannels().size());
 
         /*
          * wait for atleast flushInterval period, so that checkpoint can happen.
@@ -533,11 +544,16 @@ public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exce
          * since checkpoint happenend, there shouldn't be any logChannelsToFlush
          * and bytesWrittenSinceLastFlush should be zero.
          */
+        List<BufferedLogChannel> copyOfRotatedLogChannels = entryLogManagerBase.getRotatedLogChannels();
         Assert.assertTrue("There shouldn't be logChannelsToFlush",
-                ((entryLogger.logChannelsToFlush == null) || (entryLogger.logChannelsToFlush.size() == 0)));
+                ((copyOfRotatedLogChannels == null) || (copyOfRotatedLogChannels.size() == 0)));
 
-        Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
-                entryLogger.logChannel.getUnpersistedBytes());
+        copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
+                Arrays.asList(entryLogManagerBase.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
+        for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+            Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
+                    currentLog.getUnpersistedBytes());
+        }
     }
 
     static class MockInterleavedLedgerStorage extends InterleavedLedgerStorage {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index c183fbf40..9642c18d1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -36,9 +36,11 @@
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForSingleEntryLog;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -99,6 +101,7 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
     public SortedLedgerStorageCheckpointTest() {
         super();
         conf.setEntryLogSizeLimit(1);
+        conf.setEntryLogFilePreAllocationEnabled(false);
         this.checkpoints = new LinkedBlockingQueue<>();
     }
 
@@ -182,6 +185,7 @@ public void testCheckpoint() throws Exception {
         assertEquals(new TestCheckpoint(0), memtableCp);
 
         // trigger a memtable flush
+        Assert.assertNotNull("snapshot shouldn't have returned null", storage.memTable.snapshot());
         storage.onSizeLimitReached(checkpointSrc.newCheckpoint());
         // wait for checkpoint to complete
         checkpoints.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
@@ -219,9 +223,11 @@ public void testCheckpointAfterEntryLogRotated() throws Exception {
         });
 
         // simulate entry log is rotated (due to compaction)
-        storage.entryLogger.rollLog();
+        EntryLogManagerForSingleEntryLog entryLogManager = (EntryLogManagerForSingleEntryLog) storage.getEntryLogger()
+                .getEntryLogManager();
+        entryLogManager.createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
         long leastUnflushedLogId = storage.entryLogger.getLeastUnflushedLogId();
-        long currentLogId = storage.entryLogger.getCurrentLogId();
+        long currentLogId = entryLogManager.getCurrentLogId();
         log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);
 
         readyLatch.countDown();
@@ -230,6 +236,7 @@ public void testCheckpointAfterEntryLogRotated() throws Exception {
         assertEquals(20, storage.memTable.kvmap.size());
 
         // trigger a memtable flush
+        Assert.assertNotNull("snapshot shouldn't have returned null", storage.memTable.snapshot());
         storage.onSizeLimitReached(checkpointSrc.newCheckpoint());
         assertEquals(new TestCheckpoint(100), checkpoints.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index 67c69fe05..7b44c9475 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -227,7 +227,7 @@ public void testBookieCompaction() throws Exception {
         newEntry3.writeLong(4); // ledger id
         newEntry3.writeLong(3); // entry id
         newEntry3.writeBytes("new-entry-3".getBytes());
-        long location = entryLogger.addEntry(4, newEntry3, false);
+        long location = entryLogger.addEntry(4L, newEntry3, false);
 
         List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
         singleDirStorage.updateEntriesLocations(locations);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index 75aaa933f..3e02cf7c7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -306,4 +306,4 @@ public void testReadFromReadOnlyBookieShouldBeSuccess() throws Exception {
             assertEquals("Entry should contain correct data", "data", new String(entry.getEntry()));
         }
     }
-}
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services