You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/07 16:00:18 UTC

[GitHub] reddycharan closed pull request #1201: (WIP) ISSUE #570: Entrylog per ledger

reddycharan closed pull request #1201: (WIP) ISSUE #570: Entrylog per ledger
URL: https://github.com/apache/bookkeeper/pull/1201
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index 464ff4aa0..095796945 100755
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -620,3 +620,15 @@ zkEnableSecurity=false
 # dbStorage_rocksDB_numLevels=-1
 # dbStorage_rocksDB_numFilesInLevel0=4
 # dbStorage_rocksDB_maxSizeInLevel1MB=256
+
+# config enabling/disabling entrylog per ledger feature.
+# entryLogPerLedgerEnabled=false
+
+# in entryLogPerLedger feature, the time duration used for lastaccess eviction policy for cache
+# entrylogMapAccessExpiryTimeInSeconds=300
+
+# In the case of multipleentrylogs, multiple threads can be used to flush the memtable
+# numOfMemtableFlushThreads=4
+
+# memtableFlushTimeoutInSeconds specifies the amount of time main flushthread has to wait for the processor threads to complete the flush
+# memtableFlushTimeoutInSeconds=120
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index e091199b0..eea67ef6b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -111,6 +111,7 @@
     final List<Journal> journals;
 
     final HandleFactory handles;
+    final boolean entryLogPerLedgerEnabled;
 
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
     static final long METAENTRY_ID_FENCE_KEY  = -0x2000;
@@ -687,6 +688,7 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
                          conf, ledgerDirsManager, statsLogger.scope(JOURNAL_SCOPE + "_" + i)));
         }
 
+        this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
         CheckpointSource checkpointSource = new CheckpointSourceList(journals);
 
         // Instantiate the ledger storage implementation
@@ -695,6 +697,21 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
         ledgerStorage = LedgerStorageFactory.createLedgerStorage(ledgerStorageClass);
         syncThread = new SyncThread(conf, getLedgerDirsListener(), ledgerStorage, checkpointSource);
 
+        Checkpointer checkpointer;
+        /*
+         * with this change https://github.com/apache/bookkeeper/pull/677,
+         * LedgerStorage drives the checkpoint logic. But with multiple entry
+         * logs, checkpoint logic based on a entry log is not possible, hence it
+         * needs to be timebased recurring thing and it is driven by SyncThread.
+         * SyncThread.start does that and it is started in Bookie.start method.
+         */
+        if (entryLogPerLedgerEnabled) {
+            checkpointer = (checkpoint) -> {
+            };
+        } else {
+            checkpointer = syncThread;
+        }
+
         ledgerStorage.initialize(
             conf,
             ledgerManager,
@@ -702,7 +719,7 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
             indexDirsManager,
             stateManager,
             checkpointSource,
-            syncThread,
+            checkpointer,
             statsLogger);
 
 
@@ -797,6 +814,15 @@ public synchronized void start() {
             idxMonitor.start();
         }
 
+        /*
+         * start sync thread first, so during replaying journals, we could do
+         * checkpoint which reduce the chance that we need to replay journals
+         * again if bookie restarted again before finished journal replays.
+         */
+        if (entryLogPerLedgerEnabled) {
+            syncThread.start();
+        }
+
         // replay journals
         try {
             readJournal();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 0d21d415c..e1eb5b01c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -41,20 +41,46 @@
     // The buffer used to write operations.
     protected final ByteBuf writeBuffer;
     // The absolute position of the next write operation.
-    protected volatile long position;
+    protected final AtomicLong position;
+
+    /*
+     * if unpersistedBytesBound is non-zero value, then after writing to
+     * writeBuffer, it will check if the unpersistedBytes is greater than
+     * unpersistedBytesBound and then calls flush method if it is greater.
+     *
+     * It is a best-effort feature, since 'forceWrite' method is not
+     * synchronized and unpersistedBytes is reset in 'forceWrite' method before
+     * calling fileChannel.force
+     */
+    protected final long unpersistedBytesBound;
+
+    /*
+     * it tracks the number of bytes which are not persisted yet by force
+     * writing the FileChannel. The unpersisted bytes could be in writeBuffer or
+     * in fileChannel system cache.
+     */
+    protected final AtomicLong unpersistedBytes;
 
     // make constructor to be public for unit test
     public BufferedChannel(FileChannel fc, int capacity) throws IOException {
         // Use the same capacity for read and write buffers.
-        this(fc, capacity, capacity);
+        this(fc, capacity, 0L);
     }
 
-    public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
+    public BufferedChannel(FileChannel fc, int capacity, long unpersistedBytesBound) throws IOException {
+        // Use the same capacity for read and write buffers.
+        this(fc, capacity, capacity, unpersistedBytesBound);
+    }
+
+    public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity, long unpersistedBytesBound)
+            throws IOException {
         super(fc, readCapacity);
         this.writeCapacity = writeCapacity;
-        this.position = fc.position();
-        this.writeBufferStartPosition.set(position);
+        this.position = new AtomicLong(fc.position());
+        this.writeBufferStartPosition.set(position.get());
         this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
+        this.unpersistedBytes = new AtomicLong(0);
+        this.unpersistedBytesBound = unpersistedBytesBound;
     }
 
     @Override
@@ -70,20 +96,34 @@ public void close() throws IOException {
      * @param src The source ByteBuffer which contains the data to be written.
      * @throws IOException if a write operation fails.
      */
-    public synchronized void write(ByteBuf src) throws IOException {
+    public void write(ByteBuf src) throws IOException {
         int copied = 0;
-        int len = src.readableBytes();
-        while (copied < len) {
-            int bytesToCopy = Math.min(src.readableBytes() - copied, writeBuffer.writableBytes());
-            writeBuffer.writeBytes(src, src.readerIndex() + copied, bytesToCopy);
-            copied += bytesToCopy;
-
-            // if we have run out of buffer space, we should flush to the file
-            if (!writeBuffer.isWritable()) {
-                flushInternal();
+        boolean shouldForceWrite = false;
+        synchronized (this) {
+            int len = src.readableBytes();
+            while (copied < len) {
+                int bytesToCopy = Math.min(src.readableBytes() - copied, writeBuffer.writableBytes());
+                writeBuffer.writeBytes(src, src.readerIndex() + copied, bytesToCopy);
+                copied += bytesToCopy;
+
+                // if we have run out of buffer space, we should flush to the
+                // file
+                if (!writeBuffer.isWritable()) {
+                    flushInternal();
+                }
+            }
+            position.addAndGet(copied);
+            unpersistedBytes.addAndGet(copied);
+            if (unpersistedBytesBound > 0) {
+                if (unpersistedBytes.get() > unpersistedBytesBound) {
+                    flushInternal();
+                    shouldForceWrite = true;
+                }
             }
         }
-        position += copied;
+        if (shouldForceWrite) {
+            forceWrite(false);
+        }
     }
 
     /**
@@ -91,7 +131,7 @@ public synchronized void write(ByteBuf src) throws IOException {
      * @return
      */
     public long position() {
-        return position;
+        return position.get();
     }
 
     /**
@@ -110,11 +150,15 @@ public long getFileChannelPosition() {
      * @throws IOException if the write or sync operation fails.
      */
     public void flush(boolean shouldForceWrite) throws IOException {
+        flush(shouldForceWrite, false);
+    }
+
+    public void flush(boolean shouldForceWrite, boolean forceMetadata) throws IOException {
         synchronized (this) {
             flushInternal();
         }
         if (shouldForceWrite) {
-            forceWrite(false);
+            forceWrite(forceMetadata);
         }
     }
 
@@ -138,6 +182,20 @@ public long forceWrite(boolean forceMetadata) throws IOException {
         // the force write, any flush that happens after this may or may
         // not be flushed
         long positionForceWrite = writeBufferStartPosition.get();
+        /*
+         * since forceWrite method is not called in synchronized block, to make
+         * sure we are not undercounting unpersistedBytes, setting
+         * unpersistedBytes to the current number of bytes in writeBuffer.
+         *
+         * since we are calling fileChannel.force, bytes which are written to
+         * filechannel (system filecache) will be persisted to the disk. So we
+         * dont need to consider those bytes for setting value to
+         * unpersistedBytes.
+         *
+         */
+        synchronized (this) {
+            unpersistedBytes.set(writeBuffer.readableBytes());
+        }
         fileChannel.force(forceMetadata);
         return positionForceWrite;
     }
@@ -188,4 +246,12 @@ public synchronized void clear() {
         super.clear();
         writeBuffer.clear();
     }
+
+    public synchronized int getNumOfBytesInWriteBuffer() {
+        return writeBuffer.readableBytes();
+    }
+
+    long getUnpersistedBytes() {
+        return unpersistedBytes.get();
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 112a602af..4381a5a4e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -25,6 +25,10 @@
 import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTING_SUFFIX;
 import static org.apache.bookkeeper.util.BookKeeperConstants.MAX_LOG_SIZE_LIMIT;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Sets;
 
@@ -50,11 +54,16 @@
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -63,13 +72,17 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,18 +95,19 @@
  */
 public class EntryLogger {
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogger.class);
+    private static final Long INVALID_LEDGERID = new Long(-1);
+    // log file suffix
+    private static final String LOG_FILE_SUFFIX = ".log";
 
-    private static class BufferedLogChannel extends BufferedChannel {
+    class BufferedLogChannel extends BufferedChannel {
         private final long logId;
         private final EntryLogMetadata entryLogMetadata;
         private final File logFile;
+        private Long ledgerId = INVALID_LEDGERID;
 
-        public BufferedLogChannel(FileChannel fc,
-                                  int writeCapacity,
-                                  int readCapacity,
-                                  long logId,
-                                  File logFile) throws IOException {
-            super(fc, writeCapacity, readCapacity);
+        public BufferedLogChannel(FileChannel fc, int writeCapacity, int readCapacity, long logId, File logFile,
+                long unpersistedBytesBound) throws IOException {
+            super(fc, writeCapacity, readCapacity, unpersistedBytesBound);
             this.logId = logId;
             this.entryLogMetadata = new EntryLogMetadata(logId);
             this.logFile = logFile;
@@ -113,13 +127,25 @@ public void registerWrittenEntry(long ledgerId, long entrySize) {
         public ConcurrentLongLongHashMap getLedgersMap() {
             return entryLogMetadata.getLedgersMap();
         }
+
+        public boolean isLedgerDirFull() {
+            return ledgerDirsManager.isDirFull(logFile.getParentFile());
+        }
+
+        public Long getLedgerId() {
+            return ledgerId;
+        }
+
+        public void setLedgerId(Long ledgerId) {
+            this.ledgerId = ledgerId;
+        }
+
     }
 
-    volatile File currentDir;
     private final LedgerDirsManager ledgerDirsManager;
-    private final AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false);
+    private final boolean entryLogPerLedgerEnabled;
 
-    private volatile long leastUnflushedLogId;
+    RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
 
     /**
      * locks for compaction log.
@@ -130,11 +156,10 @@ public ConcurrentLongLongHashMap getLedgersMap() {
      * The maximum size of a entry logger file.
      */
     final long logSizeLimit;
-    private List<BufferedLogChannel> logChannelsToFlush;
-    private volatile BufferedLogChannel logChannel;
     private volatile BufferedLogChannel compactionLogChannel;
 
-    private final EntryLoggerAllocator entryLoggerAllocator;
+    final EntryLoggerAllocator entryLoggerAllocator;
+    final EntryLogManager entryLogManager;
     private final boolean entryLogPreAllocationEnabled;
     private final CopyOnWriteArrayList<EntryLogListener> listeners = new CopyOnWriteArrayList<EntryLogListener>();
 
@@ -199,7 +224,6 @@ public ConcurrentLongLongHashMap getLedgersMap() {
 
     private final long flushIntervalInBytes;
     private final boolean doRegularFlushes;
-    private long bytesWrittenSinceLastFlush = 0;
     private final int maxSaneEntrySize;
 
     final ServerConfiguration conf;
@@ -256,6 +280,8 @@ public EntryLogger(ServerConfiguration conf,
         // but the protocol varies so an exact value is difficult to determine
         this.maxSaneEntrySize = conf.getNettyMaxFrameSizeBytes() - 500;
         this.ledgerDirsManager = ledgerDirsManager;
+        this.conf = conf;
+        entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
         if (listener != null) {
             addListener(listener);
         }
@@ -284,13 +310,15 @@ public EntryLogger(ServerConfiguration conf,
                 logId = lastLogId;
             }
         }
-        this.leastUnflushedLogId = logId + 1;
+        this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1);
+        this.flushIntervalInBytes = conf.getFlushIntervalInBytes();
+        this.doRegularFlushes = flushIntervalInBytes > 0;
         this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
-        this.conf = conf;
-        flushIntervalInBytes = conf.getFlushIntervalInBytes();
-        doRegularFlushes = flushIntervalInBytes > 0;
-
-        initialize();
+        if (entryLogPerLedgerEnabled) {
+            this.entryLogManager = new EntryLogManagerForEntryLogPerLedger(conf);
+        } else {
+            this.entryLogManager = new EntryLogManagerForSingleEntryLog();
+        }
     }
 
     void addListener(EntryLogListener listener) {
@@ -313,13 +341,11 @@ void addListener(EntryLogListener listener) {
      */
     private int readFromLogChannel(long entryLogId, BufferedReadChannel channel, ByteBuf buff, long pos)
             throws IOException {
-        BufferedLogChannel bc = logChannel;
+        BufferedLogChannel bc = entryLogManager.getCurrentLogIfPresent(entryLogId);
         if (null != bc) {
-            if (entryLogId == bc.getLogId()) {
-                synchronized (bc) {
-                    if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) {
-                        return bc.read(buff, pos);
-                    }
+            synchronized (bc) {
+                if (pos + buff.writableBytes() >= bc.getFileChannelPosition()) {
+                    return bc.read(buff, pos);
                 }
             }
         }
@@ -386,12 +412,37 @@ public BufferedReadChannel getFromChannels(long logId) {
      *
      * @return least unflushed log id.
      */
-    synchronized long getLeastUnflushedLogId() {
-        return leastUnflushedLogId;
+    long getLeastUnflushedLogId() {
+        return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
     }
 
-    synchronized long getCurrentLogId() {
-        return logChannel.getLogId();
+    long getPreviousAllocatedEntryLogId() {
+        return entryLoggerAllocator.getPreallocatedLogId();
+    }
+
+    boolean rollLogsIfEntryLogLimitReached() throws IOException {
+        // for this add ledgerid to bufferedlogchannel, getcopyofcurrentlogs get
+        // ledgerid, lock it only if there is new data
+        // so that cache accesstime is not changed
+
+        boolean rolledLog = false;
+        Set<BufferedLogChannel> copyOfCurrentLogs = entryLogManager.getCopyOfCurrentLogs();
+        for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+            if (currentLog.position() > logSizeLimit) {
+                Long ledgerId = currentLog.getLedgerId();
+                entryLogManager.acquireLock(ledgerId);
+                try {
+                    if (reachEntryLogLimit(ledgerId, 0L)) {
+                        rolledLog = true;
+                        LOG.info("Rolling entry logger since it reached size limitation");
+                        createNewLog(ledgerId);
+                    }
+                } finally {
+                    entryLogManager.releaseLock(ledgerId);
+                }
+            }
+        }
+        return rolledLog;
     }
 
     /**
@@ -406,98 +457,39 @@ File getCurCompactionLogFile() {
         }
     }
 
-    protected void initialize() throws IOException {
-        // Register listener for disk full notifications.
-        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
-        // create a new log to write
-        createNewLog();
-    }
-
-    private LedgerDirsListener getLedgerDirsListener() {
-        return new LedgerDirsListener() {
-            @Override
-            public void diskFull(File disk) {
-                // If the current entry log disk is full, then create new entry
-                // log.
-                if (currentDir != null && currentDir.equals(disk)) {
-                    shouldCreateNewEntryLog.set(true);
-                }
-            }
-
-            @Override
-            public void diskAlmostFull(File disk) {
-                // If the current entry log disk is almost full, then create new entry
-                // log.
-                if (currentDir != null && currentDir.equals(disk)) {
-                    shouldCreateNewEntryLog.set(true);
-                }
-            }
-
-            @Override
-            public void diskFailed(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void allDisksFull() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void fatalError() {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskWritable(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-
-            @Override
-            public void diskJustWritable(File disk) {
-                // Nothing to handle here. Will be handled in Bookie
-            }
-        };
-    }
-
-    /**
-     * Rolling a new log file to write.
-     */
-    synchronized void rollLog() throws IOException {
-        createNewLog();
-    }
-
     /**
      * Creates a new log file.
      */
-    void createNewLog() throws IOException {
-        // first tried to create a new log channel. add current log channel to ToFlush list only when
-        // there is a new log channel. it would prevent that a log channel is referenced by both
-        // *logChannel* and *ToFlush* list.
-        if (null != logChannel) {
-            if (null == logChannelsToFlush) {
-                logChannelsToFlush = new LinkedList<BufferedLogChannel>();
-            }
-
-            // flush the internal buffer back to filesystem but not sync disk
-            // so the readers could access the data from filesystem.
-            logChannel.flush(false);
-
-            // Append ledgers map at the end of entry log
-            appendLedgersMap(logChannel);
-
-            BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
-            logChannelsToFlush.add(logChannel);
-            LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
-                    logChannel.getLogId(), logChannelsToFlush);
-            for (EntryLogListener listener : listeners) {
-                listener.onRotateEntryLog();
+    void createNewLog(Long ledgerId) throws IOException {
+        entryLogManager.acquireLock(ledgerId);
+        try {
+            BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(ledgerId);
+            // first tried to create a new log channel. add current log channel to ToFlush list only when
+            // there is a new log channel. it would prevent that a log channel is referenced by both
+            // *logChannel* and *ToFlush* list.
+            if (null != logChannel) {
+
+                // flush the internal buffer back to filesystem but not sync disk
+                logChannel.flush(false);
+
+                // Append ledgers map at the end of entry log
+                appendLedgersMap(ledgerId);
+
+                BufferedLogChannel newLogChannel = entryLoggerAllocator.createNewLog();
+                entryLogManager.setCurrentLogForLedger(ledgerId, newLogChannel);
+                LOG.info("Flushing entry logger {} back to filesystem, pending for syncing entry loggers : {}.",
+                        logChannel.getLogId(), entryLogManager.getCopyOfRotatedLogChannels());
+                if (!entryLogPerLedgerEnabled) {
+                    for (EntryLogListener listener : listeners) {
+                        listener.onRotateEntryLog();
+                    }
+                }
+            } else {
+                entryLogManager.setCurrentLogForLedger(ledgerId, entryLoggerAllocator.createNewLog());
             }
-            logChannel = newLogChannel;
-        } else {
-            logChannel = entryLoggerAllocator.createNewLog();
+        } finally {
+            entryLogManager.releaseLock(ledgerId);
         }
-        currentDir = logChannel.getLogFile().getParentFile();
     }
 
     /**
@@ -506,78 +498,86 @@ void createNewLog() throws IOException {
     EntryLoggerAllocator getEntryLoggerAllocator() {
         return entryLoggerAllocator;
     }
+
     /**
      * Append the ledger map at the end of the entry log.
      * Updates the entry log file header with the offset and size of the map.
      */
-    private void appendLedgersMap(BufferedLogChannel entryLogChannel) throws IOException {
-        long ledgerMapOffset = entryLogChannel.position();
-
-        ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
-        int numberOfLedgers = (int) ledgersMap.size();
+    private void appendLedgersMap(Long ledgerId) throws IOException {
+        entryLogManager.acquireLock(ledgerId);
+        try {
+            BufferedLogChannel entryLogChannel = entryLogManager.getCurrentLogForLedger(ledgerId);
+            long ledgerMapOffset = entryLogChannel.position();
 
-        // Write the ledgers map into several batches
+            ConcurrentLongLongHashMap ledgersMap = entryLogChannel.getLedgersMap();
+            int numberOfLedgers = (int) ledgersMap.size();
 
-        final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
-        final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
+            // Write the ledgers map into several batches
 
-        try {
-            ledgersMap.forEach(new BiConsumerLong() {
-                int remainingLedgers = numberOfLedgers;
-                boolean startNewBatch = true;
-                int remainingInBatch = 0;
+            final int maxMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * LEDGERS_MAP_MAX_BATCH_SIZE;
+            final ByteBuf serializedMap = ByteBufAllocator.DEFAULT.buffer(maxMapSize);
 
-                @Override
-                public void accept(long ledgerId, long size) {
-                    if (startNewBatch) {
-                        int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
-                        int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
-
-                        serializedMap.clear();
-                        serializedMap.writeInt(ledgerMapSize - 4);
-                        serializedMap.writeLong(INVALID_LID);
-                        serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
-                        serializedMap.writeInt(batchSize);
-
-                        startNewBatch = false;
-                        remainingInBatch = batchSize;
-                    }
-                    // Dump the ledger in the current batch
-                    serializedMap.writeLong(ledgerId);
-                    serializedMap.writeLong(size);
-                    --remainingLedgers;
-
-                    if (--remainingInBatch == 0) {
-                        // Close current batch
-                        try {
-                            entryLogChannel.write(serializedMap);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
+            try {
+                ledgersMap.forEach(new BiConsumerLong() {
+                    int remainingLedgers = numberOfLedgers;
+                    boolean startNewBatch = true;
+                    int remainingInBatch = 0;
+
+                    @Override
+                    public void accept(long ledgerId, long size) {
+                        if (startNewBatch) {
+                            int batchSize = Math.min(remainingLedgers, LEDGERS_MAP_MAX_BATCH_SIZE);
+                            int ledgerMapSize = LEDGERS_MAP_HEADER_SIZE + LEDGERS_MAP_ENTRY_SIZE * batchSize;
+
+                            serializedMap.clear();
+                            serializedMap.writeInt(ledgerMapSize - 4);
+                            serializedMap.writeLong(INVALID_LID);
+                            serializedMap.writeLong(LEDGERS_MAP_ENTRY_ID);
+                            serializedMap.writeInt(batchSize);
+
+                            startNewBatch = false;
+                            remainingInBatch = batchSize;
                         }
+                        // Dump the ledger in the current batch
+                        serializedMap.writeLong(ledgerId);
+                        serializedMap.writeLong(size);
+                        --remainingLedgers;
+
+                        if (--remainingInBatch == 0) {
+                            // Close current batch
+                            try {
+                                entryLogChannel.write(serializedMap);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
 
-                        startNewBatch = true;
+                            startNewBatch = true;
+                        }
                     }
+                });
+            } catch (RuntimeException e) {
+                if (e.getCause() instanceof IOException) {
+                    throw (IOException) e.getCause();
+                } else {
+                    throw e;
                 }
-            });
-        } catch (RuntimeException e) {
-            if (e.getCause() instanceof IOException) {
-                throw (IOException) e.getCause();
-            } else {
-                throw e;
+            } finally {
+                serializedMap.release();
             }
+            // Flush the ledger's map out before we write the header.
+            // Otherwise the header might point to something that is not fully
+            // written
+            entryLogChannel.flush(false);
+
+            // Update the headers with the map offset and count of ledgers
+            ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
+            mapInfo.putLong(ledgerMapOffset);
+            mapInfo.putInt(numberOfLedgers);
+            mapInfo.flip();
+            entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
         } finally {
-            serializedMap.release();
-        }
-        // Flush the ledger's map out before we write the header.
-        // Otherwise the header might point to something that is not fully written
-        entryLogChannel.flush(false);
-
-        // Update the headers with the map offset and count of ledgers
-        ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
-        mapInfo.putLong(ledgerMapOffset);
-        mapInfo.putInt(numberOfLedgers);
-        mapInfo.flip();
-        entryLogChannel.fileChannel.write(mapInfo, LEDGERS_MAP_OFFSET_POSITION);
+            entryLogManager.releaseLock(ledgerId);
+        }
     }
 
     /**
@@ -586,95 +586,148 @@ public void accept(long ledgerId, long size) {
     class EntryLoggerAllocator {
 
         private long preallocatedLogId;
-        private Future<BufferedLogChannel> preallocation = null;
+        Future<BufferedLogChannel> preallocation = null;
         private ExecutorService allocatorExecutor;
-        private final Object createEntryLogLock = new Object();
-        private final Object createCompactionLogLock = new Object();
 
         EntryLoggerAllocator(long logId) {
             preallocatedLogId = logId;
             allocatorExecutor = Executors.newSingleThreadExecutor();
         }
 
-        BufferedLogChannel createNewLog() throws IOException {
-            synchronized (createEntryLogLock) {
-                BufferedLogChannel bc;
-                if (!entryLogPreAllocationEnabled){
-                    // create a new log directly
-                    bc = allocateNewLog();
-                    return bc;
-                } else {
-                    // allocate directly to response request
-                    if (null == preallocation){
-                        bc = allocateNewLog();
+        synchronized long getPreallocatedLogId(){
+            return preallocatedLogId;
+        }
+
+        synchronized BufferedLogChannel createNewLog() throws IOException {
+            BufferedLogChannel bc;
+            if (!entryLogPreAllocationEnabled || null == preallocation) {
+                // initialization time to create a new log
+                bc = allocateNewLog();
+            } else {
+                // has a preallocated entry log
+                try {
+                    /*
+                     * both createNewLog and allocateNewLog are synchronized on
+                     * EntryLoggerAllocator.this object. So it is possible that
+                     * a thread calling createNewLog would attain the lock on
+                     * this object and get to this point but preallocation
+                     * Future is starving for lock on EntryLoggerAllocator.this
+                     * to execute allocateNewLog. Here since we attained lock
+                     * for this it means preallocation future must have either
+                     * completed creating new log or still waiting for lock on
+                     * this object to execute allocateNewLog method. So we
+                     * should try to get result of the future without waiting.
+                     * If it fails with TimeoutException then call
+                     * allocateNewLog explicitly since we are holding the lock
+                     * on this anyhow.
+                     *
+                     */
+                    bc = preallocation.get(0, TimeUnit.MILLISECONDS);
+                } catch (ExecutionException ee) {
+                    if (ee.getCause() instanceof IOException) {
+                        throw (IOException) (ee.getCause());
                     } else {
-                        // has a preallocated entry log
-                        try {
-                            bc = preallocation.get();
-                        } catch (ExecutionException ee) {
-                            if (ee.getCause() instanceof IOException) {
-                                throw (IOException) (ee.getCause());
-                            } else {
-                                throw new IOException("Error to execute entry log allocation.", ee);
-                            }
-                        } catch (CancellationException ce) {
-                            throw new IOException("Task to allocate a new entry log is cancelled.", ce);
-                        } catch (InterruptedException ie) {
-                            throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie);
-                        }
+                        throw new IOException("Error to execute entry log allocation.", ee);
                     }
-                    // preallocate a new log in background upon every call
-                    preallocation = allocatorExecutor.submit(() -> allocateNewLog());
-                    return bc;
+                } catch (CancellationException ce) {
+                    throw new IOException("Task to allocate a new entry log is cancelled.", ce);
+                } catch (InterruptedException ie) {
+                    throw new IOException("Intrrupted when waiting a new entry log to be allocated.", ie);
+                } catch (TimeoutException e) {
+                    LOG.debug("Received TimeoutException while trying to get preallocation future result,"
+                            + " which means that Future is waiting for acquiring lock on EntryLoggerAllocator.this");
+                    bc = allocateNewLog();
+                }
+            }
+            if (entryLogPreAllocationEnabled) {
+                /*
+                 * We should submit new callable / create new instance of future only if the previous preallocation is
+                 * null or if it is done. This is needed because previous preallocation has not completed its execution
+                 * since it is waiting for lock on EntryLoggerAllocator.this.
+                 */
+                if ((preallocation == null) || preallocation.isDone()) {
+                    preallocation = allocatorExecutor.submit(new Callable<BufferedLogChannel>() {
+                        @Override
+                        public BufferedLogChannel call() throws IOException {
+                            return allocateNewLog();
+                        }
+                    });
                 }
             }
+            LOG.info("Created new entry logger {}.", bc.getLogId());
+            return bc;
         }
 
-        BufferedLogChannel createNewLogForCompaction() throws IOException {
-            synchronized (createCompactionLogLock) {
+        synchronized BufferedLogChannel createNewLogForCompaction() throws IOException {
                 return allocateNewLog(COMPACTING_SUFFIX);
-            }
         }
 
-        private BufferedLogChannel allocateNewLog() throws IOException {
-            return allocateNewLog(".log");
+        synchronized BufferedLogChannel allocateNewLog() throws IOException {
+            return allocateNewLog(LOG_FILE_SUFFIX);
         }
 
         /**
          * Allocate a new log file.
          */
-        private BufferedLogChannel allocateNewLog(String suffix) throws IOException {
-            List<File> list = ledgerDirsManager.getWritableLedgerDirsForNewLog();
-            Collections.shuffle(list);
-            // It would better not to overwrite existing entry log files
-            File newLogFile = null;
-            do {
+        synchronized BufferedLogChannel allocateNewLog(String suffix) throws IOException {
+            File dirForNextEntryLog;
+            List<File> list;
+
+            try {
+                list = ledgerDirsManager.getWritableLedgerDirs();
+            } catch (NoWritableLedgerDirException nwe) {
+                if (!ledgerDirsManager.hasWritableLedgerDirs()) {
+                    list = ledgerDirsManager.getWritableLedgerDirsForNewLog();
+                } else {
+                    LOG.error("All Disks are not full, but getWritableLedgerDirs threw exception ", nwe);
+                    throw nwe;
+                }
+            }
+
+            dirForNextEntryLog = entryLogManager.getDirForNextEntryLog(list);
+
+            List<File> ledgersDirs = ledgerDirsManager.getAllLedgerDirs();
+            String logFileName;
+            while (true) {
                 if (preallocatedLogId >= Integer.MAX_VALUE) {
                     preallocatedLogId = 0;
                 } else {
                     ++preallocatedLogId;
                 }
-                String logFileName = Long.toHexString(preallocatedLogId) + suffix;
-                for (File dir : list) {
-                    newLogFile = new File(dir, logFileName);
+                /*
+                 * make sure there is no entrylog which already has the same
+                 * logID. Have to check all the ledegerdirs. If already there is
+                 * an entrylog with that logid then move to next ID.
+                 */
+                logFileName = Long.toHexString(preallocatedLogId) + ".log";
+                boolean entryLogAlreadyExistsWithThisId = false;
+                for (File dir : ledgersDirs) {
+                    File newLogFile = new File(dir, logFileName);
                     if (newLogFile.exists()) {
                         LOG.warn("Found existed entry log " + newLogFile
                                + " when trying to create it as a new log.");
-                        newLogFile = null;
+                        entryLogAlreadyExistsWithThisId = true;
                         break;
                     }
                 }
-            } while (newLogFile == null);
+                if (!entryLogAlreadyExistsWithThisId) {
+                    break;
+                }
+            }
 
+            File newLogFile = new File(dirForNextEntryLog, logFileName);
             FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
-            BufferedLogChannel logChannel = new BufferedLogChannel(channel,
-                    conf.getWriteBufferBytes(), conf.getReadBufferBytes(), preallocatedLogId, newLogFile);
+            BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
+                    conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
             logfileHeader.readerIndex(0);
             logChannel.write(logfileHeader);
 
-            for (File f : list) {
+            for (File f : ledgersDirs) {
                 setLastLogId(f, preallocatedLogId);
             }
+            if (suffix.equals(LOG_FILE_SUFFIX)) {
+                recentlyCreatedEntryLogsStatus.createdEntryLog(preallocatedLogId);
+            }
             LOG.info("Created new entry log file {} for logId {}.", newLogFile, preallocatedLogId);
             return logChannel;
         }
@@ -794,6 +847,372 @@ private long readLastLogId(File f) {
         }
     }
 
+    interface EntryLogManager {
+        /*
+         * acquire lock for this ledger.
+         */
+        void acquireLock(Long ledgerId);
+
+        /*
+         * acquire lock for this ledger if it is not already available for this
+         * ledger then it will create a new one and then acquire lock.
+         */
+        void acquireLockByCreatingIfRequired(Long ledgerId);
+
+        /*
+         * release lock for this ledger
+         */
+        void releaseLock(Long ledgerId);
+
+        /*
+         * sets the logChannel for the given ledgerId. The previous one will be
+         * removed from replicaOfCurrentLogChannels. Previous logChannel will be
+         * added to rotatedLogChannels.
+         */
+        void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel);
+
+        /*
+         * gets the logChannel for the given ledgerId.
+         */
+        BufferedLogChannel getCurrentLogForLedger(Long ledgerId);
+
+        /*
+         * gets the copy of rotatedLogChannels
+         */
+        Set<BufferedLogChannel> getCopyOfRotatedLogChannels();
+
+        /*
+         * gets the copy of replicaOfCurrentLogChannels
+         */
+        Set<BufferedLogChannel> getCopyOfCurrentLogs();
+
+        /*
+         * gets the active logChannel with the given entryLogId. null if it is
+         * not existing.
+         */
+        BufferedLogChannel getCurrentLogIfPresent(long entryLogId);
+
+        /*
+         * removes the logChannel from rotatedLogChannels collection
+         */
+        void removeFromRotatedLogChannels(BufferedLogChannel rotatedLogChannelToRemove);
+
+        /*
+         * Returns eligible writable ledger dir for the creation next entrylog
+         */
+        File getDirForNextEntryLog(List<File> writableLedgerDirs);
+    }
+
+    class EntryLogManagerForSingleEntryLog implements EntryLogManager {
+
+        private BufferedLogChannel activeLogChannel;
+        private Lock lockForActiveLogChannel;
+        private final Set<BufferedLogChannel> rotatedLogChannels;
+
+        EntryLogManagerForSingleEntryLog() {
+            rotatedLogChannels = ConcurrentHashMap.newKeySet();
+            lockForActiveLogChannel = new ReentrantLock();
+        }
+
+        /*
+         * since entryLogPerLedger is not enabled, it is just one lock for all
+         * ledgers.
+         */
+        @Override
+        public void acquireLock(Long ledgerId) {
+            lockForActiveLogChannel.lock();
+        }
+
+        @Override
+        public void acquireLockByCreatingIfRequired(Long ledgerId) {
+            acquireLock(ledgerId);
+        }
+
+        @Override
+        public void releaseLock(Long ledgerId) {
+            lockForActiveLogChannel.unlock();
+        }
+
+        @Override
+        public void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel) {
+            acquireLock(ledgerId);
+            try {
+                BufferedLogChannel hasToRotateLogChannel = activeLogChannel;
+                activeLogChannel = logChannel;
+                if (hasToRotateLogChannel != null) {
+                    rotatedLogChannels.add(hasToRotateLogChannel);
+                }
+            } finally {
+                releaseLock(ledgerId);
+            }
+        }
+
+        @Override
+        public BufferedLogChannel getCurrentLogForLedger(Long ledgerId) {
+            return activeLogChannel;
+        }
+
+        @Override
+        public Set<BufferedLogChannel> getCopyOfRotatedLogChannels() {
+            return new HashSet<BufferedLogChannel>(rotatedLogChannels);
+        }
+
+        @Override
+        public Set<BufferedLogChannel> getCopyOfCurrentLogs() {
+            HashSet<BufferedLogChannel> copyOfCurrentLogs = new HashSet<BufferedLogChannel>();
+            copyOfCurrentLogs.add(activeLogChannel);
+            return copyOfCurrentLogs;
+        }
+
+        @Override
+        public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+            BufferedLogChannel activeLogChannelTemp = activeLogChannel;
+            if ((activeLogChannelTemp != null) && (activeLogChannelTemp.getLogId() == entryLogId)) {
+                return activeLogChannelTemp;
+            }
+            return null;
+        }
+
+        @Override
+        public void removeFromRotatedLogChannels(BufferedLogChannel rotatedLogChannelToRemove) {
+            rotatedLogChannels.remove(rotatedLogChannelToRemove);
+        }
+
+        @Override
+        public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+            Collections.shuffle(writableLedgerDirs);
+            return writableLedgerDirs.get(0);
+        }
+    }
+
+    class EntryLogManagerForEntryLogPerLedger implements EntryLogManager {
+
+        class EntryLogAndLockTuple {
+            private final Lock ledgerLock;
+            private BufferedLogChannel entryLog;
+
+            public EntryLogAndLockTuple() {
+                ledgerLock = new ReentrantLock();
+            }
+
+            public Lock getLedgerLock() {
+                return ledgerLock;
+            }
+
+            public BufferedLogChannel getEntryLog() {
+                return entryLog;
+            }
+
+            public void setEntryLog(BufferedLogChannel entryLog) {
+                this.entryLog = entryLog;
+            }
+        }
+
+        private Cache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap;
+        private final Set<BufferedLogChannel> rotatedLogChannels;
+        /*
+         * every time active logChannel is accessed from ledgerIdEntryLogMap
+         * cache, the accesstime of that entry is updated. But for certain
+         * operations we dont want to impact accessTime of the entries (like
+         * periodic flush of current active logChannels), and those operations
+         * can use this copy of references.
+         */
+        private final ConcurrentHashMap<Long, BufferedLogChannel> replicaOfCurrentLogChannels;
+        private final Callable<EntryLogAndLockTuple> entryLogAndLockTupleValueLoader;
+
+        EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf) throws IOException {
+            rotatedLogChannels = ConcurrentHashMap.newKeySet();
+
+            replicaOfCurrentLogChannels = new ConcurrentHashMap<Long, BufferedLogChannel>();
+            int entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds();
+            entryLogAndLockTupleValueLoader = new Callable<EntryLogAndLockTuple>() {
+                @Override
+                public EntryLogAndLockTuple call() throws Exception {
+                    return new EntryLogAndLockTuple();
+                }
+            };
+            /*
+             * Currently we are relying on access time based eviction policy for
+             * removal of EntryLogAndLockTuple, so if the EntryLogAndLockTuple of
+             * the ledger is not accessed in
+             * entrylogMapAccessExpiryTimeInSeconds period, it will be removed
+             * from the cache.
+             *
+             * We are going to introduce explicit advisory writeClose call, with
+             * that explicit call EntryLogAndLockTuple of the ledger will be
+             * removed from the cache. But still timebased eviciton policy is
+             * needed because it is not guaranteed that Bookie/EntryLogger would
+             * receive successfully write close call in all the cases.
+             */
+            ledgerIdEntryLogMap = CacheBuilder.newBuilder()
+                    .expireAfterAccess(entrylogMapAccessExpiryTimeInSeconds, TimeUnit.SECONDS)
+                    .removalListener(new RemovalListener<Long, EntryLogAndLockTuple>() {
+                        @Override
+                        public void onRemoval(
+                                RemovalNotification<Long, EntryLogAndLockTuple> expiredLedgerEntryLogMapEntry) {
+                            removalOnExpiry(expiredLedgerEntryLogMapEntry);
+                        }
+                    }).build();
+        }
+
+        /*
+         * This method is called when access time of that ledger has elapsed
+         * entrylogMapAccessExpiryTimeInSeconds period and the entry for that
+         * ledger is removed from cache. Since the entrylog of this ledger is
+         * not active anymore it has to be removed from
+         * replicaOfCurrentLogChannels and added to rotatedLogChannels.
+         *
+         * Because of performance/optimizations concerns the cleanup maintenance
+         * operations wont happen automatically, for more info on eviction
+         * cleanup maintenance tasks -
+         * https://google.github.io/guava/releases/19.0/api/docs/com/google/
+         * common/cache/CacheBuilder.html
+         *
+         */
+        private void removalOnExpiry(RemovalNotification<Long, EntryLogAndLockTuple> expiredLedgerEntryLogMapEntry) {
+            Long ledgerId = expiredLedgerEntryLogMapEntry.getKey();
+            LOG.debug("LedgerId {} is not accessed for entrylogMapAccessExpiryTimeInSeconds"
+                    + " period so it is being evicted from the cache map", ledgerId);
+            EntryLogAndLockTuple entryLogAndLockTuple = expiredLedgerEntryLogMapEntry.getValue();
+            Lock lock = entryLogAndLockTuple.ledgerLock;
+            BufferedLogChannel logChannel = entryLogAndLockTuple.entryLog;
+            lock.lock();
+            try {
+                replicaOfCurrentLogChannels.remove(logChannel.logId);
+                rotatedLogChannels.add(logChannel);
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        @Override
+        public void acquireLock(Long ledgerId) {
+            ledgerIdEntryLogMap.getIfPresent(ledgerId).getLedgerLock().lock();
+        }
+
+        /*
+         * acquire lock for this ledger. In this method if EntryLogAndLockTuple
+         * is not already available for this ledger in the cache, then it will
+         * create a new EntryLogAndLockTuple, add it to cache and acquire lock.
+         *
+         */
+        @Override
+        public void acquireLockByCreatingIfRequired(Long ledgerId) {
+            try {
+                ledgerIdEntryLogMap.get(ledgerId, entryLogAndLockTupleValueLoader).getLedgerLock().lock();
+            } catch (ExecutionException e) {
+                throw new RuntimeException(
+                        "Got ExecutionException while trying to create EntryLogAndLockTuple for Ledger: " + ledgerId,
+                        e);
+            }
+        }
+
+        @Override
+        public void releaseLock(Long ledgerId) {
+            ledgerIdEntryLogMap.getIfPresent(ledgerId).getLedgerLock().unlock();
+        }
+
+        /*
+         * sets the logChannel for the given ledgerId. It will add the new
+         * logchannel to replicaOfCurrentLogChannels, and the previous one will
+         * be removed from replicaOfCurrentLogChannels. Previous logChannel will
+         * be added to rotatedLogChannels in both the cases.
+         */
+        @Override
+        public void setCurrentLogForLedger(Long ledgerId, BufferedLogChannel logChannel) {
+            acquireLock(ledgerId);
+            try {
+                BufferedLogChannel hasToRotateLogChannel = getCurrentLogForLedger(ledgerId);
+                logChannel.setLedgerId(ledgerId);
+                ledgerIdEntryLogMap.getIfPresent(ledgerId).setEntryLog(logChannel);
+                replicaOfCurrentLogChannels.put(logChannel.logId, logChannel);
+                if (hasToRotateLogChannel != null) {
+                    replicaOfCurrentLogChannels.remove(hasToRotateLogChannel.logId);
+                    rotatedLogChannels.add(hasToRotateLogChannel);
+                }
+            } finally {
+                releaseLock(ledgerId);
+            }
+        }
+
+        @Override
+        public BufferedLogChannel getCurrentLogForLedger(Long ledgerId) {
+            EntryLogAndLockTuple entryLogAndLockTuple = ledgerIdEntryLogMap.getIfPresent(ledgerId);
+            if (entryLogAndLockTuple == null) {
+                return null;
+            } else {
+                return entryLogAndLockTuple.getEntryLog();
+            }
+        }
+
+        @Override
+        public Set<BufferedLogChannel> getCopyOfRotatedLogChannels() {
+            return new HashSet<BufferedLogChannel>(rotatedLogChannels);
+        }
+
+        @Override
+        public Set<BufferedLogChannel> getCopyOfCurrentLogs() {
+            return new HashSet<BufferedLogChannel>(replicaOfCurrentLogChannels.values());
+        }
+
+        @Override
+        public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+            return replicaOfCurrentLogChannels.get(entryLogId);
+        }
+
+        @Override
+        public void removeFromRotatedLogChannels(BufferedLogChannel rotatedLogChannelToRemove) {
+            rotatedLogChannels.remove(rotatedLogChannelToRemove);
+        }
+
+        /*
+         * this is for testing purpose only. guava's cache doesnt cleanup
+         * completely (including calling expiry removal listener) automatically
+         * when access timeout elapses.
+         *
+         * https://google.github.io/guava/releases/19.0/api/docs/com/google/
+         * common/cache/CacheBuilder.html
+         *
+         * If expireAfterWrite or expireAfterAccess is requested entries may be
+         * evicted on each cache modification, on occasional cache accesses, or
+         * on calls to Cache.cleanUp(). Expired entries may be counted by
+         * Cache.size(), but will never be visible to read or write operations.
+         *
+         * Certain cache configurations will result in the accrual of periodic
+         * maintenance tasks which will be performed during write operations, or
+         * during occasional read operations in the absence of writes. The
+         * Cache.cleanUp() method of the returned cache will also perform
+         * maintenance, but calling it should not be necessary with a high
+         * throughput cache. Only caches built with removalListener,
+         * expireAfterWrite, expireAfterAccess, weakKeys, weakValues, or
+         * softValues perform periodic maintenance.
+         */
+        void doEntryLogMapCleanup() {
+            ledgerIdEntryLogMap.cleanUp();
+        }
+
+        /*
+         * Returns writable ledger dir with least number of current active
+         * entrylogs.
+         */
+        @Override
+        public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+            Map<File, MutableInt> writableLedgerDirFrequency = new HashMap<File, MutableInt>();
+            writableLedgerDirs.stream()
+                    .forEach((ledgerDir) -> writableLedgerDirFrequency.put(ledgerDir, new MutableInt()));
+            for (BufferedLogChannel logChannel : replicaOfCurrentLogChannels.values()) {
+                File parentDirOfCurrentLogChannel = logChannel.getLogFile().getParentFile();
+                if (writableLedgerDirFrequency.containsKey(parentDirOfCurrentLogChannel)) {
+                    writableLedgerDirFrequency.get(parentDirOfCurrentLogChannel).increment();
+                }
+            }
+            @SuppressWarnings("unchecked")
+            Optional<Entry<File, MutableInt>> ledgerDirWithLeastNumofCurrentLogs = writableLedgerDirFrequency.entrySet()
+                    .stream().min(Map.Entry.comparingByValue());
+            return ledgerDirWithLeastNumofCurrentLogs.get().getKey();
+        }
+    }
+
     /**
      * Flushes all rotated log channels. After log channels are flushed,
      * move leastUnflushedLogId ptr to current logId.
@@ -803,66 +1222,50 @@ void checkpoint() throws IOException {
     }
 
     void flushRotatedLogs() throws IOException {
-        List<BufferedLogChannel> channels = null;
-        long flushedLogId = INVALID_LID;
-        synchronized (this) {
-            channels = logChannelsToFlush;
-            logChannelsToFlush = null;
-        }
+        Set<BufferedLogChannel> channels = entryLogManager.getCopyOfRotatedLogChannels();
         if (null == channels) {
             return;
         }
-        Iterator<BufferedLogChannel> chIter = channels.iterator();
-        while (chIter.hasNext()) {
-            BufferedLogChannel channel = chIter.next();
-            try {
-                channel.flush(true);
-            } catch (IOException ioe) {
-                // rescue from flush exception, add unflushed channels back
-                synchronized (this) {
-                    if (null == logChannelsToFlush) {
-                        logChannelsToFlush = channels;
-                    } else {
-                        logChannelsToFlush.addAll(0, channels);
-                    }
-                }
-                throw ioe;
-            }
-            // remove the channel from the list after it is successfully flushed
-            chIter.remove();
+        for (BufferedLogChannel channel : channels) {
+            channel.flush(true);
             // since this channel is only used for writing, after flushing the channel,
             // we had to close the underlying file channel. Otherwise, we might end up
             // leaking fds which cause the disk spaces could not be reclaimed.
             closeFileChannel(channel);
-            if (channel.getLogId() > flushedLogId) {
-                flushedLogId = channel.getLogId();
-            }
+            recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(channel.getLogId());
+            entryLogManager.removeFromRotatedLogChannels(channel);
             LOG.info("Synced entry logger {} to disk.", channel.getLogId());
         }
-        // move the leastUnflushedLogId ptr
-        leastUnflushedLogId = flushedLogId + 1;
     }
 
     public void flush() throws IOException {
+        flushCurrentLogs();
         flushRotatedLogs();
-        flushCurrentLog();
     }
 
-    synchronized void flushCurrentLog() throws IOException {
+    void flushCurrentLogs() throws IOException {
+        Set<BufferedLogChannel> copyOfCurrentLogs = entryLogManager.getCopyOfCurrentLogs();
+        for (BufferedLogChannel logChannel : copyOfCurrentLogs) {
+            /**
+             * flushCurrentLogs method is called during checkpoint, so metadata
+             * of the file also should be force written.
+             */
+            flushCurrentLog(logChannel, true);
+        }
+    }
+
+    void flushCurrentLog(BufferedLogChannel logChannel, boolean forceMetadata) throws IOException {
         if (logChannel != null) {
-            logChannel.flush(true);
-            bytesWrittenSinceLastFlush = 0;
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId());
-            }
+            logChannel.flush(true, forceMetadata);
+            LOG.debug("Flush and sync current entry logger {}", logChannel.getLogId());
         }
     }
 
-    long addEntry(long ledger, ByteBuffer entry) throws IOException {
+    long addEntry(Long ledger, ByteBuffer entry) throws IOException {
         return addEntry(ledger, Unpooled.wrappedBuffer(entry), true);
     }
 
-    long addEntry(long ledger, ByteBuf entry) throws IOException {
+    long addEntry(Long ledger, ByteBuf entry) throws IOException {
         return addEntry(ledger, entry, true);
     }
 
@@ -873,36 +1276,44 @@ protected ByteBuf initialValue() throws Exception {
         }
     };
 
-    public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
-        int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
-        boolean reachEntryLogLimit =
-            rollLog ? reachEntryLogLimit(entrySize) : readEntryLogHardLimit(entrySize);
-        // Create new log if logSizeLimit reached or current disk is full
-        boolean createNewLog = shouldCreateNewEntryLog.get();
-        if (createNewLog || reachEntryLogLimit) {
-            if (doRegularFlushes) {
-                flushCurrentLog();
-            }
-            createNewLog();
-            // Reset the flag
-            if (createNewLog) {
-                shouldCreateNewEntryLog.set(false);
+    public long addEntry(Long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+        entryLogManager.acquireLockByCreatingIfRequired(ledger);
+        try {
+            int entrySize = entry.readableBytes() + 4; // Adding 4 bytes to prepend the size
+            boolean reachEntryLogLimit = rollLog ? reachEntryLogLimit(ledger, entrySize)
+                    : readEntryLogHardLimit(ledger, entrySize);
+            BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(ledger);
+            // Create new log if logSizeLimit reached or current disk is full
+            boolean diskFull = (logChannel == null) ? false : logChannel.isLedgerDirFull();
+            boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+            /**
+             * if disk of the logChannel is full or if the entrylog limit is
+             * reached of if the logchannel is not initialized, then
+             * createNewLog. If allDisks are full then proceed with the current
+             * logChannel, since Bookie must have turned to readonly mode and
+             * the addEntry traffic would be from GC and it is ok to proceed in
+             * this case.
+             */
+            if ((diskFull && (!allDisksFull)) || reachEntryLogLimit || (logChannel == null)) {
+                flushCurrentLog(logChannel, false);
+                createNewLog(ledger);
             }
-        }
 
-        // Get a buffer from thread local to store the size
-        ByteBuf sizeBuffer = this.sizeBuffer.get();
-        sizeBuffer.clear();
-        sizeBuffer.writeInt(entry.readableBytes());
-        logChannel.write(sizeBuffer);
-
-        long pos = logChannel.position();
-        logChannel.write(entry);
-        logChannel.registerWrittenEntry(ledger, entrySize);
+            logChannel = entryLogManager.getCurrentLogForLedger(ledger);
+            ByteBuf sizeBuffer = this.sizeBuffer.get();
+            sizeBuffer.clear();
+            sizeBuffer.writeInt(entry.readableBytes());
+            logChannel.write(sizeBuffer);
 
-        incrementBytesWrittenAndMaybeFlush(4L + entrySize);
+            long pos = logChannel.position();
+            logChannel.write(entry);
+            logChannel.registerWrittenEntry(ledger, entrySize);
 
-        return (logChannel.getLogId() << 32L) | pos;
+            return (logChannel.getLogId() << 32L) | pos;
+        } finally {
+            entryLogManager.releaseLock(ledger);
+        }
     }
 
     long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException {
@@ -969,26 +1380,34 @@ void removeCurCompactionLog() {
         }
     }
 
-    private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws IOException {
-        if (!doRegularFlushes) {
-            return;
-        }
-        bytesWrittenSinceLastFlush += bytesWritten;
-        if (bytesWrittenSinceLastFlush > flushIntervalInBytes) {
-            flushCurrentLog();
-        }
-    }
-
     static long logIdForOffset(long offset) {
         return offset >> 32L;
     }
 
-    synchronized boolean reachEntryLogLimit(long size) {
-        return logChannel.position() + size > logSizeLimit;
+    boolean reachEntryLogLimit(Long ledger, long size) {
+        entryLogManager.acquireLock(ledger);
+        try {
+            BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(ledger);
+            if (logChannel == null) {
+                return false;
+            }
+            return logChannel.position() + size > logSizeLimit;
+        } finally {
+            entryLogManager.releaseLock(ledger);
+        }
     }
 
-    synchronized boolean readEntryLogHardLimit(long size) {
-        return logChannel.position() + size > Integer.MAX_VALUE;
+    boolean readEntryLogHardLimit(Long ledger, long size) {
+        entryLogManager.acquireLock(ledger);
+        try {
+            BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(ledger);
+            if (logChannel == null) {
+                return false;
+            }
+            return logChannel.position() + size > Integer.MAX_VALUE;
+        } finally {
+            entryLogManager.releaseLock(ledger);
+        }
     }
 
     public ByteBuf internalReadEntry(long ledgerId, long entryId, long location)
@@ -1360,6 +1779,7 @@ public boolean accept(long ledgerId) {
     public void shutdown() {
         // since logChannel is buffered channel, do flush when shutting down
         LOG.info("Stopping EntryLogger");
+        Set<BufferedLogChannel> copyOfCurrentLogs = entryLogManager.getCopyOfCurrentLogs();
         try {
             flush();
             for (FileChannel fc : logid2FileChannel.values()) {
@@ -1367,8 +1787,10 @@ public void shutdown() {
             }
             // clear the mapping, so we don't need to go through the channels again in finally block in normal case.
             logid2FileChannel.clear();
-            // close current writing log file
-            closeFileChannel(logChannel);
+            for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+                // close current writing log file
+                closeFileChannel(currentLog);
+            }
             synchronized (compactionLogLock) {
                 closeFileChannel(compactionLogChannel);
                 compactionLogChannel = null;
@@ -1380,7 +1802,9 @@ public void shutdown() {
             for (FileChannel fc : logid2FileChannel.values()) {
                 IOUtils.close(LOG, fc);
             }
-            forceCloseFileChannel(logChannel);
+            for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+                forceCloseFileChannel(currentLog);
+            }
             synchronized (compactionLogLock) {
                 forceCloseFileChannel(compactionLogChannel);
             }
@@ -1435,4 +1859,40 @@ static long fileName2LogId(String fileName) {
     static String logId2HexString(long logId) {
         return Long.toHexString(logId);
     }
+
+    /**
+     * Datastructure which maintains the status of logchannels. When a
+     * logChannel is created entry of < entryLogId, false > will be made to this
+     * sortedmap and when logChannel is rotated and flushed then the entry is
+     * updated to < entryLogId, true > and all the lowest entries with
+     * < entryLogId, true > status will be removed from the sortedmap. So that way
+     * we could get least unflushed LogId.
+     *
+     */
+    class RecentEntryLogsStatus {
+        private SortedMap<Long, Boolean> entryLogsStatusMap;
+        private long leastUnflushedLogId;
+
+        RecentEntryLogsStatus(long leastUnflushedLogId) {
+            entryLogsStatusMap = new TreeMap<Long, Boolean>();
+            this.leastUnflushedLogId = leastUnflushedLogId;
+        }
+
+        synchronized void createdEntryLog(Long entryLogId) {
+            entryLogsStatusMap.put(entryLogId, false);
+        }
+
+        synchronized void flushRotatedEntryLog(Long entryLogId) {
+            entryLogsStatusMap.replace(entryLogId, true);
+            while ((!entryLogsStatusMap.isEmpty()) && (entryLogsStatusMap.get(entryLogsStatusMap.firstKey()))) {
+                long leastFlushedLogId = entryLogsStatusMap.firstKey();
+                entryLogsStatusMap.remove(leastFlushedLogId);
+                leastUnflushedLogId = leastFlushedLogId + 1;
+            }
+        }
+
+        synchronized long getLeastUnflushedLogId() {
+            return leastUnflushedLogId;
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index 50e93a3dc..43664c60e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -27,10 +27,17 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -39,6 +46,8 @@
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +58,7 @@
  * flusher reports in that the flush succeeded. At that point we let the snapshot go.
  */
 public class EntryMemTable {
-    private static Logger logger = LoggerFactory.getLogger(Journal.class);
+    private static Logger logger = LoggerFactory.getLogger(EntryMemTable.class);
 
     /**
      * Entry skip list.
@@ -113,6 +122,9 @@ private EntrySkipList newSkipList() {
         return new EntrySkipList(checkpointSource.newCheckpoint());
     }
 
+    private final OrderedSafeExecutor flushExecutor;
+    private final int memtableFlushTimeoutInSeconds;
+
     // Stats
     private final OpStatsLogger snapshotStats;
     private final OpStatsLogger putEntryStats;
@@ -125,7 +137,7 @@ private EntrySkipList newSkipList() {
     * @param conf Server configuration
     */
     public EntryMemTable(final ServerConfiguration conf, final CheckpointSource source,
-                         final StatsLogger statsLogger) {
+            OrderedSafeExecutor flushExecutor, final StatsLogger statsLogger) {
         this.checkpointSource = source;
         this.kvmap = newSkipList();
         this.snapshot = EntrySkipList.EMPTY_VALUE;
@@ -142,6 +154,8 @@ public EntryMemTable(final ServerConfiguration conf, final CheckpointSource sour
         this.getEntryStats = statsLogger.getOpStatsLogger(SKIP_LIST_GET_ENTRY);
         this.flushBytesCounter = statsLogger.getCounter(SKIP_LIST_FLUSH_BYTES);
         this.throttlingCounter = statsLogger.getCounter(SKIP_LIST_THROTTLING);
+        this.flushExecutor = flushExecutor;
+        this.memtableFlushTimeoutInSeconds = conf.getMemtableFlushTimeoutInSeconds();
     }
 
     void dump() {
@@ -236,6 +250,14 @@ public long flush(SkipListFlusher flusher, Checkpoint checkpoint) throws IOExcep
      * Only this function change non-empty this.snapshot.
      */
     private long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+        if (flushExecutor == null) {
+            return flushSnapshotSequentially(flusher, checkpoint);
+        } else {
+            return flushSnapshotParallelly(flusher, checkpoint);
+        }
+    }
+
+    long flushSnapshotSequentially(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
         long size = 0;
         if (this.snapshot.compareTo(checkpoint) < 0) {
             long ledger, ledgerGC = -1;
@@ -263,6 +285,81 @@ private long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint)
         return size;
     }
 
+    long flushSnapshotParallelly(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+        AtomicLong flushedSize = new AtomicLong();
+        if (this.snapshot.compareTo(checkpoint) < 0) {
+            synchronized (this) {
+                EntrySkipList keyValues = this.snapshot;
+                if (keyValues.compareTo(checkpoint) < 0) {
+                    NavigableSet<EntryKey> keyValuesSet = keyValues.keySet();
+                    Map<Long, List<EntryKeyValue>> entryKeyValuesMap = new HashMap<Long, List<EntryKeyValue>>();
+
+                    for (EntryKey key : keyValuesSet) {
+                        EntryKeyValue kv = (EntryKeyValue) key;
+                        Long ledger = kv.getLedgerId();
+                        if (!entryKeyValuesMap.containsKey(ledger)) {
+                            entryKeyValuesMap.put(ledger, new LinkedList<EntryKeyValue>());
+                        }
+                        entryKeyValuesMap.get(ledger).add(kv);
+                    }
+
+                    CountDownLatch latch = new CountDownLatch(entryKeyValuesMap.size());
+                    AtomicBoolean isFlushThreadInterrupted = new AtomicBoolean(false);
+                    AtomicReference<Exception> exceptionWhileFlushingParallelly =  new AtomicReference<Exception>();
+                    Thread mainFlushThread = Thread.currentThread();
+
+                    for (Long ledgerId : entryKeyValuesMap.keySet()) {
+                        List<EntryKeyValue> entryKeyValuesOfALedger = entryKeyValuesMap.get(ledgerId);
+                        flushExecutor.submitOrdered(ledgerId, new SafeRunnable() {
+                            @Override
+                            public void safeRun() {
+                                for (EntryKeyValue entryKeyValue : entryKeyValuesOfALedger) {
+                                    try {
+                                        flusher.process(ledgerId, entryKeyValue.getEntryId(),
+                                                entryKeyValue.getValueAsByteBuffer());
+                                        flushedSize.addAndGet(entryKeyValue.getLength());
+                                    } catch (NoLedgerException exception) {
+                                        logger.info("Got NoLedgerException while flushing entry: {}. The ledger "
+                                                + "must be deleted " + "after this entry is added to the Memtable",
+                                                entryKeyValue);
+                                        break;
+                                    } catch (Exception exc) {
+                                        logger.error(
+                                                "Got Exception while trying to flush process entry: " + entryKeyValue,
+                                                exc);
+                                        if (isFlushThreadInterrupted.compareAndSet(false, true)) {
+                                            exceptionWhileFlushingParallelly.set(exc);
+                                            mainFlushThread.interrupt();
+                                        }
+                                        // return without countdowning the latch since we got unexpected Exception
+                                        return;
+                                    }
+                                }
+                                latch.countDown();
+                            }
+                        });
+                    }
+
+                    try {
+                        while (!latch.await(memtableFlushTimeoutInSeconds, TimeUnit.SECONDS)) {
+                            logger.error("Entrymemtable parallel flush has not completed in {0} secs, so waiting again",
+                                    memtableFlushTimeoutInSeconds);
+                        }
+                        assert (exceptionWhileFlushingParallelly.get() == null);
+                        flushBytesCounter.add(flushedSize.get());
+                        clearSnapshot(keyValues);
+                    } catch (InterruptedException ie) {
+                        logger.error("Got Interrupted exception while waiting for the flushexecutor "
+                                + "to complete the entry flushes");
+                        throw new IOException("Failed to complete the flushSnapshotByParallelizing",
+                                exceptionWhileFlushingParallelly.get());
+                    }
+                }
+            }
+        }
+        return flushedSize.longValue();
+    }
+
     /**
      * The passed snapshot was successfully persisted; it can be let go.
      * @param keyValues The snapshot to clean out.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index a6d8361f8..b79ddcb24 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -34,6 +34,8 @@
 import java.util.NavigableMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
@@ -74,12 +76,14 @@
     GarbageCollectorThread gcThread;
 
     // this indicates that a write has happened since the last flush
-    private volatile boolean somethingWritten = false;
+    private AtomicBoolean somethingWritten = new AtomicBoolean(false);
 
     // Expose Stats
     private OpStatsLogger getOffsetStats;
     private OpStatsLogger getEntryStats;
 
+    protected boolean entryLogPerLedgerEnabled;
+
     @VisibleForTesting
     public InterleavedLedgerStorage() {
         activeLedgers = new SnapshotMap<Long, Boolean>();
@@ -97,7 +101,7 @@ public void initialize(ServerConfiguration conf,
             throws IOException {
         checkNotNull(checkpointSource, "invalid null checkpoint source");
         checkNotNull(checkpointer, "invalid null checkpointer");
-
+        this.entryLogPerLedgerEnabled = conf.isEntryLogPerLedgerEnabled();
         this.checkpointSource = checkpointSource;
         this.checkpointer = checkpointer;
         entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
@@ -261,7 +265,7 @@ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
 
 
     @Override
-    public synchronized long addEntry(ByteBuf entry) throws IOException {
+    public long addEntry(ByteBuf entry) throws IOException {
         long ledgerId = entry.getLong(entry.readerIndex() + 0);
         long entryId = entry.getLong(entry.readerIndex() + 8);
         long lac = entry.getLong(entry.readerIndex() + 16);
@@ -328,9 +332,16 @@ private void flushOrCheckpoint(boolean isCheckpointFlush)
         }
 
         try {
-            // if it is just a checkpoint flush, we just flush rotated entry log files
-            // in entry logger.
-            if (isCheckpointFlush) {
+            /*
+             * if it is just a checkpoint flush and if entryLogPerLedger is not
+             * enabled, then we just flush rotated entry log files in entry
+             * logger.
+             *
+             * In the case of entryLogPerLedgerEnabled we need to flush both
+             * rotatedlogs and currentlogs. Hence we call entryLogger.flush in
+             * the case of entrylogperledgerenabled.
+             */
+            if (isCheckpointFlush && !entryLogPerLedgerEnabled) {
                 entryLogger.checkpoint();
             } else {
                 entryLogger.flush();
@@ -357,10 +368,10 @@ public void checkpoint(Checkpoint checkpoint) throws IOException {
 
     @Override
     public synchronized void flush() throws IOException {
-        if (!somethingWritten) {
+        if (!somethingWritten.get()) {
             return;
         }
-        somethingWritten = false;
+        somethingWritten.set(false);
         flushOrCheckpoint(false);
     }
 
@@ -416,12 +427,12 @@ protected void processEntry(long ledgerId, long entryId, ByteBuf entry) throws I
         processEntry(ledgerId, entryId, entry, true);
     }
 
-    protected synchronized void processEntry(long ledgerId, long entryId, ByteBuf entry, boolean rollLog)
+    protected void processEntry(long ledgerId, long entryId, ByteBuf entry, boolean rollLog)
             throws IOException {
         /*
          * Touch dirty flag
          */
-        somethingWritten = true;
+        somethingWritten.set(true);
 
         /*
          * Log the entry
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index fc1c80c7c..e93403315 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -82,7 +82,7 @@
      * @param filter journal id filter
      * @return list of filtered ids
      */
-    private static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
+    static List<Long> listJournalIds(File journalDir, JournalIdFilter filter) {
         File logFiles[] = journalDir.listFiles();
         if (logFiles == null || logFiles.length == 0) {
             return Collections.emptyList();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
index d661412be..085ba7081 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
@@ -38,12 +38,7 @@ public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
     }
 
     @Override
-    protected void initialize() throws IOException {
-        // do nothing for read only entry logger
-    }
-
-    @Override
-    void createNewLog() throws IOException {
+    void createNewLog(Long ledgerId) throws IOException {
         throw new IOException("Can't create new entry log using a readonly entry logger.");
     }
 
@@ -54,7 +49,7 @@ protected boolean removeEntryLog(long entryLogId) {
     }
 
     @Override
-    public synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+    public synchronized long addEntry(Long ledgerId, ByteBuffer entry) throws IOException {
         throw new IOException("Can't add entry to a readonly entry logger.");
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index 0e3e3b946..c31c8ed9b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -32,6 +32,7 @@
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +49,7 @@
     EntryMemTable memTable;
     private ScheduledExecutorService scheduler;
     private StateManager stateManager;
+    OrderedSafeExecutor flushExecutor;
 
     public SortedLedgerStorage() {
         super();
@@ -72,7 +74,11 @@ public void initialize(ServerConfiguration conf,
             checkpointSource,
             checkpointer,
             statsLogger);
-        this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
+        if (conf.isEntryLogPerLedgerEnabled()) {
+            flushExecutor = OrderedSafeExecutor.newBuilder().numThreads(conf.getNumOfMemtableFlushThreads())
+                    .name("MemtableFlushThreads").build();
+        }
+        this.memTable = new EntryMemTable(conf, checkpointSource, flushExecutor, statsLogger);
         this.scheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder()
                 .setNameFormat("SortedLedgerStorage-%d")
@@ -102,6 +108,9 @@ public void shutdown() throws InterruptedException {
         if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) {
             scheduler.shutdownNow();
         }
+        if (flushExecutor != null) {
+            flushExecutor.shutdown();
+        }
         super.shutdown();
     }
 
@@ -168,13 +177,6 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
     @Override
     public void checkpoint(final Checkpoint checkpoint) throws IOException {
         long numBytesFlushed = memTable.flush(this, checkpoint);
-        if (numBytesFlushed > 0) {
-            // if bytes are added between previous flush and this checkpoint,
-            // it means bytes might live at current active entry log, we need
-            // roll current entry log and then issue checkpoint to underlying
-            // interleaved ledger storage.
-            entryLogger.rollLog();
-        }
         super.checkpoint(checkpoint);
     }
 
@@ -209,16 +211,9 @@ public void onSizeLimitReached(final Checkpoint cp) throws IOException {
             public void run() {
                 try {
                     LOG.info("Started flushing mem table.");
-                    long logIdBeforeFlush = entryLogger.getCurrentLogId();
                     memTable.flush(SortedLedgerStorage.this);
-                    long logIdAfterFlush = entryLogger.getCurrentLogId();
                     // in any case that an entry log reaches the limit, we roll the log and start checkpointing.
-                    // if a memory table is flushed spanning over two entry log files, we also roll log. this is
-                    // for performance consideration: since we don't wanna checkpoint a new log file that ledger
-                    // storage is writing to.
-                    if (entryLogger.reachEntryLogLimit(0) || logIdAfterFlush != logIdBeforeFlush) {
-                        LOG.info("Rolling entry logger since it reached size limitation");
-                        entryLogger.rollLog();
+                    if (entryLogger.rollLogsIfEntryLogLimitReached()) {
                         checkpointer.startCheckpoint(cp);
                     }
                 } catch (IOException e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index 3fbedbc60..74e9617ac 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -119,6 +119,12 @@ public void startCheckpoint(Checkpoint checkpoint) {
         });
     }
 
+    void start() {
+        executor.scheduleAtFixedRate(() -> {
+            startCheckpoint(checkpointSource.newCheckpoint());
+        }, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+    }
+
     private void flush() {
         Checkpoint checkpoint = checkpointSource.newCheckpoint();
         try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 3a7c763de..d532460e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -177,6 +177,23 @@
     // Stats
     protected static final String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
 
+    /*
+     * config specifying if the entrylog per ledger is enabled or not.
+     */
+    protected static final String ENTRY_LOG_PERLEDGER_ENABLED = "entryLogPerLedgerEnabled";
+
+    // In the case of multipleentrylogs, multiple threads can be used to flush the memtable parallelly.
+    protected static final String NUMBER_OF_MEMTABLE_FLUSH_THREADS = "numOfMemtableFlushThreads";
+
+    /*
+     * In the case of multiple entrylogs, memtableFlushTimeoutInSeconds specifies the amount of time main flushthread
+     * has to wait for the processor threads to complete the flush
+     */
+    protected static final String MEMTABLE_FLUSH_TIMEOUT_INSECONDS = "memtableFlushTimeoutInSeconds";
+
+
+    protected static final String ENTRYLOGMAP_ACCESS_EXPIRYTIME_INSECONDS = "entrylogMapAccessExpiryTimeInSeconds";
+
     /**
      * Construct a default configuration object.
      */
@@ -2586,4 +2603,79 @@ public void setRegistrationManagerClass(
     protected ServerConfiguration getThis() {
         return this;
     }
+
+
+    /*
+     * specifies if entryLog per ledger is enabled. If it is enabled, then there
+     * would be a active entrylog for each ledger
+     */
+    public boolean isEntryLogPerLedgerEnabled() {
+        return this.getBoolean(ENTRY_LOG_PERLEDGER_ENABLED, false);
+    }
+
+    /*
+     * enables/disables entrylog per ledger feature.
+     *
+     */
+    public ServerConfiguration setEntryLogPerLedgerEnabled(boolean entryLogPerLedgerEnabled) {
+        this.setProperty(ENTRY_LOG_PERLEDGER_ENABLED, Boolean.toString(entryLogPerLedgerEnabled));
+        return this;
+    }
+
+    /*
+     * in entryLogPerLedger feature, this specifies the time, once this duration
+     * has elapsed after the entry's last access, that entry should be
+     * automatically removed from the cache
+     */
+    public int getEntrylogMapAccessExpiryTimeInSeconds() {
+        return this.getInt(ENTRYLOGMAP_ACCESS_EXPIRYTIME_INSECONDS, 5 * 60);
+    }
+
+    /*
+     * sets the time duration for entrylogMapAccessExpiryTimeInSeconds, which will be used for cache eviction
+     * policy, in entrylogperledger feature.
+     *
+     */
+    public ServerConfiguration setEntrylogMapAccessExpiryTimeInSeconds(int entrylogMapAccessExpiryTimeInSeconds) {
+        this.setProperty(ENTRYLOGMAP_ACCESS_EXPIRYTIME_INSECONDS,
+                Integer.toString(entrylogMapAccessExpiryTimeInSeconds));
+        return this;
+    }
+
+    /*
+     * In the case of multipleentrylogs, multiple threads can be used to flush the memtable.
+     *
+     * Gets the number of threads used to flush entrymemtable
+     */
+    public int getNumOfMemtableFlushThreads() {
+        return this.getInt(NUMBER_OF_MEMTABLE_FLUSH_THREADS, 4);
+    }
+
+    /*
+     * Sets the number of threads used to flush entrymemtable, in the case of multiple entrylogs
+     *
+     */
+    public ServerConfiguration setNumOfMemtableFlushThreads(int numOfMemtableFlushThreads) {
+        this.setProperty(NUMBER_OF_MEMTABLE_FLUSH_THREADS, Integer.toString(numOfMemtableFlushThreads));
+        return this;
+    }
+
+    /*
+     * In the case of multiple entrylogs, memtableFlushTimeoutInSeconds specifies the amount of time main flushthread.
+     * has to wait for the processor threads to complete the flush.
+     *
+     * Gets the amount of time to wait for the flush to be completed.
+     */
+    public int getMemtableFlushTimeoutInSeconds() {
+        return this.getInt(MEMTABLE_FLUSH_TIMEOUT_INSECONDS, 120);
+    }
+
+    /*
+     * Sets the amount of time to wait for the flush to be completed.
+     *
+     */
+    public ServerConfiguration setMemtableFlushTimeoutInSeconds(int memtableFlushTimeoutInSeconds) {
+        this.setProperty(MEMTABLE_FLUSH_TIMEOUT_INSECONDS, Integer.toString(memtableFlushTimeoutInSeconds));
+        return this;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 36406ce74..2004b92e9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -29,21 +29,31 @@
 import static org.junit.Assert.fail;
 import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.net.BindException;
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
+import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -621,15 +631,6 @@ public void testWithDiskFullReadOnlyDisabledOrForceGCAllowDisabled() throws Exce
         } catch (NoWritableLedgerDirException e) {
             // expected
         }
-
-        conf.setIsForceGCAllowWhenNoSpace(false)
-            .setReadOnlyModeEnabled(true);
-        try {
-            new Bookie(conf);
-            fail("NoWritableLedgerDirException expected");
-        } catch (NoWritableLedgerDirException e) {
-            // expected
-        }
     }
 
     /**
@@ -753,6 +754,164 @@ public void testWithDiskFullAndAbilityToCreateNewIndexFile() throws Exception {
         bkClient.close();
     }
 
+    static class MockInterleavedLedgerStorage extends InterleavedLedgerStorage {
+        @Override
+        public void shutdown() {
+            // During BookieServer shutdown this method will be called
+            // and we want it to be noop.
+            // do nothing
+        }
+
+        @Override
+        public synchronized void flush() throws IOException {
+            // this method will be called by SyncThread.shutdown.
+            // During BookieServer shutdown we want this method to be noop
+            // do nothing
+        }
+    }
+
+    private LogMark readLastMarkFile(File lastMarkFile) throws IOException {
+        byte buff[] = new byte[16];
+        ByteBuffer bb = ByteBuffer.wrap(buff);
+        LogMark rolledLogMark = new LogMark();
+        FileInputStream fis = new FileInputStream(lastMarkFile);
+        int bytesRead = fis.read(buff);
+        fis.close();
+        if (bytesRead != 16) {
+            throw new IOException("Couldn't read enough bytes from lastMark." + " Wanted " + 16 + ", got " + bytesRead);
+        }
+        bb.clear();
+        rolledLogMark.readLogMark(bb);
+        return rolledLogMark;
+    }
+
+    @Test(timeout = 30000)
+    public void testCheckPointForEntryLoggerWithMultipleActiveEntryLogs() throws Exception {
+        File tmpDir = createTempDir("DiskCheck", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setZkServers(zkUtil.getZooKeeperConnectString())
+                .setZkTimeout(5000)
+                .setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setAutoRecoveryDaemonEnabled(false)
+                .setFlushInterval(5000)
+                .setBookiePort(PortManager.nextFreePort())
+                // entrylog per ledger is enabled
+                .setEntryLogPerLedgerEnabled(true)
+                .setLedgerStorageClass(MockInterleavedLedgerStorage.class.getName());
+        Assert.assertEquals("Number of JournalDirs", 1, conf.getJournalDirs().length);
+        // we know there is only one ledgerDir
+        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];
+        BookieServer server = new BookieServer(conf);
+        server.start();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        BookKeeper bkClient = new BookKeeper(clientConf);
+
+        ExecutorService threadPool = Executors.newFixedThreadPool(30);
+        int numOfLedgers = 12;
+        int numOfEntries = 500;
+        byte[] dataBytes = "data".getBytes();
+        LedgerHandle[] handles = new LedgerHandle[numOfLedgers];
+        AtomicBoolean receivedExceptionForAdd = new AtomicBoolean(false);
+        CountDownLatch countDownLatch = new CountDownLatch(numOfLedgers * numOfEntries);
+        for (int i = 0; i < numOfLedgers; i++) {
+            int ledgerIndex = i;
+            handles[i] = bkClient.createLedgerAdv((long) i, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+            for (int j = 0; j < numOfEntries; j++) {
+                int entryIndex = j;
+                threadPool.submit(() -> {
+                    try {
+                        handles[ledgerIndex].addEntry(entryIndex, dataBytes);
+                    } catch (Exception e) {
+                        LOG.error("Got Exception while trying to addEntry for ledgerId: " + ledgerIndex + " entry: "
+                                + entryIndex, e);
+                        receivedExceptionForAdd.set(true);
+                    } finally {
+                        countDownLatch.countDown();
+                    }
+                });
+            }
+        }
+        Assert.assertTrue("It is expected add requests are supposed to be completed in 3000 secs",
+                countDownLatch.await(3000, TimeUnit.MILLISECONDS));
+        Assert.assertFalse("there shouldn't be any exceptions for addentry requests", receivedExceptionForAdd.get());
+        for (int i = 0; i < numOfLedgers; i++) {
+            handles[i].close();
+        }
+        threadPool.shutdown();
+
+        LastLogMark lastLogMarkBeforeCheckpoint = server.getBookie().journals.get(0).getLastLogMark();
+        LogMark curMarkBeforeCheckpoint = lastLogMarkBeforeCheckpoint.getCurMark();
+
+        File lastMarkFile = new File(ledgerDir, "lastMark");
+        // lastMark file should be zero, because checkpoint hasn't happenend
+        LogMark logMarkFileBeforeCheckpoint = readLastMarkFile(lastMarkFile);
+        Assert.assertEquals("lastMarkFile before checkpoint should be zero", 0,
+                logMarkFileBeforeCheckpoint.compare(new LogMark()));
+
+        Thread.sleep(conf.getFlushInterval() + 1000);
+        // since we have waited for more than flushInterval SyncThread should have checkpointed.
+        // if entrylogperledger is not enabled, then we checkpoint only when currentLog in EntryLogger
+        // is rotated. but if entrylogperledger is enabled, then we checkpoint for every flushInterval period
+        Assert.assertTrue("lastMark file must be existing, because checkpoint should have happened",
+                lastMarkFile.exists());
+
+        LastLogMark lastLogMarkAfterCheckpoint = server.getBookie().journals.get(0).getLastLogMark();
+        LogMark curMarkAfterCheckpoint = lastLogMarkAfterCheckpoint.getCurMark();
+
+        LogMark rolledLogMark = readLastMarkFile(lastMarkFile);
+        Assert.assertNotEquals("rolledLogMark should not be zero, since checkpoint has happenend", 0,
+                rolledLogMark.compare(new LogMark()));
+        // Curmark should be equal before and after checkpoint,
+        // because we didnt add new entries during this period
+        Assert.assertTrue("Curmark should be equal before and after checkpoint",
+                curMarkAfterCheckpoint.compare(curMarkBeforeCheckpoint) == 0);
+        // Curmark after checkpoint should be equal to rolled logmark,
+        // because we checkpointed
+        Assert.assertTrue("Curmark after checkpoint should be equal to rolled logmark",
+                curMarkAfterCheckpoint.compare(rolledLogMark) == 0);
+
+        // here we are calling shutdown, but MockInterleavedLedgerStorage shudown/flush
+        // methods are noop, so entrylogger is not flushed as part of this shutdown
+        // here we are trying to simulate Bookie crash, but there is no way to
+        // simulate bookie abrupt crash
+        server.shutdown();
+        bkClient.close();
+
+        // delete journal files and lastMark, to make sure that we are not reading from
+        // Journal file
+        File journalDirectory = Bookie.getCurrentDirectory(conf.getJournalDirs()[0]);
+        List<Long> journalLogsId = Journal.listJournalIds(journalDirectory, null);
+        for (long journalId : journalLogsId) {
+            File journalFile = new File(journalDirectory, Long.toHexString(journalId) + ".txn");
+            journalFile.delete();
+        }
+
+        // we know there is only one ledgerDir
+        lastMarkFile = new File(ledgerDir, "lastMark");
+        lastMarkFile.delete();
+
+        // now we are restarting BookieServer
+        server = new BookieServer(conf);
+        server.start();
+        bkClient = new BookKeeper(clientConf);
+        // since Bookie checkpointed successfully before shutdown/crash,
+        // we should be able to read from entryLogs though journal is deleted
+        for (int i = 0; i < numOfLedgers; i++) {
+            LedgerHandle lh = bkClient.openLedger(i, DigestType.CRC32, "passwd".getBytes());
+            Enumeration<LedgerEntry> entries = lh.readEntries(0, numOfEntries - 1);
+            while (entries.hasMoreElements()) {
+                LedgerEntry entry = entries.nextElement();
+                byte[] readData = entry.getEntry();
+                Assert.assertEquals("Ledger Entry Data should match", new String("data".getBytes()),
+                        new String(readData));
+            }
+        }
+        server.shutdown();
+    }
+
     /**
      * Check disk error for file. Expected to throw DiskErrorException.
      */
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 9294e27f0..8d8d6c4e3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -171,7 +171,7 @@ private void writePreV2Journal(File journalDir, int numEntries) throws Exception
 
     private static void moveToPosition(JournalChannel jc, long pos) throws IOException {
         jc.fc.position(pos);
-        jc.bc.position = pos;
+        jc.bc.position.set(pos);
         jc.bc.writeBufferStartPosition.set(pos);
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 33642c279..3c8421526 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -21,12 +21,26 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLoggerAllocator;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -100,9 +114,9 @@ public void testCreateNewLog() throws Exception {
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
         // Calls createNewLog, and with the number of directories we
         // are using, if it picks one at random it will fail.
-        el.createNewLog();
-        LOG.info("This is the current log id: " + el.getCurrentLogId());
-        assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+        el.createNewLog(0L);
+        LOG.info("This is the current log id: " + el.getPreviousAllocatedEntryLogId());
+        assertTrue("Wrong log id", el.getPreviousAllocatedEntryLogId() > 1);
     }
 
     @Test
@@ -131,9 +145,316 @@ public void testCreateNewLogWithNoWritableLedgerDirs() throws Exception {
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
         // Calls createNewLog, and with the number of directories we
         // are using, if it picks one at random it will fail.
-        el.createNewLog();
-        LOG.info("This is the current log id: " + el.getCurrentLogId());
-        assertTrue("Wrong log id", el.getCurrentLogId() > 1);
+        el.createNewLog(0L);
+        LOG.info("This is the current log id: " + el.getPreviousAllocatedEntryLogId());
+        assertTrue("Wrong log id", el.getPreviousAllocatedEntryLogId() > 1);
     }
 
+    /*
+     * entryLogPerLedger is enabled and various scenarios of entrylogcreation are tested
+     */
+    @Test(timeout = 60000)
+    public void testEntryLogPerLedgerCreationWithPreAllocation() throws Exception {
+        /*
+         * I wish I could shorten this testcase or split it into multiple testcases,
+         * but I want to cover a scenario and it requires multiple operations in
+         * sequence and validations along the way. Please bear with the length of this
+         * testcase, I added as many comments as I can to simplify it.
+         */
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setIsForceGCAllowWhenNoSpace(true);
+        // preAllocation is Enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator;
+        EntryLogManager entryLogManager = entryLogger.entryLogManager;
+
+        /*
+         * no entrylog will be created during initialization
+         */
+        int expectedPreAllocatedLogID = -1;
+        Assert.assertEquals("PreallocatedlogId after initialization of Entrylogger",
+                expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId());
+
+        int numOfLedgers = 6;
+        for (long i = 0; i < numOfLedgers; i++) {
+            entryLogManager.acquireLockByCreatingIfRequired(i);
+            entryLogManager.releaseLock(i);
+        }
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            /* since we are starting creation of new ledgers, entrylogid will be ledgerid */
+            entryLogger.createNewLog(i);
+        }
+
+        Thread.sleep(100);
+        /*
+         * preallocation is enabled so though entryLogId starts with 0, preallocatedLogId would be equal to numOfLedgers
+         */
+        expectedPreAllocatedLogID = numOfLedgers;
+        Assert.assertEquals("PreallocatedlogId after creation of logs for ledgers", expectedPreAllocatedLogID,
+                entryLoggerAllocator.getPreallocatedLogId());
+        Assert.assertEquals("Number of current ", numOfLedgers,
+                entryLogger.entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of LogChannels to flush", 0,
+                entryLogger.entryLogManager.getCopyOfRotatedLogChannels().size());
+
+        // create dummy entrylog file with id - (expectedPreAllocatedLogID + 1)
+        String logFileName = Long.toHexString(expectedPreAllocatedLogID + 1) + ".log";
+        File dir = ledgerDirsManager.pickRandomWritableDir();
+        LOG.info("Picked this directory: " + dir);
+        File newLogFile = new File(dir, logFileName);
+        newLogFile.createNewFile();
+
+        /*
+         * since there is already preexisting entrylog file with id -
+         * (expectedPreAllocatedLogIDDuringInitialization + 1), when new
+         * entrylog is created it should have
+         * (expectedPreAllocatedLogIDDuringInitialization + 2) id
+         */
+        long rotatedLedger = 1L;
+        entryLogger.createNewLog(rotatedLedger);
+        Thread.sleep(100);
+        expectedPreAllocatedLogID = expectedPreAllocatedLogID + 2;
+        Assert.assertEquals("PreallocatedlogId ",
+                expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId());
+        Assert.assertEquals("Number of current ", numOfLedgers,
+                entryLogger.entryLogManager.getCopyOfCurrentLogs().size());
+        Set<BufferedLogChannel> rotatedLogChannels = entryLogger.entryLogManager.getCopyOfRotatedLogChannels();
+        Assert.assertEquals("Number of LogChannels rotated", 1, rotatedLogChannels.size());
+        Assert.assertEquals("Rotated logchannel logid", rotatedLedger, rotatedLogChannels.iterator().next().getLogId());
+        entryLogger.flush();
+        /*
+         * when flush is called all the rotatedlogchannels are flushed and
+         * removed from rotatedlogchannels list. But here since entrylogId - 0,
+         * is not yet rotated and flushed yet, getLeastUnflushedLogId will still
+         * return 0.
+         */
+        rotatedLogChannels = entryLogger.entryLogManager.getCopyOfRotatedLogChannels();
+        Assert.assertEquals("Number of LogChannels rotated", 0, rotatedLogChannels.size());
+        Assert.assertEquals("Least UnflushedLoggerId", 0, entryLogger.getLeastUnflushedLogId());
+
+        entryLogger.createNewLog(0L);
+        rotatedLogChannels = entryLogger.entryLogManager.getCopyOfRotatedLogChannels();
+        Assert.assertEquals("Number of LogChannels rotated", 1, rotatedLogChannels.size());
+        Assert.assertEquals("Least UnflushedLoggerId", 0, entryLogger.getLeastUnflushedLogId());
+        entryLogger.flush();
+        /*
+         * since both entrylogids 0, 1 are rotated and flushed,
+         * leastunFlushedLogId should be 2
+         */
+        Assert.assertEquals("Least UnflushedLoggerId", 2, entryLogger.getLeastUnflushedLogId());
+        expectedPreAllocatedLogID = expectedPreAllocatedLogID + 1;
+
+        /*
+         * we should be able to get entryLogMetadata from all the active
+         * entrylogs and the logs which are moved toflush list. Since no entry
+         * is added, all the meta should be empty.
+         */
+        for (int i = 0; i <= expectedPreAllocatedLogID; i++) {
+            EntryLogMetadata meta = entryLogger.getEntryLogMetadata(i);
+            Assert.assertTrue("EntryLogMetadata should be empty", meta.isEmpty());
+            Assert.assertTrue("EntryLog usage should be 0", meta.getTotalSize() == 0);
+        }
+    }
+
+    /**
+     * In this testcase entryLogPerLedger is Enabled and entrylogs are created
+     * while ledgerdirs are getting full.
+     */
+    @Test(timeout = 60000)
+    public void testEntryLogCreationWithFilledDirs() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // forceGCAllowWhenNoSpace is disabled
+        conf.setIsForceGCAllowWhenNoSpace(false);
+        // pre-allocation is not enabled
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator;
+        Thread.sleep(200);
+        int expectedPreAllocatedLogIDDuringInitialization = -1;
+        Assert.assertEquals("PreallocatedlogId after initialization of Entrylogger",
+                expectedPreAllocatedLogIDDuringInitialization, entryLoggerAllocator.getPreallocatedLogId());
+        Assert.assertEquals("Preallocation Future of this slot should be null", null,
+                entryLogger.entryLoggerAllocator.preallocation);
+
+        long ledgerId = 0L;
+        entryLogger.entryLogManager.acquireLockByCreatingIfRequired(ledgerId);
+        entryLogger.entryLogManager.releaseLock(ledgerId);
+
+        entryLogger.createNewLog(ledgerId);
+        Thread.sleep(100);
+        /*
+         * pre-allocation is not enabled, so it would not preallocate for next entrylog
+         */
+        Assert.assertEquals("PreallocatedlogId after initialization of Entrylogger",
+                expectedPreAllocatedLogIDDuringInitialization + 1, entryLoggerAllocator.getPreallocatedLogId());
+
+        for (int i = 0; i < numDirs - 1; i++) {
+            ledgerDirsManager.addToFilledDirs(Bookie.getCurrentDirectory(new File(ledgerDirs[i])));
+        }
+
+        /*
+         * this is the only non-filled ledgerDir so it should be used for creating new entryLog
+         */
+        File nonFilledLedgerDir = Bookie.getCurrentDirectory(new File(ledgerDirs[numDirs - 1]));
+
+        entryLogger.createNewLog(ledgerId);
+        BufferedLogChannel newLogChannel = entryLogger.entryLogManager.getCurrentLogForLedger(ledgerId);
+        Assert.assertEquals("Directory of newly created BufferedLogChannel file", nonFilledLedgerDir.getAbsolutePath(),
+                newLogChannel.getLogFile().getParentFile().getAbsolutePath());
+
+        ledgerDirsManager.addToFilledDirs(Bookie.getCurrentDirectory(new File(ledgerDirs[numDirs - 1])));
+
+        try {
+            /*
+             * forceGCAllowWhenNoSpace is disabled so createNewLog should fail if all the dirs are filled
+             */
+            entryLogger.createNewLog(ledgerId);
+            Assert.fail("new entrylog creation should fail, since there is no writable ledgerDir");
+        } catch (NoWritableLedgerDirException nwe) {
+            // expected
+        }
+    }
+
+    /*
+     * In this testcase it is validated if the entryLog is created in the
+     * ledgerDir with least number of current active entrylogs
+     */
+    @Test(timeout = 60000)
+    public void testLedgerDirsUniformityDuringCreation() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // pre-allocation is not enabled
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManager entrylogManager = entryLogger.entryLogManager;
+
+        for (long i = 0; i < ledgerDirs.length; i++) {
+            entrylogManager.acquireLockByCreatingIfRequired(i);
+            entrylogManager.releaseLock(i);
+
+            entryLogger.createNewLog(i);
+        }
+
+        int numberOfLedgersCreated = ledgerDirs.length;
+
+        Assert.assertEquals("Highest frequency of entrylogs per ledgerdir", 1,
+                highestFrequencyOfEntryLogsPerLedgerDir(entrylogManager.getCopyOfCurrentLogs()));
+
+        long newLedgerId = numberOfLedgersCreated;
+        entrylogManager.acquireLockByCreatingIfRequired(newLedgerId);
+        entrylogManager.releaseLock(newLedgerId);
+        entryLogger.createNewLog(newLedgerId);
+        numberOfLedgersCreated++;
+
+        Assert.assertEquals("Highest frequency of entrylogs per ledgerdir", 2,
+                highestFrequencyOfEntryLogsPerLedgerDir(entrylogManager.getCopyOfCurrentLogs()));
+
+        for (long i = numberOfLedgersCreated; i < 2 * ledgerDirs.length; i++) {
+            entrylogManager.acquireLockByCreatingIfRequired(i);
+            entrylogManager.releaseLock(i);
+
+            entryLogger.createNewLog(i);
+        }
+
+        Assert.assertEquals("Highest frequency of entrylogs per ledgerdir", 2,
+                highestFrequencyOfEntryLogsPerLedgerDir(entrylogManager.getCopyOfCurrentLogs()));
+    }
+
+
+    int highestFrequencyOfEntryLogsPerLedgerDir(Set<BufferedLogChannel> copyOfCurrentLogs) {
+        Map<File, MutableInt> frequencyOfEntryLogsInLedgerDirs = new HashMap<File, MutableInt>();
+        for (BufferedLogChannel logChannel : copyOfCurrentLogs) {
+            File parentDir = logChannel.getLogFile().getParentFile();
+            if (frequencyOfEntryLogsInLedgerDirs.containsKey(parentDir)) {
+                frequencyOfEntryLogsInLedgerDirs.get(parentDir).increment();
+            } else {
+                frequencyOfEntryLogsInLedgerDirs.put(parentDir, new MutableInt(1));
+            }
+        }
+        @SuppressWarnings("unchecked")
+        int highestFreq = ((Entry<File, MutableInt>) (frequencyOfEntryLogsInLedgerDirs.entrySet().stream()
+                .max(Map.Entry.comparingByValue()).get())).getValue().intValue();
+        return highestFreq;
+    }
+
+    /*
+     * In this testcase entrylogs for ledgers are tried to create concurrently.
+     */
+    @Test(timeout = 60000)
+    public void testConcurrentEntryLogCreations() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // pre-allocation is enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManager entrylogManager = entryLogger.entryLogManager;
+
+        int numOfLedgers = 10;
+        int numOfThreadsForSameLedger = 10;
+        AtomicInteger createdEntryLogs = new AtomicInteger(0);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch createdLatch = new CountDownLatch(numOfLedgers * numOfThreadsForSameLedger);
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            entrylogManager.acquireLockByCreatingIfRequired(i);
+            entrylogManager.releaseLock(i);
+            for (int j = 0; j < numOfThreadsForSameLedger; j++) {
+                long ledgerId = i;
+                new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            startLatch.await();
+                            entryLogger.createNewLog(ledgerId);
+                            createdEntryLogs.incrementAndGet();
+                        } catch (InterruptedException | IOException e) {
+                            LOG.error("Got exception while trying to createNewLog for Ledger: " + ledgerId, e);
+                        } finally {
+                            createdLatch.countDown();
+                        }
+                    }
+                }.start();
+            }
+        }
+
+        startLatch.countDown();
+        createdLatch.await(5, TimeUnit.SECONDS);
+        Assert.assertEquals("Created EntryLogs", numOfLedgers * numOfThreadsForSameLedger, createdEntryLogs.get());
+        Assert.assertEquals("Active currentlogs size", numOfLedgers, entrylogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Rotated entrylogs size", (numOfThreadsForSameLedger - 1) * numOfLedgers,
+                entrylogManager.getCopyOfRotatedLogChannels().size());
+        /*
+         * EntryLogFilePreAllocation is Enabled so
+         * getPreviousAllocatedEntryLogId would be (numOfLedgers *
+         * numOfThreadsForSameLedger) instead of (numOfLedgers *
+         * numOfThreadsForSameLedger - 1)
+         */
+        Assert.assertEquals("PreviousAllocatedEntryLogId", numOfLedgers * numOfThreadsForSameLedger,
+                entryLogger.getPreviousAllocatedEntryLogId());
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index c60a997a2..7a8dc8f82 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -36,9 +36,25 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongArray;
 
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManager;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogManagerForEntryLogPerLedger;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.DiskChecker;
@@ -57,6 +73,7 @@
     private static final Logger LOG = LoggerFactory.getLogger(EntryLogTest.class);
 
     final List<File> tempDirs = new ArrayList<File>();
+    Random rand = new Random();
 
     File createTempDir(String prefix, String suffix) throws IOException {
         File dir = IOUtils.createTempDir(prefix, suffix);
@@ -87,9 +104,9 @@ public void testCorruptEntryLog() throws Exception {
         Bookie bookie = new Bookie(conf);
         // create some entries
         EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
-        logger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        logger.addEntry(3, generateEntry(3, 1).nioBuffer());
-        logger.addEntry(2, generateEntry(2, 1).nioBuffer());
+        logger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        logger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+        logger.addEntry(2L, generateEntry(2, 1).nioBuffer());
         logger.flush();
         // now lets truncate the file to corrupt the last entry, which simulates a partial write
         File f = new File(curDir, "0.log");
@@ -115,6 +132,16 @@ private ByteBuf generateEntry(long ledger, long entry) {
         return bb;
     }
 
+    private ByteBuf generateEntry(long ledger, long entry, int length) {
+        ByteBuf bb = Unpooled.buffer(length);
+        bb.writeLong(ledger);
+        bb.writeLong(entry);
+        byte[] randbyteArray = new byte[length - 8 - 8];
+        rand.nextBytes(randbyteArray);
+        bb.writeBytes(randbyteArray);
+        return bb;
+    }
+
     @Test
     public void testMissingLogId() throws Exception {
         File tmpDir = createTempDir("entryLogTest", ".dir");
@@ -135,7 +162,7 @@ public void testMissingLogId() throws Exception {
             EntryLogger logger = new EntryLogger(conf,
                     bookie.getLedgerDirsManager());
             for (int j = 0; j < numEntries; j++) {
-                positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
+                positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer());
             }
             logger.flush();
         }
@@ -150,7 +177,7 @@ public void testMissingLogId() throws Exception {
             EntryLogger logger = new EntryLogger(conf,
                     bookie.getLedgerDirsManager());
             for (int j = 0; j < numEntries; j++) {
-                positions[i][j] = logger.addEntry(i, generateEntry(i, j).nioBuffer());
+                positions[i][j] = logger.addEntry((long) i, generateEntry(i, j).nioBuffer());
             }
             logger.flush();
         }
@@ -209,6 +236,7 @@ public void testAddEntryFailureOnDiskFull() throws Exception {
         File ledgerDir1 = createTempDir("bkTest", ".dir");
         File ledgerDir2 = createTempDir("bkTest", ".dir");
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
         conf.setJournalDirName(ledgerDir1.toString());
         conf.setLedgerDirNames(new String[] { ledgerDir1.getAbsolutePath(),
                 ledgerDir2.getAbsolutePath() });
@@ -225,7 +253,8 @@ public void testAddEntryFailureOnDiskFull() throws Exception {
         ledgerStorage.addEntry(generateEntry(1, 1));
         ledgerStorage.addEntry(generateEntry(2, 1));
         // Add entry with disk full failure simulation
-        bookie.getLedgerDirsManager().addToFilledDirs(entryLogger.currentDir);
+        bookie.getLedgerDirsManager().addToFilledDirs(entryLogger.entryLogManager
+                .getCurrentLogForLedger(EntryLogger.INVALID_LID).getLogFile().getParentFile());
         ledgerStorage.addEntry(generateEntry(3, 1));
         // Verify written entries
         Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1)));
@@ -252,11 +281,11 @@ public void testRecoverFromLedgersMap() throws Exception {
 
         // create some entries
         EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
-        logger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        logger.addEntry(3, generateEntry(3, 1).nioBuffer());
-        logger.addEntry(2, generateEntry(2, 1).nioBuffer());
-        logger.addEntry(1, generateEntry(1, 2).nioBuffer());
-        logger.rollLog();
+        logger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        logger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+        logger.addEntry(2L, generateEntry(2, 1).nioBuffer());
+        logger.addEntry(1L, generateEntry(1, 2).nioBuffer());
+        logger.createNewLog(EntryLogger.INVALID_LID);
         logger.flushRotatedLogs();
 
         EntryLogMetadata meta = logger.extractEntryLogMetadataFromIndex(0L);
@@ -287,11 +316,11 @@ public void testRecoverFromLedgersMapOnV0EntryLog() throws Exception {
 
         // create some entries
         EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
-        logger.addEntry(1, generateEntry(1, 1).nioBuffer());
-        logger.addEntry(3, generateEntry(3, 1).nioBuffer());
-        logger.addEntry(2, generateEntry(2, 1).nioBuffer());
-        logger.addEntry(1, generateEntry(1, 2).nioBuffer());
-        logger.rollLog();
+        logger.addEntry(1L, generateEntry(1, 1).nioBuffer());
+        logger.addEntry(3L, generateEntry(3, 1).nioBuffer());
+        logger.addEntry(2L, generateEntry(2, 1).nioBuffer());
+        logger.addEntry(1L, generateEntry(1, 2).nioBuffer());
+        logger.createNewLog(EntryLogger.INVALID_LID);
 
         // Rewrite the entry log header to be on V0 format
         File f = new File(curDir, "0.log");
@@ -339,9 +368,10 @@ public void testPreAllocateLog() throws Exception {
         Bookie bookie = new Bookie(conf);
         // create a logger whose initialization phase allocating a new entry log
         EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
+        logger.createNewLog(EntryLogger.INVALID_LID);
         assertNotNull(logger.getEntryLoggerAllocator().getPreallocationFuture());
 
-        logger.addEntry(1, generateEntry(1, 1).nioBuffer());
+        logger.addEntry(1L, generateEntry(1, 1).nioBuffer());
         // the Future<BufferedLogChannel> is not null all the time
         assertNotNull(logger.getEntryLoggerAllocator().getPreallocationFuture());
 
@@ -352,9 +382,10 @@ public void testPreAllocateLog() throws Exception {
         Bookie bookie2 = new Bookie(conf2);
         // create a logger
         EntryLogger logger2 = ((InterleavedLedgerStorage) bookie2.ledgerStorage).entryLogger;
+        logger2.createNewLog(EntryLogger.INVALID_LID);
         assertNull(logger2.getEntryLoggerAllocator().getPreallocationFuture());
 
-        logger2.addEntry(2, generateEntry(1, 1).nioBuffer());
+        logger2.addEntry(2L, generateEntry(1, 1).nioBuffer());
 
         // the Future<BufferedLogChannel> is null all the time
         assertNull(logger2.getEntryLoggerAllocator().getPreallocationFuture());
@@ -379,16 +410,1088 @@ public void testGetEntryLogsSet() throws Exception {
         // create some entries
         EntryLogger logger = ((InterleavedLedgerStorage) bookie.ledgerStorage).entryLogger;
 
+        assertEquals(Sets.newHashSet(), logger.getEntryLogsSet());
+
+        logger.createNewLog(EntryLogger.INVALID_LID);
+        logger.flushRotatedLogs();
+
         assertEquals(Sets.newHashSet(0L, 1L), logger.getEntryLogsSet());
 
-        logger.rollLog();
+        logger.createNewLog(EntryLogger.INVALID_LID);
         logger.flushRotatedLogs();
 
         assertEquals(Sets.newHashSet(0L, 1L, 2L), logger.getEntryLogsSet());
+    }
 
-        logger.rollLog();
-        logger.flushRotatedLogs();
 
-        assertEquals(Sets.newHashSet(0L, 1L, 2L, 3L), logger.getEntryLogsSet());
+    /**
+     * Test to verify the leastUnflushedLogId logic in EntryLogsStatus.
+     */
+    @Test(timeout = 60000)
+    public void testEntryLoggersRecentEntryLogsStatus() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogger.RecentEntryLogsStatus recentlyCreatedLogsStatus = entryLogger.recentlyCreatedEntryLogsStatus;
+
+        recentlyCreatedLogsStatus.createdEntryLog(0L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 0L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(0L);
+        // since we marked entrylog - 0 as rotated, LeastUnflushedLogId would be previous rotatedlog+1
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.createdEntryLog(1L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.createdEntryLog(2L);
+        recentlyCreatedLogsStatus.createdEntryLog(3L);
+        recentlyCreatedLogsStatus.createdEntryLog(4L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 1L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(1L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 2L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(3L);
+        // here though we rotated entrylog-3, entrylog-2 is not yet rotated so
+        // LeastUnflushedLogId should be still 2
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 2L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(2L);
+        // entrylog-3 is already rotated, so leastUnflushedLogId should be 4
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 4L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(4L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 5L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.createdEntryLog(5L);
+        recentlyCreatedLogsStatus.createdEntryLog(7L);
+        recentlyCreatedLogsStatus.createdEntryLog(9L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 5L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(5L);
+        // since we marked entrylog-5 as rotated, LeastUnflushedLogId would be previous rotatedlog+1
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 6L, entryLogger.getLeastUnflushedLogId());
+        recentlyCreatedLogsStatus.flushRotatedEntryLog(7L);
+        Assert.assertEquals("entryLogger's leastUnflushedLogId ", 8L, entryLogger.getLeastUnflushedLogId());
+    }
+
+    /*
+     * tests logic of EntryLogManager in EntryLogger with EntryLogPerLedger enabled
+     */
+    @Test(timeout = 60000)
+    public void testEntryLogManager() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManager entryLogManager = entryLogger.entryLogManager;
+
+        Assert.assertEquals("Number of current active EntryLogs ", 0, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getCopyOfRotatedLogChannels().size());
+
+        try {
+            entryLogManager.acquireLock(0L);
+            fail("EntryLogManager acquireLock for ledger 0 is supposed to fail,"
+                    + "since lock must not have been created yet");
+        } catch (NullPointerException npe) {
+            // expected, since lock must not have been created
+        }
+
+        int numOfLedgers = 5;
+        int numOfThreadsPerLedger = 10;
+        validateLockAcquireAndRelease(numOfLedgers, numOfThreadsPerLedger, entryLogManager, true);
+        validateLockAcquireAndRelease(numOfLedgers, numOfThreadsPerLedger, entryLogManager, false);
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            entryLogManager.setCurrentLogForLedger(i, createDummyBufferedLogChannel(entryLogger, i, conf));
+        }
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            Assert.assertEquals("LogChannel for ledger: " + i, entryLogManager.getCurrentLogIfPresent(i),
+                    entryLogManager.getCurrentLogForLedger(i));
+        }
+
+        Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getCopyOfRotatedLogChannels().size());
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            entryLogManager.setCurrentLogForLedger(i,
+                    createDummyBufferedLogChannel(entryLogger, numOfLedgers + i, conf));
+        }
+
+        /*
+         * since new entryLogs are set for all the ledgers, previous entrylogs would be added to rotatedLogChannels
+         */
+        Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", numOfLedgers,
+                entryLogManager.getCopyOfRotatedLogChannels().size());
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            entryLogManager.setCurrentLogForLedger(i,
+                    createDummyBufferedLogChannel(entryLogger, 2 * numOfLedgers + i, conf));
+        }
+
+        /*
+         * again since new entryLogs are set for all the ledgers, previous entrylogs would be added to
+         * rotatedLogChannels
+         */
+        Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", 2 * numOfLedgers,
+                entryLogManager.getCopyOfRotatedLogChannels().size());
+
+        for (BufferedLogChannel logChannel : entryLogManager.getCopyOfRotatedLogChannels()) {
+            entryLogManager.removeFromRotatedLogChannels(logChannel);
+        }
+        Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getCopyOfRotatedLogChannels().size());
+
+        // entrylogid is sequential
+        for (long i = 0; i < numOfLedgers; i++) {
+            assertEquals("EntryLogid for Ledger " + i, 2 * numOfLedgers + i,
+                    entryLogManager.getCurrentLogForLedger(i).getLogId());
+        }
+
+        for (long i = 2 * numOfLedgers; i < (3 * numOfLedgers); i++) {
+            assertTrue("EntryLog with logId: " + i + " should be present",
+                    entryLogManager.getCurrentLogIfPresent(i) != null);
+        }
+    }
+
+    private EntryLogger.BufferedLogChannel createDummyBufferedLogChannel(EntryLogger entryLogger, long logid,
+            ServerConfiguration servConf) throws IOException {
+        File tmpFile = File.createTempFile("entrylog", logid + "");
+        tmpFile.deleteOnExit();
+        FileChannel fc = FileChannel.open(tmpFile.toPath());
+        EntryLogger.BufferedLogChannel logChannel = entryLogger.new BufferedLogChannel(fc, 10, 10, logid, tmpFile,
+                servConf.getFlushIntervalInBytes());
+        return logChannel;
+    }
+
+    /*
+     * validates the concurrency aspect of entryLogManager.acquireLockByCreatingIfRequired/acquireLock/releaseLock
+     *
+     * Executor of fixedThreadPool of size 'numOfLedgers * numOfThreadsPerLedger' is created and the same number
+     * of tasks are submitted to the Executor. In each task, lock of that ledger is acquired and then released.
+     */
+    private void validateLockAcquireAndRelease(int numOfLedgers, int numOfThreadsPerLedger,
+            EntryLogManager entryLogManager, boolean acquireLockByCreatingOnly) throws InterruptedException {
+        ExecutorService tpe = Executors.newFixedThreadPool(numOfLedgers * numOfThreadsPerLedger);
+        CountDownLatch latchToStart = new CountDownLatch(1);
+        CountDownLatch latchToWait = new CountDownLatch(1);
+        AtomicInteger numberOfThreadsAcquiredLock = new AtomicInteger(0);
+        AtomicBoolean irptExceptionHappened = new AtomicBoolean(false);
+        Random rand = new Random();
+
+        for (int i = 0; i < numOfLedgers * numOfThreadsPerLedger; i++) {
+            long ledgerId = i % numOfLedgers;
+            tpe.submit(() -> {
+                try {
+                    latchToStart.await();
+                    if (acquireLockByCreatingOnly) {
+                        entryLogManager.acquireLockByCreatingIfRequired(ledgerId);
+                    } else {
+                        if (rand.nextBoolean()) {
+                            entryLogManager.acquireLock(ledgerId);
+                        } else {
+                            entryLogManager.acquireLockByCreatingIfRequired(ledgerId);
+                        }
+                    }
+                    numberOfThreadsAcquiredLock.incrementAndGet();
+                    latchToWait.await();
+                    entryLogManager.releaseLock(ledgerId);
+                } catch (InterruptedException e) {
+                    irptExceptionHappened.set(true);
+                }
+            });
+        }
+
+        assertEquals("Number Of Threads acquired Lock", 0, numberOfThreadsAcquiredLock.get());
+        latchToStart.countDown();
+        Thread.sleep(1000);
+        /*
+         * since there are only "numOfLedgers" ledgers, only "numOfLedgers" threads should have been able to acquire
+         * lock. After acquiring the lock there must be waiting on 'latchToWait' latch
+         */
+        assertEquals("Number Of Threads acquired Lock", numOfLedgers, numberOfThreadsAcquiredLock.get());
+        latchToWait.countDown();
+        Thread.sleep(2000);
+        assertEquals("Number Of Threads acquired Lock", numOfLedgers * numOfThreadsPerLedger,
+                numberOfThreadsAcquiredLock.get());
+    }
+
+    /*
+     * test EntryLogManager.EntryLogManagerForEntryLogPerLedger removes the
+     * ledger from its cache map if entry is not added to that ledger or its
+     * corresponding state is not accessed for more than evictionPeriod
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testEntryLogManagerExpiryRemoval() throws Exception {
+        int evictionPeriod = 1;
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) entryLogger.entryLogManager;
+
+        long ledgerId = 0L;
+
+        entryLogManager.acquireLockByCreatingIfRequired(ledgerId);
+        entryLogManager.releaseLock(ledgerId);
+        BufferedLogChannel logChannel = createDummyBufferedLogChannel(entryLogger, 0, conf);
+        entryLogManager.setCurrentLogForLedger(ledgerId, logChannel);
+
+        BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId + " should match", logChannel, currentLogForLedger);
+
+        Thread.sleep(evictionPeriod * 1000 + 100);
+        entryLogManager.doEntryLogMapCleanup();
+
+        /*
+         * since for more than evictionPeriod, that ledger is not accessed and cache is cleaned up, mapping for that
+         * ledger should not be available anymore
+         */
+        currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, currentLogForLedger);
+        Assert.assertEquals("Number of current active EntryLogs ", 0, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getCopyOfRotatedLogChannels().size());
+        Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel",
+                entryLogManager.getCopyOfRotatedLogChannels().contains(logChannel));
+
+        try {
+            entryLogManager.acquireLock(ledgerId);
+            fail("EntryLogManager acquireLock for ledger is supposed to fail, since mapentry must have been evicted");
+        } catch (NullPointerException npe) {
+            // expected, since lock must not have been created
+        }
+    }
+
+    /**
+     * test EntryLogManager.EntryLogManagerForEntryLogPerLedger doesn't removes
+     * the ledger from its cache map if ledger's corresponding state is accessed
+     * within the evictionPeriod.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testExpiryRemovalByAccessingOnAnotherThread() throws Exception {
+        int evictionPeriod = 1;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) entryLogger.entryLogManager;
+
+        long ledgerId = 0L;
+
+        entryLogManager.acquireLockByCreatingIfRequired(ledgerId);
+        entryLogManager.releaseLock(ledgerId);
+        BufferedLogChannel newLogChannel = createDummyBufferedLogChannel(entryLogger, 1, conf);
+        entryLogManager.setCurrentLogForLedger(ledgerId, newLogChannel);
+
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    Thread.sleep((evictionPeriod * 1000) / 2);
+                    entryLogManager.getCurrentLogForLedger(ledgerId);
+                } catch (InterruptedException e) {
+                }
+            }
+        };
+
+        t.start();
+        Thread.sleep(evictionPeriod * 1000 + 100);
+        entryLogManager.doEntryLogMapCleanup();
+
+        /*
+         * in this scenario, that ledger is accessed by other thread during
+         * eviction period time, so it should not be evicted.
+         */
+        BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId, newLogChannel, currentLogForLedger);
+        Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of rotated EntryLogs ", 0, entryLogManager.getCopyOfRotatedLogChannels().size());
+    }
+
+    /**
+     * test EntryLogManager.EntryLogManagerForEntryLogPerLedger removes the
+     * ledger from its cache map if entry is not added to that ledger or its
+     * corresponding state is not accessed for more than evictionPeriod. In this
+     * testcase we try to call unrelated methods or access state of other
+     * ledgers within the eviction period.
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testExpiryRemovalByAccessingNonCacheRelatedMethods() throws Exception {
+        int evictionPeriod = 1;
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) entryLogger.entryLogManager;
+
+        long ledgerId = 0L;
+
+        entryLogManager.acquireLockByCreatingIfRequired(ledgerId);
+        entryLogManager.releaseLock(ledgerId);
+        BufferedLogChannel newLogChannel = createDummyBufferedLogChannel(entryLogger, 1, conf);
+        entryLogManager.setCurrentLogForLedger(ledgerId, newLogChannel);
+
+        AtomicBoolean exceptionOccured = new AtomicBoolean(false);
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    Thread.sleep(500);
+                    /*
+                     * any of the following operations should not access entry
+                     * of 'ledgerId' in the cache
+                     */
+                    entryLogManager.getCopyOfCurrentLogs();
+                    entryLogManager.getCopyOfRotatedLogChannels();
+                    entryLogManager.getCurrentLogIfPresent(newLogChannel.getLogId());
+                    entryLogManager.removeFromRotatedLogChannels(createDummyBufferedLogChannel(entryLogger, 500, conf));
+                    entryLogManager.getDirForNextEntryLog(ledgerDirsManager.getWritableLedgerDirs());
+                    long newLedgerId = 100;
+                    entryLogManager.acquireLockByCreatingIfRequired(newLedgerId);
+                    entryLogManager.releaseLock(newLedgerId);
+                    BufferedLogChannel logChannelForNewLedger =
+                            createDummyBufferedLogChannel(entryLogger, newLedgerId, conf);
+                    entryLogManager.setCurrentLogForLedger(newLedgerId, logChannelForNewLedger);
+                    entryLogManager.getCurrentLogIfPresent(newLedgerId);
+                } catch (Exception e) {
+                    LOG.error("Got Exception in thread", e);
+                    exceptionOccured.set(true);
+                }
+            }
+        };
+
+        t.start();
+        Thread.sleep(evictionPeriod * 1000 + 100);
+        entryLogManager.doEntryLogMapCleanup();
+        Assert.assertFalse("Exception occured in thread, which is not expected", exceptionOccured.get());
+
+        /*
+         * since for more than evictionPeriod, that ledger is not accessed and cache is cleaned up, mapping for that
+         * ledger should not be available anymore
+         */
+        BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, currentLogForLedger);
+        // expected number of current active entryLogs is 1 since we created entrylog for 'newLedgerId'
+        Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getCopyOfRotatedLogChannels().size());
+        Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel",
+                entryLogManager.getCopyOfRotatedLogChannels().contains(newLogChannel));
+
+        try {
+            entryLogManager.acquireLock(ledgerId);
+            fail("EntryLogManager acquireLock for ledger is supposed to fail, since mapentry must have been evicted");
+        } catch (NullPointerException npe) {
+            // expected, since lock must not have been created
+        }
+    }
+
+    /*
+     * testing EntryLogger functionality (addEntry/createNewLog/flush) and EntryLogManager with entryLogPerLedger
+     * enabled
+     */
+    @Test(timeout = 60000)
+    public void testEntryLoggerWithEntryLogPerLedgerEnabled() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(10000000);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManager entryLogManager = entryLogger.entryLogManager;
+        Assert.assertEquals("EntryLogManager class type", EntryLogger.EntryLogManagerForEntryLogPerLedger.class,
+                entryLogManager.getClass());
+
+        int numOfActiveLedgers = 20;
+        int numEntries = 5;
+
+        for (int j = 0; j < numEntries; j++) {
+            for (long i = 0; i < numOfActiveLedgers; i++) {
+                entryLogger.addEntry(i, generateEntry(i, j));
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            BufferedLogChannel logChannel =  entryLogManager.getCurrentLogForLedger(i);
+            Assert.assertTrue("unpersistedBytes should be greater than LOGFILE_HEADER_SIZE",
+                    logChannel.getUnpersistedBytes() > EntryLogger.LOGFILE_HEADER_SIZE);
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            entryLogger.createNewLog(i);
+        }
+
+        /*
+         * since we created new entrylog for all the activeLedgers, entrylogs of all the ledgers
+         * should be rotated and hence the size of copyOfRotatedLogChannels should be numOfActiveLedgers
+         */
+        Set<BufferedLogChannel> rotatedLogs = entryLogManager.getCopyOfRotatedLogChannels();
+        Assert.assertEquals("Number of rotated entrylogs", numOfActiveLedgers, rotatedLogs.size());
+
+        /*
+         * Since newlog is created for all slots, so they are moved to rotated logs and hence unpersistedBytes of all
+         * the slots should be just EntryLogger.LOGFILE_HEADER_SIZE
+         *
+         */
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            BufferedLogChannel logChannel = entryLogManager.getCurrentLogForLedger(i);
+            Assert.assertEquals("unpersistedBytes should be LOGFILE_HEADER_SIZE", EntryLogger.LOGFILE_HEADER_SIZE,
+                    logChannel.getUnpersistedBytes());
+        }
+
+        for (int j = numEntries; j < 2 * numEntries; j++) {
+            for (long i = 0; i < numOfActiveLedgers; i++) {
+                entryLogger.addEntry(i, generateEntry(i, j));
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            BufferedLogChannel logChannel =  entryLogManager.getCurrentLogForLedger(i);
+            Assert.assertTrue("unpersistedBytes should be greater than LOGFILE_HEADER_SIZE",
+                    logChannel.getUnpersistedBytes() > EntryLogger.LOGFILE_HEADER_SIZE);
+        }
+
+        Assert.assertEquals("LeastUnflushedloggerID", 0, entryLogger.getLeastUnflushedLogId());
+
+        /*
+         * here flush is called so all the rotatedLogChannels should be file closed and there shouldn't be any
+         * rotatedlogchannel and also leastUnflushedLogId should be advanced to numOfActiveLedgers
+         */
+        entryLogger.flush();
+        Assert.assertEquals("Number of rotated entrylogs", 0, entryLogManager.getCopyOfRotatedLogChannels().size());
+        Assert.assertEquals("LeastUnflushedloggerID", numOfActiveLedgers, entryLogger.getLeastUnflushedLogId());
+
+        /*
+         * after flush (flushCurrentLogs) unpersistedBytes should be 0.
+         */
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            BufferedLogChannel logChannel =  entryLogManager.getCurrentLogForLedger(i);
+            Assert.assertEquals("unpersistedBytes should be 0", 0L, logChannel.getUnpersistedBytes());
+        }
+    }
+
+    /*
+     * with entryLogPerLedger enabled, create multiple entrylogs, add entries of ledgers and read them before and after
+     * flush
+     */
+    @Test(timeout = 60000)
+    public void testReadAddCallsOfMultipleEntryLogs() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        // pre allocation enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+
+        int numOfActiveLedgers = 10;
+        int numEntries = 10;
+        long[][] positions = new long[numOfActiveLedgers][];
+        for (int i = 0; i < numOfActiveLedgers; i++) {
+            positions[i] = new long[numEntries];
+        }
+
+        /*
+         * addentries to the ledgers
+         */
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                positions[i][j] = entryLogger.addEntry((long) i, generateEntry(i, j));
+                long entryLogId = (positions[i][j] >> 32L);
+                /**
+                 *
+                 * Though EntryLogFilePreAllocation is enabled, Since things are not done concurrently here,
+                 * entryLogIds will be sequential.
+                 */
+                Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId);
+            }
+        }
+
+        /*
+         * read the entries which are written
+         */
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                String expectedValue = "ledger-" + i + "-" + j;
+                ByteBuf buf = entryLogger.readEntry(i, j, positions[i][j]);
+                long ledgerId = buf.readLong();
+                long entryId = buf.readLong();
+                byte[] data = new byte[buf.readableBytes()];
+                buf.readBytes(data);
+                assertEquals("LedgerId ", i, ledgerId);
+                assertEquals("EntryId ", j, entryId);
+                assertEquals("Entry Data ", expectedValue, new String(data));
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            entryLogger.createNewLog(i);
+        }
+
+        entryLogger.flushRotatedLogs();
+
+        // reading after flush of rotatedlogs
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                String expectedValue = "ledger-" + i + "-" + j;
+                ByteBuf buf = entryLogger.readEntry(i, j, positions[i][j]);
+                long ledgerId = buf.readLong();
+                long entryId = buf.readLong();
+                byte[] data = new byte[buf.readableBytes()];
+                buf.readBytes(data);
+                assertEquals("LedgerId ", i, ledgerId);
+                assertEquals("EntryId ", j, entryId);
+                assertEquals("Entry Data ", expectedValue, new String(data));
+            }
+        }
+    }
+
+
+    class WriteTask implements Callable<Boolean> {
+        long ledgerId;
+        int entryId;
+        AtomicLongArray positions;
+        int indexInPositions;
+        EntryLogger entryLogger;
+
+        WriteTask(long ledgerId, int entryId, EntryLogger entryLogger, AtomicLongArray positions,
+                int indexInPositions) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.entryLogger = entryLogger;
+            this.positions = positions;
+            this.indexInPositions = indexInPositions;
+        }
+
+        @Override
+        public Boolean call() {
+            try {
+                positions.set(indexInPositions, entryLogger.addEntry(ledgerId, generateEntry(ledgerId, entryId)));
+            } catch (IOException e) {
+                LOG.error("Got Exception for AddEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                return false;
+            }
+            return true;
+        }
+    }
+
+    class ReadTask implements Callable<Boolean> {
+        long ledgerId;
+        int entryId;
+        long position;
+        EntryLogger entryLogger;
+
+        ReadTask(long ledgerId, int entryId, long position, EntryLogger entryLogger) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.position = position;
+            this.entryLogger = entryLogger;
+        }
+
+        @Override
+        public Boolean call() {
+            try {
+                String expectedValue = "ledger-" + ledgerId + "-" + entryId;
+                ByteBuf buf = entryLogger.readEntry(ledgerId, entryId, position);
+                long actualLedgerId = buf.readLong();
+                long actualEntryId = buf.readLong();
+                byte[] data = new byte[buf.readableBytes()];
+                buf.readBytes(data);
+                if (ledgerId != actualLedgerId) {
+                    LOG.error("For ledgerId: {} entryId: {} readRequest, actual ledgerId: {}", ledgerId, entryId,
+                            actualLedgerId);
+                    return false;
+                }
+                if (entryId != actualEntryId) {
+                    LOG.error("For ledgerId: {} entryId: {} readRequest, actual entryId: {}", ledgerId, entryId,
+                            actualEntryId);
+                    return false;
+                }
+                if (!expectedValue.equals(new String(data))) {
+                    LOG.error("For ledgerId: {} entryId: {} readRequest, actual Data: {}", ledgerId, entryId,
+                            new String(data));
+                    return false;
+                }
+            } catch (IOException e) {
+                LOG.error("Got Exception for ReadEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                return false;
+            }
+            return true;
+        }
+    }
+
+    class FlushCurrentLogsTask implements Callable<Boolean> {
+        EntryLogger entryLogger;
+
+        FlushCurrentLogsTask(EntryLogger entryLogger) {
+            this.entryLogger = entryLogger;
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            try {
+                entryLogger.flushCurrentLogs();
+                return true;
+            } catch (IOException e) {
+                LOG.error("Got Exception while trying to flushCurrentLogs");
+                return false;
+            }
+        }
+    }
+
+    /*
+     * test concurrent write and flush operations and then concurrent read operations with entrylogperledger enabled
+     *
+     * @throws Exception
+     */
+    @Test(timeout = 60000)
+    public void testConcurrentWriteFlushAndReadCallsOfMultipleEntryLogs() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(1000 * 25);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(3));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        int numOfActiveLedgers = 15;
+        int numEntries = 2000;
+        final AtomicLongArray positions = new AtomicLongArray(numOfActiveLedgers * numEntries);
+        ExecutorService executor = Executors.newFixedThreadPool(40);
+
+        List<Callable<Boolean>> writeAndFlushTasks = new ArrayList<Callable<Boolean>>();
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                writeAndFlushTasks.add(new WriteTask(i, j, entryLogger, positions, i * numEntries + j));
+            }
+        }
+
+        // add flushCurrentLogs tasks also
+        for (int i = 0; i < writeAndFlushTasks.size() / 100; i++) {
+            writeAndFlushTasks.add(i * 100, new FlushCurrentLogsTask(entryLogger));
+        }
+
+        // invoke all those write and flushcurrentlogs tasks all at once concurrently and set timeout
+        // 6 seconds for them to complete
+        List<Future<Boolean>> writeAndFlushTasksFutures = executor.invokeAll(writeAndFlushTasks, 6, TimeUnit.SECONDS);
+        for (int i = 0; i < writeAndFlushTasks.size(); i++) {
+            Future<Boolean> future = writeAndFlushTasksFutures.get(i);
+            Callable<Boolean> task = writeAndFlushTasks.get(i);
+            if (task instanceof WriteTask) {
+                WriteTask writeTask = (WriteTask) task;
+                long ledgerId = writeTask.ledgerId;
+                int entryId = writeTask.entryId;
+                Assert.assertTrue("WriteTask should have been completed successfully ledgerId: " + ledgerId
+                        + " entryId: " + entryId, future.isDone() && (!future.isCancelled()));
+                Assert.assertTrue(
+                        "Position for ledgerId: " + ledgerId + " entryId: " + entryId + " should have been set",
+                        future.get());
+            } else {
+                Assert.assertTrue("FlushTask should have been completed successfully. Index " + i,
+                        future.isDone() && (!future.isCancelled()));
+                Assert.assertTrue("FlushTask should have been succeded without exception. Index " + i, future.get());
+            }
+        }
+
+        List<ReadTask> readTasks = new ArrayList<ReadTask>();
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                readTasks.add(new ReadTask(i, j, positions.get(i * numEntries + j), entryLogger));
+            }
+        }
+
+        // invoke all those readtasks all at once concurrently and set timeout
+        // 6 seconds for them to complete
+        List<Future<Boolean>> readTasksFutures = executor.invokeAll(readTasks, 6, TimeUnit.SECONDS);
+        for (int i = 0; i < numOfActiveLedgers * numEntries; i++) {
+            Future<Boolean> future = readTasksFutures.get(i);
+            long ledgerId = readTasks.get(i).ledgerId;
+            int entryId = readTasks.get(i).entryId;
+            Assert.assertTrue(
+                    "ReadTask should have been completed successfully ledgerId: " + ledgerId + " entryId: " + entryId,
+                    future.isDone() && (!future.isCancelled()));
+            Assert.assertTrue("ReadEntry of ledgerId: " + ledgerId + " entryId: " + entryId
+                    + " should have been completed successfully", future.get());
+        }
+    }
+
+    /*
+     * test concurrent read operations for entries which are flushed to entrylogs with entryLogPerLedger enabled
+     */
+    @Test(timeout = 60000)
+    public void testConcurrentReadCallsAfterEntryLogsAreRotated() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(1000 * 25);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(3));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        int numOfActiveLedgers = 15;
+        int numEntries = 2000;
+        final AtomicLongArray positions = new AtomicLongArray(numOfActiveLedgers * numEntries);
+
+        for (int i = 0; i < numOfActiveLedgers; i++) {
+            for (int j = 0; j < numEntries; j++) {
+                positions.set(i * numEntries + j, entryLogger.addEntry((long) i, generateEntry(i, j)));
+                long entryLogId = (positions.get(i * numEntries + j) >> 32L);
+                /**
+                 *
+                 * Though EntryLogFilePreAllocation is enabled, Since things are not done concurrently here, entryLogIds
+                 * will be sequential.
+                 */
+                Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId);
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            entryLogger.createNewLog(i);
+        }
+        entryLogger.flushRotatedLogs();
+
+        // reading after flush of rotatedlogs
+        ArrayList<ReadTask> readTasks = new ArrayList<ReadTask>();
+        for (int i = 0; i < numOfActiveLedgers; i++) {
+            for (int j = 0; j < numEntries; j++) {
+                readTasks.add(new ReadTask(i, j, positions.get(i * numEntries + j), entryLogger));
+            }
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(40);
+        List<Future<Boolean>> readTasksFutures = executor.invokeAll(readTasks, 6, TimeUnit.SECONDS);
+        for (int i = 0; i < numOfActiveLedgers * numEntries; i++) {
+            Future<Boolean> future = readTasksFutures.get(i);
+            long ledgerId = readTasks.get(i).ledgerId;
+            int entryId = readTasks.get(i).entryId;
+            Assert.assertTrue(
+                    "ReadTask should have been completed successfully ledgerId: " + ledgerId + " entryId: " + entryId,
+                    future.isDone() && (!future.isCancelled()));
+            Assert.assertTrue("ReadEntry of ledgerId: " + ledgerId + " entryId: " + entryId
+                    + " should have been completed successfully", future.get());
+        }
+    }
+
+    String[] createAndGetLedgerDirs(int numOfLedgerDirs) throws IOException {
+        File ledgerDir;
+        File curDir;
+        String[] ledgerDirsPath = new String[numOfLedgerDirs];
+        for (int i = 0; i < numOfLedgerDirs; i++) {
+            ledgerDir = createTempDir("bkTest", ".dir");
+            curDir = Bookie.getCurrentDirectory(ledgerDir);
+            Bookie.checkDirectoryStructure(curDir);
+            ledgerDirsPath[i] = ledgerDir.getAbsolutePath();
+        }
+        return ledgerDirsPath;
+    }
+
+    /**
+     * testcase to validate when ledgerdirs become full and eventually all
+     * ledgerdirs become full. Later a ledgerdir becomes writable.
+     */
+    @Test(timeout = 60000)
+    public void testEntryLoggerAddEntryWhenLedgerDirsAreFull() throws Exception {
+        int numberOfLedgerDirs = 3;
+        List<File> ledgerDirs = new ArrayList<File>();
+        String[] ledgerDirsPath = new String[numberOfLedgerDirs];
+        List<File> curDirs = new ArrayList<File>();
+
+        File ledgerDir;
+        File curDir;
+        for (int i = 0; i < numberOfLedgerDirs; i++) {
+            ledgerDir = createTempDir("bkTest", ".dir").getAbsoluteFile();
+            curDir = Bookie.getCurrentDirectory(ledgerDir);
+            Bookie.checkDirectoryStructure(curDir);
+            ledgerDirs.add(ledgerDir);
+            ledgerDirsPath[i] = ledgerDir.getPath();
+            curDirs.add(curDir);
+        }
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        // pre-allocation is disabled
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(ledgerDirsPath);
+
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManager entryLogManager = entryLogger.entryLogManager;
+        Assert.assertEquals("EntryLogManager class type", EntryLogger.EntryLogManagerForEntryLogPerLedger.class,
+                entryLogManager.getClass());
+
+        entryLogger.addEntry(0L, generateEntry(0, 1));
+        entryLogger.addEntry(1L, generateEntry(1, 1));
+        entryLogger.addEntry(2L, generateEntry(2, 1));
+
+        File ledgerDirForLedger0 = entryLogManager.getCurrentLogForLedger(0L).getLogFile().getParentFile();
+        File ledgerDirForLedger1 = entryLogManager.getCurrentLogForLedger(1L).getLogFile().getParentFile();
+        File ledgerDirForLedger2 = entryLogManager.getCurrentLogForLedger(2L).getLogFile().getParentFile();
+
+        Set<File> ledgerDirsSet = new HashSet<File>();
+        ledgerDirsSet.add(ledgerDirForLedger0);
+        ledgerDirsSet.add(ledgerDirForLedger1);
+        ledgerDirsSet.add(ledgerDirForLedger2);
+
+        /*
+         * since there are 3 ledgerdirs, entrylogs for all the 3 ledgers should be in different ledgerdirs.
+         */
+        Assert.assertEquals("Current active LedgerDirs size", 3, ledgerDirs.size());
+        Assert.assertEquals("Number of rotated logchannels", 0, entryLogManager.getCopyOfRotatedLogChannels().size());
+
+        /*
+         * ledgerDirForLedger0 is added to filledDirs, for ledger0 new entrylog should not be created in
+         * ledgerDirForLedger0
+         */
+        ledgerDirsManager.addToFilledDirs(ledgerDirForLedger0);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 2, ledgerDirForLedger0, false, ledgerDirForLedger1,
+                ledgerDirForLedger2);
+        Assert.assertEquals("Number of rotated logchannels", 1, entryLogManager.getCopyOfRotatedLogChannels().size());
+
+        /*
+         * ledgerDirForLedger1 is also added to filledDirs, so for all the ledgers new entryLogs should be in
+         * ledgerDirForLedger2
+         */
+        ledgerDirsManager.addToFilledDirs(ledgerDirForLedger1);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 3, ledgerDirForLedger2, true, ledgerDirForLedger2,
+                ledgerDirForLedger2);
+        Assert.assertTrue("Number of rotated logchannels", (2 <= entryLogManager.getCopyOfRotatedLogChannels().size())
+                && (entryLogManager.getCopyOfRotatedLogChannels().size() <= 3));
+        int numOfRotatedLogChannels = entryLogManager.getCopyOfRotatedLogChannels().size();
+
+        /*
+         * since ledgerDirForLedger2 is added to filleddirs, all the dirs are full. If all the dirs are full then it
+         * will continue to use current entrylogs for new entries instead of creating new one. So for all the ledgers
+         * ledgerdirs should be same as before - ledgerDirForLedger2
+         */
+        ledgerDirsManager.addToFilledDirs(ledgerDirForLedger2);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 4, ledgerDirForLedger2, true, ledgerDirForLedger2,
+                ledgerDirForLedger2);
+        Assert.assertEquals("Number of rotated logchannels", numOfRotatedLogChannels,
+                entryLogManager.getCopyOfRotatedLogChannels().size());
+
+        /*
+         *  ledgerDirForLedger1 is added back to writableDirs, so new entrylog for all the ledgers should be created in
+         *  ledgerDirForLedger1
+         */
+        ledgerDirsManager.addToWritableDirs(ledgerDirForLedger1, true);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 4, ledgerDirForLedger1, true, ledgerDirForLedger1,
+                ledgerDirForLedger1);
+        Assert.assertEquals("Number of rotated logchannels", numOfRotatedLogChannels + 3,
+                entryLogManager.getCopyOfRotatedLogChannels().size());
+    }
+
+    /*
+     * in this method we add an entry and validate the ledgerdir of the
+     * currentLogForLedger against the provided expected ledgerDirs.
+     */
+    void addEntryAndValidateFolders(EntryLogger entryLogger, EntryLogManager entryLogManager, int entryId,
+            File expectedDirForLedger0, boolean equalsForLedger0, File expectedDirForLedger1,
+            File expectedDirForLedger2) throws IOException {
+        entryLogger.addEntry(0L, generateEntry(0, entryId));
+        entryLogger.addEntry(1L, generateEntry(1, entryId));
+        entryLogger.addEntry(2L, generateEntry(2, entryId));
+
+        if (equalsForLedger0) {
+            Assert.assertEquals("LedgerDir for ledger 0 after adding entry " + entryId, expectedDirForLedger0,
+                    entryLogManager.getCurrentLogForLedger(0L).getLogFile().getParentFile());
+        } else {
+            Assert.assertNotEquals("LedgerDir for ledger 0 after adding entry " + entryId, expectedDirForLedger0,
+                    entryLogManager.getCurrentLogForLedger(0L).getLogFile().getParentFile());
+        }
+        Assert.assertEquals("LedgerDir for ledger 1 after adding entry " + entryId, expectedDirForLedger1,
+                entryLogManager.getCurrentLogForLedger(1L).getLogFile().getParentFile());
+        Assert.assertEquals("LedgerDir for ledger 2 after adding entry " + entryId, expectedDirForLedger2,
+                entryLogManager.getCurrentLogForLedger(2L).getLogFile().getParentFile());
+    }
+
+    /*
+     * entries added using entrylogger with entryLogPerLedger enabled and the same entries are read using entrylogger
+     * with entryLogPerLedger disabled
+     */
+    @Test(timeout = 60000)
+    public void testSwappingEntryLogManagerFromEntryLogPerLedgerToSingle() throws Exception {
+        testSwappingEntryLogManager(true, false);
+    }
+
+    /*
+     * entries added using entrylogger with entryLogPerLedger disabled and the same entries are read using entrylogger
+     * with entryLogPerLedger enabled
+     */
+    @Test(timeout = 60000)
+    public void testSwappingEntryLogManagerFromSingleToEntryLogPerLedger() throws Exception {
+        testSwappingEntryLogManager(false, true);
+    }
+
+    public void testSwappingEntryLogManager(boolean initialEntryLogPerLedgerEnabled,
+            boolean laterEntryLogPerLedgerEnabled) throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(initialEntryLogPerLedgerEnabled);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        // pre allocation enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManager entryLogManager = entryLogger.entryLogManager;
+        Assert.assertEquals("EntryLogManager class type",
+                initialEntryLogPerLedgerEnabled ? EntryLogger.EntryLogManagerForEntryLogPerLedger.class
+                        : EntryLogger.EntryLogManagerForSingleEntryLog.class,
+                entryLogManager.getClass());
+
+        int numOfActiveLedgers = 10;
+        int numEntries = 10;
+        long[][] positions = new long[numOfActiveLedgers][];
+        for (int i = 0; i < numOfActiveLedgers; i++) {
+            positions[i] = new long[numEntries];
+        }
+
+        /*
+         * addentries to the ledgers
+         */
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                positions[i][j] = entryLogger.addEntry((long) i, generateEntry(i, j));
+                long entryLogId = (positions[i][j] >> 32L);
+                if (initialEntryLogPerLedgerEnabled) {
+                    Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId);
+                } else {
+                    Assert.assertEquals("EntryLogId for ledger: " + i, 0, entryLogId);
+                }
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            entryLogger.createNewLog(i);
+        }
+
+        /**
+         * since new entrylog is created for all the ledgers, the previous
+         * entrylogs must be rotated and with the following flushRotatedLogs
+         * call they should be forcewritten and file should be closed.
+         */
+        entryLogger.flushRotatedLogs();
+
+        /*
+         * new entrylogger and entryLogManager are created with
+         * 'laterEntryLogPerLedgerEnabled' conf
+         */
+        conf.setEntryLogPerLedgerEnabled(laterEntryLogPerLedgerEnabled);
+        LedgerDirsManager newLedgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger newEntryLogger = new EntryLogger(conf, newLedgerDirsManager);
+        EntryLogManager newEntryLogManager = newEntryLogger.entryLogManager;
+        Assert.assertEquals("EntryLogManager class type",
+                laterEntryLogPerLedgerEnabled ? EntryLogger.EntryLogManagerForEntryLogPerLedger.class
+                        : EntryLogger.EntryLogManagerForSingleEntryLog.class,
+                newEntryLogManager.getClass());
+
+        /*
+         * read the entries (which are written with previous entrylogger) with
+         * new entrylogger
+         */
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                String expectedValue = "ledger-" + i + "-" + j;
+                ByteBuf buf = newEntryLogger.readEntry(i, j, positions[i][j]);
+                long ledgerId = buf.readLong();
+                long entryId = buf.readLong();
+                byte[] data = new byte[buf.readableBytes()];
+                buf.readBytes(data);
+                assertEquals("LedgerId ", i, ledgerId);
+                assertEquals("EntryId ", j, entryId);
+                assertEquals("Entry Data ", expectedValue, new String(data));
+            }
+        }
+    }
+
+    /*
+     * test for validating if the EntryLog/BufferedChannel flushes/forcewrite if the bytes written to it are more than
+     * flushIntervalInBytes
+     */
+    @Test(timeout = 60000)
+    public void testFlushIntervalInBytes() throws Exception {
+        long flushIntervalInBytes = 5000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(flushIntervalInBytes);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManager entryLogManager = entryLogger.entryLogManager;
+
+        /*
+         * when entryLogger is created Header of length EntryLogger.LOGFILE_HEADER_SIZE is created
+         */
+        long ledgerId = 0L;
+        int firstEntrySize = 1000;
+        long entry0Position = entryLogger.addEntry(0L, generateEntry(ledgerId, 0L, firstEntrySize));
+        // entrylogger writes length of the entry (4 bytes) before writing entry
+        long expectedUnpersistedBytes = EntryLogger.LOGFILE_HEADER_SIZE + firstEntrySize + 4;
+        Assert.assertEquals("Unpersisted Bytes of entrylog", expectedUnpersistedBytes,
+                entryLogManager.getCurrentLogForLedger(ledgerId).getUnpersistedBytes());
+
+        /*
+         * 'flushIntervalInBytes' number of bytes are flushed so BufferedChannel should be forcewritten
+         */
+        int secondEntrySize = (int) (flushIntervalInBytes - expectedUnpersistedBytes);
+        long entry1Position = entryLogger.addEntry(0L, generateEntry(ledgerId, 1L, secondEntrySize));
+        Assert.assertEquals("Unpersisted Bytes of entrylog", 0,
+                entryLogManager.getCurrentLogForLedger(ledgerId).getUnpersistedBytes());
+
+        /*
+         * since entrylog/Bufferedchannel is persisted (forcewritten), we should be able to read the entrylog using
+         * newEntryLogger
+         */
+        conf.setEntryLogPerLedgerEnabled(false);
+        EntryLogger newEntryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManager newEntryLogManager = newEntryLogger.entryLogManager;
+        Assert.assertEquals("EntryLogManager class type", EntryLogger.EntryLogManagerForSingleEntryLog.class,
+                newEntryLogManager.getClass());
+
+        ByteBuf buf = newEntryLogger.readEntry(ledgerId, 0L, entry0Position);
+        long readLedgerId = buf.readLong();
+        long readEntryId = buf.readLong();
+        Assert.assertEquals("LedgerId", ledgerId, readLedgerId);
+        Assert.assertEquals("EntryId", 0L, readEntryId);
+
+        buf = newEntryLogger.readEntry(ledgerId, 1L, entry1Position);
+        readLedgerId = buf.readLong();
+        readEntryId = buf.readLong();
+        Assert.assertEquals("LedgerId", ledgerId, readLedgerId);
+        Assert.assertEquals("EntryId", 1L, readEntryId);
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index b6282443f..f43750be2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -37,7 +37,10 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -456,19 +459,37 @@ public void run() {
     static class FlushTestSortedLedgerStorage extends SortedLedgerStorage {
         final AtomicBoolean injectMemTableSizeLimitReached;
         final AtomicBoolean injectFlushException;
+        final AtomicLong injectFlushExceptionForLedger;
+        static final long FORALLLEDGERS = -1;
+
+        enum Flusher {
+            Sequential, Parallel
+        }
+
+        Flusher flusherCalled = Flusher.Sequential;
 
         public FlushTestSortedLedgerStorage() {
             super();
-            injectMemTableSizeLimitReached = new AtomicBoolean();
-            injectFlushException = new AtomicBoolean();
+            injectMemTableSizeLimitReached = new AtomicBoolean(false);
+            injectFlushException = new AtomicBoolean(false);
+            injectFlushExceptionForLedger = new AtomicLong(FORALLLEDGERS);
         }
 
         public void setInjectMemTableSizeLimitReached(boolean setValue) {
             injectMemTableSizeLimitReached.set(setValue);
         }
 
-        public void setInjectFlushException(boolean setValue) {
+        public void setInjectFlushException(boolean setValue, long ledgerId) {
             injectFlushException.set(setValue);
+            injectFlushExceptionForLedger.set(ledgerId);
+        }
+
+        public void setFlusherCalled(Flusher flusherCalled) {
+            this.flusherCalled = flusherCalled;
+        }
+
+        public Flusher getFlusherCalled() {
+            return this.flusherCalled;
         }
 
         @Override
@@ -489,17 +510,31 @@ public void initialize(ServerConfiguration conf,
                 checkpointSource,
                 checkpointer,
                 statsLogger);
-            this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger) {
+            this.memTable = new EntryMemTable(conf, checkpointSource, this.flushExecutor, statsLogger) {
                 @Override
                 boolean isSizeLimitReached() {
                     return (injectMemTableSizeLimitReached.get() || super.isSizeLimitReached());
                 }
+
+                @Override
+                long flushSnapshotSequentially(final SkipListFlusher flusher, Checkpoint checkpoint)
+                        throws IOException {
+                    setFlusherCalled(Flusher.Sequential);
+                    return super.flushSnapshotSequentially(flusher, checkpoint);
+                }
+
+                @Override
+                long flushSnapshotParallelly(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+                    setFlusherCalled(Flusher.Parallel);
+                    return super.flushSnapshotParallelly(flusher, checkpoint);
+                }
             };
         }
 
         @Override
         public void process(long ledgerId, long entryId, ByteBuf buffer) throws IOException {
-            if (injectFlushException.get()) {
+            if (injectFlushException.get() && ((injectFlushExceptionForLedger.get() == FORALLLEDGERS)
+                    || (injectFlushExceptionForLedger.get() == ledgerId))) {
                 throw new IOException("Injected Exception");
             }
             super.process(ledgerId, entryId, buffer);
@@ -547,16 +582,18 @@ public void testEntryMemTableFlushFailure() throws Exception {
 
         // set flags, so that FlushTestSortedLedgerStorage simulates FlushFailure scenario
         flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
-        flushTestSortedLedgerStorage.setInjectFlushException(true);
+        flushTestSortedLedgerStorage.setInjectFlushException(true, FlushTestSortedLedgerStorage.FORALLLEDGERS);
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
         Thread.sleep(1000);
 
         // since we simulated sizeLimitReached, snapshot shouldn't be empty
         assertFalse("EntryMemTable SnapShot is not expected to be empty", memTable.snapshot.isEmpty());
+        assertEquals("Flusher called", FlushTestSortedLedgerStorage.Flusher.Sequential,
+                flushTestSortedLedgerStorage.getFlusherCalled());
 
         // set the flags to false, so flush will succeed this time
         flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(false);
-        flushTestSortedLedgerStorage.setInjectFlushException(false);
+        flushTestSortedLedgerStorage.setInjectFlushException(false, FlushTestSortedLedgerStorage.FORALLLEDGERS);
 
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 3));
         Thread.sleep(1000);
@@ -591,7 +628,7 @@ public void testSortedLedgerFlushFailure() throws Exception {
 
         // set flags, so that FlushTestSortedLedgerStorage simulates FlushFailure scenario
         flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
-        flushTestSortedLedgerStorage.setInjectFlushException(true);
+        flushTestSortedLedgerStorage.setInjectFlushException(true, FlushTestSortedLedgerStorage.FORALLLEDGERS);
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
 
         // since we simulated sizeLimitReached, snapshot shouldn't be empty
@@ -618,4 +655,110 @@ private ByteBuf generateEntry(long ledger, long entry) {
         bb.writeBytes(data);
         return bb;
     }
+
+    @Test(timeout = 60000)
+    public void testEntryMemTableParallelFlush() throws Exception {
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(1));
+        conf.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
+        // enable entrylog per ledger
+        conf.setEntryLogPerLedgerEnabled(true);
+
+        Bookie bookie = new Bookie(conf);
+        FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = (FlushTestSortedLedgerStorage) bookie.ledgerStorage;
+        EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
+
+        /*
+         * this bookie.addEntry call is required. FileInfo for Ledger 1, 2, 3
+         * would be created with this call. without the fileinfo,
+         * 'flushTestSortedLedgerStorage.addEntry' calls will fail because of
+         * BOOKKEEPER-965 change.
+         */
+        bookie.addEntry(generateEntry(1, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(2, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(3, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
+        flushTestSortedLedgerStorage.addEntry(generateEntry(2, 2));
+        flushTestSortedLedgerStorage.addEntry(generateEntry(3, 2));
+
+        assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
+        assertFalse("EntryMemTable is not expected to be empty", memTable.isEmpty());
+
+        // inject MemTableSizeLimitReached, so entrymemtable will be flushed
+        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 3));
+        Thread.sleep(1000);
+
+        // since we simulated sizeLimitReached, snapshot should have been created and flushed
+        assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
+        assertEquals("Flusher called", FlushTestSortedLedgerStorage.Flusher.Parallel,
+                flushTestSortedLedgerStorage.getFlusherCalled());
+    }
+
+    @Test(timeout = 60000)
+    public void testEntryMemTableParallelFlushWithFlushException() throws Exception {
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(1));
+        conf.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
+        // enable entrylog per ledger
+        conf.setEntryLogPerLedgerEnabled(true);
+
+        Bookie bookie = new Bookie(conf);
+        FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = (FlushTestSortedLedgerStorage) bookie.ledgerStorage;
+        EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
+
+        /*
+         * this bookie.addEntry call is required. FileInfo for Ledger 1, 2, 3
+         * would be created with this call. without the fileinfo,
+         * 'flushTestSortedLedgerStorage.addEntry' calls will fail because of
+         * BOOKKEEPER-965 change.
+         */
+        bookie.addEntry(generateEntry(1, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(2, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(3, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 4));
+        flushTestSortedLedgerStorage.addEntry(generateEntry(2, 4));
+        flushTestSortedLedgerStorage.addEntry(generateEntry(3, 4));
+
+        // inject MemTableSizeLimitReached and FlushException, so entrymemtable flush will fail
+        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
+        flushTestSortedLedgerStorage.setInjectFlushException(true, 1L);
+
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 5));
+        Thread.sleep(1000);
+        // since we simulate FlushException, memtable snapshot should not be empty
+        assertFalse("EntryMemTable SnapShot is not expected to be empty", memTable.snapshot.isEmpty());
+        assertEquals("Flusher called", FlushTestSortedLedgerStorage.Flusher.Parallel,
+                flushTestSortedLedgerStorage.getFlusherCalled());
+
+        flushTestSortedLedgerStorage.setInjectFlushException(false, FlushTestSortedLedgerStorage.FORALLLEDGERS);
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 5));
+        Thread.sleep(1000);
+        /*
+         * since MemTableSizeLimitReached is already set to true, and flush
+         * exception is disabled, this time memtable snapshot should be flushed
+         */
+        assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
+        assertEquals("Flusher called", FlushTestSortedLedgerStorage.Flusher.Parallel,
+                flushTestSortedLedgerStorage.getFlusherCalled());
+    }
+
+    String[] createAndGetLedgerDirs(int numOfLedgerDirs) throws IOException {
+        File ledgerDir;
+        File curDir;
+        String[] ledgerDirsPath = new String[numOfLedgerDirs];
+        for (int i = 0; i < numOfLedgerDirs; i++) {
+            ledgerDir = createTempDir("bkTest", ".dir");
+            curDir = Bookie.getCurrentDirectory(ledgerDir);
+            Bookie.checkDirectoryStructure(curDir);
+            ledgerDirsPath[i] = ledgerDir.getAbsolutePath();
+        }
+        return ledgerDirsPath;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
index 2dfebdf1f..47b5a16ae 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SortedLedgerStorageCheckpointTest.java
@@ -207,9 +207,9 @@ public void testCheckpointAfterEntryLogRotated() throws Exception {
         });
 
         // simulate entry log is rotated (due to compaction)
-        storage.entryLogger.rollLog();
+        storage.entryLogger.createNewLog(EntryLogger.INVALID_LID);
         long leastUnflushedLogId = storage.entryLogger.getLeastUnflushedLogId();
-        long currentLogId = storage.entryLogger.getCurrentLogId();
+        long currentLogId = storage.entryLogger.getPreviousAllocatedEntryLogId();
         log.info("Least unflushed entry log : current = {}, leastUnflushed = {}", currentLogId, leastUnflushedLogId);
 
         readyLatch.countDown();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
index 50844f3d5..998c848e2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -59,7 +59,7 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
     @Before
     public void setUp() throws Exception {
         this.memTable = new EntryMemTable(TestBKConfiguration.newServerConfiguration(),
-                this, NullStatsLogger.INSTANCE);
+                this, null, NullStatsLogger.INSTANCE);
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index 4b61a657f..89df3a521 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -215,7 +215,7 @@ public void testBookieCompaction() throws Exception {
         newEntry3.writeLong(4); // ledger id
         newEntry3.writeLong(3); // entry id
         newEntry3.writeBytes("new-entry-3".getBytes());
-        long location = entryLogger.addEntry(4, newEntry3, false);
+        long location = entryLogger.addEntry(4L, newEntry3, false);
 
         List<EntryLocation> locations = Lists.newArrayList(new EntryLocation(4, 3, location));
         storage.updateEntriesLocations(locations);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services