You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/22 02:02:45 UTC

[GitHub] sijie closed pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation

sijie closed pull request #1391: Issue #570: EntryLogManagerForEntryLogPerLedger implementation
URL: https://github.com/apache/bookkeeper/pull/1391
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
index 849336f66..701fb7b2e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java
@@ -37,7 +37,7 @@
 abstract class EntryLogManagerBase implements EntryLogManager {
     volatile List<BufferedLogChannel> rotatedLogChannels;
     final EntryLoggerAllocator entryLoggerAllocator;
-    private final LedgerDirsManager ledgerDirsManager;
+    final LedgerDirsManager ledgerDirsManager;
     private final List<EntryLogger.EntryLogListener> listeners;
     /**
      * The maximum size of a entry logger file.
@@ -93,12 +93,12 @@ boolean readEntryLogHardLimit(BufferedLogChannel logChannel, long size) {
         return logChannel.position() + size > Integer.MAX_VALUE;
     }
 
-    abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId);
+    abstract BufferedLogChannel getCurrentLogForLedger(long ledgerId) throws IOException;
 
     abstract BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize, boolean rollLog)
             throws IOException;
 
-    abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel);
+    abstract void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel) throws IOException;
 
     /*
      * flush current logs.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
new file mode 100644
index 000000000..3cdbb7a2a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
@@ -0,0 +1,515 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.netty.buffer.ByteBuf;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+import org.apache.commons.lang.mutable.MutableInt;
+
+@Slf4j
+class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
+
+    /**
+     * Couples an entry log channel with a flag recording whether the ledger
+     * directory that holds its file has been reported full.
+     */
+    static class BufferedLogChannelWithDirInfo {
+        private final BufferedLogChannel logChannel;
+        // written by the LedgerDirsListener callbacks, read when choosing a log for addEntry
+        volatile boolean ledgerDirFull;
+
+        BufferedLogChannelWithDirInfo(BufferedLogChannel logChannel) {
+            this.logChannel = logChannel;
+        }
+
+        /** @return the wrapped entry log channel */
+        public BufferedLogChannel getLogChannel() {
+            return this.logChannel;
+        }
+
+        /** @return the last value recorded through {@link #setLedgerDirFull(boolean)} (initially false) */
+        public boolean isLedgerDirFull() {
+            return this.ledgerDirFull;
+        }
+
+        /** Records whether the directory of this log is currently full. */
+        public void setLedgerDirFull(boolean ledgerDirFull) {
+            this.ledgerDirFull = ledgerDirFull;
+        }
+    }
+
+    /**
+     * Cache value kept per ledger: a lock for that ledger plus the ledger's
+     * current entry log (null until one is set).
+     */
+    static class EntryLogAndLockTuple {
+        private final Lock ledgerLock = new ReentrantLock();
+        private BufferedLogChannelWithDirInfo entryLogWithDirInfo;
+
+        public EntryLogAndLockTuple() {
+        }
+
+        /** @return the lock associated with this ledger */
+        public Lock getLedgerLock() {
+            return this.ledgerLock;
+        }
+
+        /** @return the entry log currently set for this ledger, or null if none was set */
+        public BufferedLogChannelWithDirInfo getEntryLogWithDirInfo() {
+            return this.entryLogWithDirInfo;
+        }
+
+        /** Replaces the entry log recorded for this ledger. */
+        public void setEntryLogWithDirInfo(BufferedLogChannelWithDirInfo entryLogWithDirInfo) {
+            this.entryLogWithDirInfo = entryLogWithDirInfo;
+        }
+    }
+
+    private final LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap;
+    /*
+     * Every time the active logChannel is accessed through the
+     * ledgerIdEntryLogMap cache, the access time of that entry is updated.
+     * For certain operations (like the periodic flush of the current active
+     * logChannels) we don't want to affect the access time of the entries, so
+     * those operations can use this copy of the references instead.
+     */
+    private final ConcurrentLongHashMap<BufferedLogChannelWithDirInfo> replicaOfCurrentLogChannels;
+    private final CacheLoader<Long, EntryLogAndLockTuple> entryLogAndLockTupleCacheLoader;
+    private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
+    private final int entrylogMapAccessExpiryTimeInSeconds;
+    private final int maximumNumberOfActiveEntryLogs;
+
+    /**
+     * Builds the per-ledger entry log manager: initialises the rotated-log
+     * list and the replica map, registers a ledger-dirs listener, and creates
+     * the ledgerId cache with access-time expiry, a maximum size and a removal
+     * listener.
+     *
+     * @param conf supplies entrylogMapAccessExpiryTimeInSeconds and maximumNumberOfActiveEntryLogs
+     * @param ledgerDirsManager manager on which the disk full/writable listener is registered
+     * @param entryLoggerAllocator passed through to the base class
+     * @param listeners passed through to the base class
+     * @param recentlyCreatedEntryLogsStatus notified when a rotated entry log has been flushed
+     * @throws IOException declared by this constructor; no statement visible here throws it
+     *             directly (presumably it comes from the super constructor - TODO confirm)
+     */
+    EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
+            EntryLoggerAllocator entryLoggerAllocator, List<EntryLogger.EntryLogListener> listeners,
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException {
+        super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
+        this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
+        this.rotatedLogChannels = new CopyOnWriteArrayList<BufferedLogChannel>();
+        this.replicaOfCurrentLogChannels = new ConcurrentLongHashMap<BufferedLogChannelWithDirInfo>();
+        this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds();
+        this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs();
+        // the listener only touches replicaOfCurrentLogChannels, which is already assigned above
+        ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
+        // cache entries start out as an empty tuple: a fresh lock and no entry log yet
+        this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, EntryLogAndLockTuple>() {
+            @Override
+            public EntryLogAndLockTuple load(Long key) throws Exception {
+                return new EntryLogAndLockTuple();
+            }
+        };
+        /*
+         * Currently we are relying on access time based eviction policy for
+         * removal of EntryLogAndLockTuple, so if the EntryLogAndLockTuple of
+         * the ledger is not accessed in
+         * entrylogMapAccessExpiryTimeInSeconds period, it will be removed
+         * from the cache.
+         *
+         * We are going to introduce explicit advisory writeClose call, with
+         * that explicit call EntryLogAndLockTuple of the ledger will be
+         * removed from the cache. But still time-based eviction policy is
+         * needed because it is not guaranteed that Bookie/EntryLogger would
+         * receive successfully write close call in all the cases.
+         */
+        ledgerIdEntryLogMap = CacheBuilder.newBuilder()
+                .expireAfterAccess(entrylogMapAccessExpiryTimeInSeconds, TimeUnit.SECONDS)
+                .maximumSize(maximumNumberOfActiveEntryLogs)
+                .removalListener(new RemovalListener<Long, EntryLogAndLockTuple>() {
+                    @Override
+                    public void onRemoval(
+                            RemovalNotification<Long, EntryLogAndLockTuple> expiredLedgerEntryLogMapEntry) {
+                        onCacheEntryRemoval(expiredLedgerEntryLogMapEntry);
+                    }
+                }).build(entryLogAndLockTupleCacheLoader);
+    }
+
+    /*
+     * This method is called when an entry is removed from the cache. This could
+     * be because access time of that ledger has elapsed
+     * entrylogMapAccessExpiryTimeInSeconds period, or number of active
+     * currentlogs in the cache has reached the size of
+     * maximumNumberOfActiveEntryLogs, or if an entry is explicitly
+     * invalidated/removed. In these cases entry for that ledger is removed from
+     * cache. Since the entrylog of this ledger is not active anymore it has to
+     * be removed from replicaOfCurrentLogChannels and added to
+     * rotatedLogChannels.
+     *
+     * Because of performance/optimization concerns, the cleanup maintenance
+     * operations won't happen automatically. For more info on eviction cleanup
+     * maintenance tasks, see -
+     * https://google.github.io/guava/releases/19.0/api/docs/com/google/
+     * common/cache/CacheBuilder.html
+     *
+     */
+    private void onCacheEntryRemoval(RemovalNotification<Long, EntryLogAndLockTuple> removedLedgerEntryLogMapEntry) {
+        final Long evictedLedgerId = removedLedgerEntryLogMapEntry.getKey();
+        log.debug("LedgerId {} is being evicted from the cache map because of {}", evictedLedgerId,
+                removedLedgerEntryLogMapEntry.getCause());
+
+        final EntryLogAndLockTuple evictedTuple = removedLedgerEntryLogMapEntry.getValue();
+        if (evictedTuple == null) {
+            log.error("entryLogAndLockTuple is not supposed to be null in entry removal listener for ledger : {}",
+                    evictedLedgerId);
+            return;
+        }
+
+        final BufferedLogChannelWithDirInfo evictedLog = evictedTuple.getEntryLogWithDirInfo();
+        if (evictedLog == null) {
+            log.error("logChannel for ledger: {} is not supposed to be null in entry removal listener",
+                    evictedLedgerId);
+            return;
+        }
+
+        // move the channel from the "current" replica map to the rotated list under the ledger's lock
+        final Lock ledgerLock = evictedTuple.getLedgerLock();
+        ledgerLock.lock();
+        try {
+            final BufferedLogChannel retiredChannel = evictedLog.getLogChannel();
+            replicaOfCurrentLogChannels.remove(retiredChannel.getLogId());
+            rotatedLogChannels.add(retiredChannel);
+        } finally {
+            ledgerLock.unlock();
+        }
+    }
+
+    /**
+     * Creates the listener that keeps the ledgerDirFull flag of the active
+     * entry logs in step with disk full / disk writable notifications.
+     */
+    private LedgerDirsListener getLedgerDirsListener() {
+        return new LedgerDirsListener() {
+            @Override
+            public void diskFull(File disk) {
+                markLogsOnDisk(disk, true);
+            }
+
+            @Override
+            public void diskWritable(File disk) {
+                markLogsOnDisk(disk, false);
+            }
+
+            // sets the flag on every active log (from a snapshot) whose file lives directly in the given dir
+            private void markLogsOnDisk(File disk, boolean full) {
+                for (BufferedLogChannelWithDirInfo activeLog : getCopyOfCurrentLogs()) {
+                    File logDir = activeLog.getLogChannel().getLogFile().getParentFile();
+                    if (disk.equals(logDir)) {
+                        activeLog.setLedgerDirFull(full);
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Looks up (loading if absent) the cache entry of the ledger and returns
+     * its lock.
+     *
+     * @throws IOException wrapping any exception raised by the cache lookup
+     */
+    Lock getLock(long ledgerId) throws IOException {
+        try {
+            EntryLogAndLockTuple tuple = ledgerIdEntryLogMap.get(ledgerId);
+            return tuple.getLedgerLock();
+        } catch (Exception e) {
+            log.error("Received unexpected exception while fetching lock to acquire for ledger: " + ledgerId, e);
+            throw new IOException("Received unexpected exception while fetching lock to acquire", e);
+        }
+    }
+
+    /*
+     * Sets the logChannel for the given ledgerId. The new logChannel is
+     * added to replicaOfCurrentLogChannels. The previous logChannel, if
+     * there was one, is removed from replicaOfCurrentLogChannels and added
+     * to rotatedLogChannels.
+     */
+    @Override
+    public void setCurrentLogForLedgerAndAddToRotate(long ledgerId, BufferedLogChannel logChannel) throws IOException {
+        final Lock ledgerLock = getLock(ledgerId);
+        ledgerLock.lock();
+        try {
+            // remember the outgoing channel before the cache entry is repointed
+            final BufferedLogChannel previousChannel = getCurrentLogForLedger(ledgerId);
+
+            logChannel.setLedgerIdAssigned(ledgerId);
+            final BufferedLogChannelWithDirInfo replacement = new BufferedLogChannelWithDirInfo(logChannel);
+            ledgerIdEntryLogMap.get(ledgerId).setEntryLogWithDirInfo(replacement);
+            replicaOfCurrentLogChannels.put(logChannel.getLogId(), replacement);
+
+            // retire the previous channel, if the ledger had one
+            if (previousChannel != null) {
+                replicaOfCurrentLogChannels.remove(previousChannel.getLogId());
+                rotatedLogChannels.add(previousChannel);
+            }
+        } catch (Exception e) {
+            log.error("Received unexpected exception while fetching entry from map for ledger: " + ledgerId, e);
+            throw new IOException("Received unexpected exception while fetching entry from map", e);
+        } finally {
+            ledgerLock.unlock();
+        }
+    }
+
+    /**
+     * @return the channel of the ledger's current entry log, or null when the
+     *         ledger has no entry log recorded
+     */
+    @Override
+    public BufferedLogChannel getCurrentLogForLedger(long ledgerId) throws IOException {
+        final BufferedLogChannelWithDirInfo current = getCurrentLogWithDirInfoForLedger(ledgerId);
+        return (current == null) ? null : current.getLogChannel();
+    }
+
+    /**
+     * Reads, under the ledger's lock, the entry log recorded in the ledger's
+     * cache entry.
+     *
+     * @return the recorded entry log with its dir info, or null if none is set
+     * @throws IOException wrapping any exception raised while reading the cache
+     */
+    public BufferedLogChannelWithDirInfo getCurrentLogWithDirInfoForLedger(long ledgerId) throws IOException {
+        final Lock ledgerLock = getLock(ledgerId);
+        ledgerLock.lock();
+        try {
+            return ledgerIdEntryLogMap.get(ledgerId).getEntryLogWithDirInfo();
+        } catch (Exception e) {
+            log.error("Received unexpected exception while fetching entry from map for ledger: " + ledgerId, e);
+            throw new IOException("Received unexpected exception while fetching entry from map", e);
+        } finally {
+            ledgerLock.unlock();
+        }
+    }
+
+    /**
+     * @return a new set holding the values currently in
+     *         replicaOfCurrentLogChannels (reading it does not go through the
+     *         ledgerId cache)
+     */
+    public Set<BufferedLogChannelWithDirInfo> getCopyOfCurrentLogs() {
+        Set<BufferedLogChannelWithDirInfo> snapshot = new HashSet<>(replicaOfCurrentLogChannels.values());
+        return snapshot;
+    }
+
+    /**
+     * @return the channel registered in replicaOfCurrentLogChannels under the
+     *         given entry log id, or null if there is none
+     */
+    @Override
+    public BufferedLogChannel getCurrentLogIfPresent(long entryLogId) {
+        final BufferedLogChannelWithDirInfo active = replicaOfCurrentLogChannels.get(entryLogId);
+        return (active == null) ? null : active.getLogChannel();
+    }
+
+    /**
+     * Checkpoints by delegating to the base class flush().
+     *
+     * @throws IOException propagated from the flush
+     */
+    @Override
+    public void checkpoint() throws IOException {
+        /*
+         * In the case of entryLogPerLedgerEnabled we need to flush
+         * both rotatedlogs and currentlogs. This is needed because
+         * syncThread periodically does checkpoint and at this time
+         * all the logs should be flushed.
+         *
+         */
+        super.flush();
+    }
+
+    /**
+     * Intentionally a no-op in entry-log-per-ledger mode; numBytesFlushed is
+     * ignored.
+     */
+    @Override
+    public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException {
+        // do nothing
+        /*
+         * prepareSortedLedgerStorageCheckpoint is required for
+         * singleentrylog scenario, but it is not needed for
+         * entrylogperledger scenario, since entries of a ledger go
+         * to a entrylog (even during compaction) and SyncThread
+         * drives periodic checkpoint logic.
+         */
+
+    }
+
+    /** Intentionally a no-op in entry-log-per-ledger mode. */
+    @Override
+    public void prepareEntryMemTableFlush() {
+        // do nothing
+    }
+
+    /**
+     * Rolls every active entry log that has reached the entry log size limit.
+     *
+     * @return always false (see the note at the end of the method)
+     * @throws IOException if the ledger's lock cannot be fetched or the new log
+     *             cannot be created
+     */
+    @Override
+    public boolean commitEntryMemTableFlush() throws IOException {
+        // lock it only if there is new data
+        // so that cache accesstime is not changed
+        for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : getCopyOfCurrentLogs()) {
+            BufferedLogChannel currentLog = currentLogWithDirInfo.getLogChannel();
+            if (!reachEntryLogLimit(currentLog, 0L)) {
+                continue;
+            }
+            Long ledgerId = currentLog.getLedgerIdAssigned();
+            Lock lock = getLock(ledgerId);
+            lock.lock();
+            try {
+                /*
+                 * The snapshot above was taken without holding the ledger
+                 * lock, so by now this channel may already have been rotated
+                 * or evicted. Its size would still exceed the limit, so only
+                 * roll when it is still the ledger's current log; otherwise a
+                 * brand new log would be created needlessly.
+                 */
+                if (getCurrentLogForLedger(ledgerId) == currentLog && reachEntryLogLimit(currentLog, 0L)) {
+                    log.info("Rolling entry logger since it reached size limitation for ledger: {}", ledgerId);
+                    createNewLog(ledgerId);
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+        /*
+         * in the case of entrylogperledger, SyncThread drives
+         * checkpoint logic for every flushInterval. So
+         * EntryMemtable doesn't need to call checkpoint in the case
+         * of entrylogperledger.
+         */
+        return false;
+    }
+
+    /*
+     * This is for testing purposes only: it calls cleanUp() on the ledgerId
+     * cache. Guava's cache doesn't clean up completely (including calling the
+     * expiry removal listener) automatically when the access timeout elapses.
+     *
+     * https://google.github.io/guava/releases/19.0/api/docs/com/google/
+     * common/cache/CacheBuilder.html
+     *
+     * If expireAfterWrite or expireAfterAccess is requested entries may be
+     * evicted on each cache modification, on occasional cache accesses, or
+     * on calls to Cache.cleanUp(). Expired entries may be counted by
+     * Cache.size(), but will never be visible to read or write operations.
+     *
+     * Certain cache configurations will result in the accrual of periodic
+     * maintenance tasks which will be performed during write operations, or
+     * during occasional read operations in the absence of writes. The
+     * Cache.cleanUp() method of the returned cache will also perform
+     * maintenance, but calling it should not be necessary with a high
+     * throughput cache. Only caches built with removalListener,
+     * expireAfterWrite, expireAfterAccess, weakKeys, weakValues, or
+     * softValues perform periodic maintenance.
+     */
+    @VisibleForTesting
+    void doEntryLogMapCleanup() {
+        ledgerIdEntryLogMap.cleanUp();
+    }
+
+    /*
+     * For tests: returns ledgerIdEntryLogMap.asMap(), i.e. the ledgerId cache
+     * exposed as a ConcurrentMap.
+     */
+    @VisibleForTesting
+    ConcurrentMap<Long, EntryLogAndLockTuple> getCacheAsMap() {
+        return ledgerIdEntryLogMap.asMap();
+    }
+    /*
+     * Picks, among the given writable ledger dirs, the one that currently
+     * hosts the fewest active entrylogs. Active entrylogs located in dirs
+     * that are not in the given list are ignored.
+     */
+    @Override
+    public File getDirForNextEntryLog(List<File> writableLedgerDirs) {
+        // every candidate dir starts with a count of zero
+        Map<File, MutableInt> activeLogsPerDir = new HashMap<File, MutableInt>();
+        for (File candidateDir : writableLedgerDirs) {
+            activeLogsPerDir.put(candidateDir, new MutableInt());
+        }
+        // count the active entrylogs sitting in each candidate dir
+        for (BufferedLogChannelWithDirInfo activeLog : replicaOfCurrentLogChannels.values()) {
+            MutableInt counter = activeLogsPerDir.get(activeLog.getLogChannel().getLogFile().getParentFile());
+            if (counter != null) {
+                counter.increment();
+            }
+        }
+        @SuppressWarnings("unchecked")
+        Optional<Entry<File, MutableInt>> leastLoadedDir = activeLogsPerDir.entrySet().stream()
+                .min(Map.Entry.comparingByValue());
+        return leastLoadedDir.get().getKey();
+    }
+
+    /**
+     * Calls EntryLogger.closeFileChannel on the channel of every entry log in
+     * a snapshot of the current logs.
+     */
+    @Override
+    public void close() throws IOException {
+        for (BufferedLogChannelWithDirInfo activeLog : getCopyOfCurrentLogs()) {
+            EntryLogger.closeFileChannel(activeLog.getLogChannel());
+        }
+    }
+
+    /**
+     * Calls EntryLogger.forceCloseFileChannel on the channel of every entry
+     * log in a snapshot of the current logs.
+     */
+    @Override
+    public void forceClose() {
+        for (BufferedLogChannelWithDirInfo activeLog : getCopyOfCurrentLogs()) {
+            EntryLogger.forceCloseFileChannel(activeLog.getLogChannel());
+        }
+    }
+
+    /**
+     * Flushes the channel of every entry log in a snapshot of the current
+     * logs.
+     */
+    @Override
+    void flushCurrentLogs() throws IOException {
+        for (BufferedLogChannelWithDirInfo activeLog : getCopyOfCurrentLogs()) {
+            /**
+             * this method runs as part of checkpoint, hence the file metadata
+             * is force written as well (second argument is true).
+             */
+            flushLogChannel(activeLog.getLogChannel(), true);
+        }
+    }
+
+    /**
+     * Not supported by this manager.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public BufferedLogChannel createNewLogForCompaction() throws IOException {
+        throw new UnsupportedOperationException(
+                "When entryLogPerLedger is enabled, transactional compaction should have been disabled");
+    }
+
+    /**
+     * Runs the base class addEntry while holding the lock of the given
+     * ledger.
+     */
+    @Override
+    public long addEntry(long ledger, ByteBuf entry, boolean rollLog) throws IOException {
+        final Lock ledgerLock = getLock(ledger);
+        ledgerLock.lock();
+        try {
+            return super.addEntry(ledger, entry, rollLog);
+        } finally {
+            ledgerLock.unlock();
+        }
+    }
+
+    /**
+     * Runs the base class createNewLog while holding the lock of the given
+     * ledger.
+     */
+    @Override
+    void createNewLog(long ledgerId) throws IOException {
+        final Lock ledgerLock = getLock(ledgerId);
+        ledgerLock.lock();
+        try {
+            super.createNewLog(ledgerId);
+        } finally {
+            ledgerLock.unlock();
+        }
+    }
+
+
+    /**
+     * Returns the entry log channel an entry of the given size should be
+     * appended to for this ledger, creating a new entry log first when needed.
+     *
+     * @param ledgerId ledger the entry belongs to
+     * @param entrySize size of the entry about to be added
+     * @param rollLog if true the regular entry log limit is checked, otherwise
+     *            only the hard limit
+     * @return the ledger's current log after any roll
+     * @throws IOException if the lock/cache lookup, the flush of the old
+     *             channel or the creation of the new log fails
+     */
+    @Override
+    BufferedLogChannel getCurrentLogForLedgerForAddEntry(long ledgerId, int entrySize, boolean rollLog)
+            throws IOException {
+        Lock lock = getLock(ledgerId);
+        lock.lock();
+        try {
+            BufferedLogChannelWithDirInfo logChannelWithDirInfo = getCurrentLogWithDirInfoForLedger(ledgerId);
+            BufferedLogChannel logChannel = null;
+            if (logChannelWithDirInfo != null) {
+                logChannel = logChannelWithDirInfo.getLogChannel();
+            }
+            /*
+             * Only consult the size limits and the dir-full flag when a
+             * channel exists: a missing channel already forces createNewLog
+             * below, and readEntryLogHardLimit reads logChannel.position(),
+             * so it must not be handed a null channel.
+             */
+            boolean reachEntryLogLimit = false;
+            boolean diskFull = false;
+            if (logChannel != null) {
+                reachEntryLogLimit = rollLog ? reachEntryLogLimit(logChannel, entrySize)
+                        : readEntryLogHardLimit(logChannel, entrySize);
+                diskFull = logChannelWithDirInfo.isLedgerDirFull();
+            }
+            boolean allDisksFull = !ledgerDirsManager.hasWritableLedgerDirs();
+
+            /**
+             * if disk of the logChannel is full or if the entrylog limit is
+             * reached or if the logchannel is not initialized, then
+             * createNewLog. If allDisks are full then proceed with the current
+             * logChannel, since Bookie must have turned to readonly mode and
+             * the addEntry traffic would be from GC and it is ok to proceed in
+             * this case.
+             */
+            if ((logChannel == null) || reachEntryLogLimit || (diskFull && !allDisksFull)) {
+                if (logChannel != null) {
+                    logChannel.flushAndForceWriteIfRegularFlush(false);
+                }
+                createNewLog(ledgerId);
+            }
+
+            return getCurrentLogForLedger(ledgerId);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Flushes, closes and drops every channel currently in
+     * rotatedLogChannels, reporting each one to
+     * recentlyCreatedEntryLogsStatus.
+     */
+    @Override
+    public void flushRotatedLogs() throws IOException {
+        for (BufferedLogChannel rotated : rotatedLogChannels) {
+            rotated.flushAndForceWrite(true);
+            // the channel is write-only, so once flushed its underlying file channel
+            // has to be closed; otherwise fds may leak and the disk space could not
+            // be reclaimed.
+            EntryLogger.closeFileChannel(rotated);
+            recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(rotated.getLogId());
+            rotatedLogChannels.remove(rotated);
+            log.info("Synced entry logger {} to disk.", rotated.getLogId());
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 75445c23f..d300e0b90 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -45,9 +45,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -366,63 +364,8 @@ public EntryLogger(ServerConfiguration conf,
         this.entryLoggerAllocator = new EntryLoggerAllocator(conf, ledgerDirsManager, recentlyCreatedEntryLogsStatus,
                 logId);
         if (entryLogPerLedgerEnabled) {
-            this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, ledgerDirsManager, entryLoggerAllocator,
-                    listeners, recentlyCreatedEntryLogsStatus) {
-                @Override
-                public void checkpoint() throws IOException {
-                    /*
-                     * In the case of entryLogPerLedgerEnabled we need to flush
-                     * both rotatedlogs and currentlogs. This is needed because
-                     * syncThread periodically does checkpoint and at this time
-                     * all the logs should be flushed.
-                     *
-                     */
-                    super.flush();
-                }
-
-                @Override
-                public void prepareSortedLedgerStorageCheckpoint(long numBytesFlushed) throws IOException {
-                    // do nothing
-                    /*
-                     * prepareSortedLedgerStorageCheckpoint is required for
-                     * singleentrylog scenario, but it is not needed for
-                     * entrylogperledger scenario, since entries of a ledger go
-                     * to a entrylog (even during compaction) and SyncThread
-                     * drives periodic checkpoint logic.
-                     */
-
-                }
-
-                @Override
-                public void prepareEntryMemTableFlush() {
-                    // do nothing
-                }
-
-                @Override
-                public boolean commitEntryMemTableFlush() throws IOException {
-                    // lock it only if there is new data
-                    // so that cache accesstime is not changed
-                    Set<BufferedLogChannel> copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
-                            Arrays.asList(super.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
-                    for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
-                        if (reachEntryLogLimit(currentLog, 0L)) {
-                            synchronized (this) {
-                                if (reachEntryLogLimit(currentLog, 0L)) {
-                                    LOG.info("Rolling entry logger since it reached size limitation");
-                                    createNewLog(EntryLogger.UNASSIGNED_LEDGERID);
-                                }
-                            }
-                        }
-                    }
-                    /*
-                     * in the case of entrylogperledger, SyncThread drives
-                     * checkpoint logic for every flushInterval. So
-                     * EntryMemtable doesn't need to call checkpoint in the case
-                     * of entrylogperledger.
-                     */
-                    return false;
-                }
-            };
+            this.entryLogManager = new EntryLogManagerForEntryLogPerLedger(conf, ledgerDirsManager,
+                    entryLoggerAllocator, listeners, recentlyCreatedEntryLogsStatus);
         } else {
             this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, ledgerDirsManager, entryLoggerAllocator,
                     listeners, recentlyCreatedEntryLogsStatus);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
index d33d7c48d..10e7715cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLoggerAllocator.java
@@ -51,8 +51,8 @@
 class EntryLoggerAllocator {
 
     private long preallocatedLogId;
-    private Future<BufferedLogChannel> preallocation = null;
-    private ExecutorService allocatorExecutor;
+    Future<BufferedLogChannel> preallocation = null;
+    ExecutorService allocatorExecutor;
     private final ServerConfiguration conf;
     private final LedgerDirsManager ledgerDirsManager;
     private final Object createEntryLogLock = new Object();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index bdc26e767..583296deb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -189,6 +189,22 @@
     protected static final String NUMBER_OF_MEMTABLE_FLUSH_THREADS = "numOfMemtableFlushThreads";
 
 
+    /*
+     * When entryLogPerLedger is enabled, this config specifies how long
+     * EntryLogManagerForEntryLogPerLedger waits after the last addEntry call
+     * for a ledger before closing that ledger's entrylog file, if an explicit
+     * writeclose for that ledger is not received.
+     */
+    protected static final String ENTRYLOGMAP_ACCESS_EXPIRYTIME_INSECONDS = "entrylogMapAccessExpiryTimeInSeconds";
+
+    /*
+     * in entryLogPerLedger feature, this specifies the maximum number of
+     * entrylogs that can be active at a given point in time. If the number of
+     * active entrylogs exceeds maximumNumberOfActiveEntryLogs, then an
+     * entrylog will be evicted from the cache.
+     */
+    protected static final String MAXIMUM_NUMBER_OF_ACTIVE_ENTRYLOGS = "maximumNumberOfActiveEntryLogs";
+
     /**
      * Construct a default configuration object.
      */
@@ -2744,4 +2760,41 @@ public ServerConfiguration setNumOfMemtableFlushThreads(int numOfMemtableFlushTh
         this.setProperty(NUMBER_OF_MEMTABLE_FLUSH_THREADS, Integer.toString(numOfMemtableFlushThreads));
         return this;
     }
+
+    /*
+     * in entryLogPerLedger feature, this specifies the expiry duration: once
+     * this many seconds have elapsed after an entry's last access, that entry
+     * should be automatically removed from the cache
+     */
+    public int getEntrylogMapAccessExpiryTimeInSeconds() {
+        return this.getInt(ENTRYLOGMAP_ACCESS_EXPIRYTIME_INSECONDS, 5 * 60);
+    }
+
+    /*
+     * sets the time duration for entrylogMapAccessExpiryTimeInSeconds, which will be used for cache eviction
+     * policy, in entrylogperledger feature.
+     */
+    public ServerConfiguration setEntrylogMapAccessExpiryTimeInSeconds(int entrylogMapAccessExpiryTimeInSeconds) {
+        this.setProperty(ENTRYLOGMAP_ACCESS_EXPIRYTIME_INSECONDS,
+                Integer.toString(entrylogMapAccessExpiryTimeInSeconds));
+        return this;
+    }
+
+    /*
+     * get the maximum number of entrylogs that can be active at a given point
+     * in time.
+     */
+    public int getMaximumNumberOfActiveEntryLogs() {
+        return this.getInt(MAXIMUM_NUMBER_OF_ACTIVE_ENTRYLOGS, 500);
+    }
+
+    /*
+     * sets the maximum number of entrylogs that can be active at a given point
+     * in time.
+     */
+    public ServerConfiguration setMaximumNumberOfActiveEntryLogs(int maximumNumberOfActiveEntryLogs) {
+        this.setProperty(MAXIMUM_NUMBER_OF_ACTIVE_ENTRYLOGS,
+                Integer.toString(maximumNumberOfActiveEntryLogs));
+        return this;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index bdfaf033a..dff0af4ec 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -196,7 +196,10 @@ public void forEach(EntryProcessor<V> processor) {
         return keys;
     }
 
-    List<V> values() {
+    /**
+     * @return a new list of all values (makes a copy)
+     */
+    public List<V> values() {
         List<V> values = Lists.newArrayListWithExpectedSize((int) size());
         forEach((key, value) -> values.add(value));
         return values;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 4c4514a7e..611a68ae7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -20,15 +20,28 @@
 
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
+import org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo;
+import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,7 +54,7 @@
  */
 public class CreateNewLogTest {
     private static final Logger LOG = LoggerFactory
-    .getLogger(CreateNewLogTest.class);
+            .getLogger(CreateNewLogTest.class);
 
     private String[] ledgerDirs;
     private int numDirs = 100;
@@ -142,6 +155,245 @@ public void testCreateNewLogWithNoWritableLedgerDirs() throws Exception {
         assertTrue("Wrong log id", entryLogManager.getCurrentLogId() > 1);
     }
 
+    void setSameThreadExecutorForEntryLoggerAllocator(EntryLoggerAllocator entryLoggerAllocator) {
+        ExecutorService executorService = entryLoggerAllocator.allocatorExecutor;
+        executorService.shutdown();
+        entryLoggerAllocator.allocatorExecutor = MoreExecutors.newDirectExecutorService();
+    }
+
+    /*
+     * entryLogPerLedger is enabled and various scenarios of entrylogcreation are tested
+     */
+    @Test
+    public void testEntryLogPerLedgerCreationWithPreAllocation() throws Exception {
+        /*
+         * I wish I could shorten this testcase or split it into multiple testcases,
+         * but I want to cover a scenario and it requires multiple operations in
+         * sequence and validations along the way. Please bear with the length of this
+         * testcase, I added as many comments as I can to simplify it.
+         */
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        conf.setIsForceGCAllowWhenNoSpace(true);
+        // preAllocation is Enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator;
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
+                .getEntryLogManager();
+        // set same thread executor for entryLoggerAllocator's allocatorExecutor
+        setSameThreadExecutorForEntryLoggerAllocator(entryLoggerAllocator);
+
+        /*
+         * no entrylog will be created during initialization
+         */
+        int expectedPreAllocatedLogID = -1;
+        Assert.assertEquals("PreallocatedlogId after initialization of Entrylogger",
+                expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId());
+
+        int numOfLedgers = 6;
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            /* since we are starting creation of new ledgers, entrylogid will be ledgerid */
+            entryLogManager.createNewLog(i);
+        }
+
+        /*
+         * preallocation is enabled so though entryLogId starts with 0, preallocatedLogId would be equal to numOfLedgers
+         */
+        expectedPreAllocatedLogID = numOfLedgers;
+        Assert.assertEquals("PreallocatedlogId after creation of logs for ledgers", expectedPreAllocatedLogID,
+                entryLoggerAllocator.getPreallocatedLogId());
+        Assert.assertEquals("Number of current ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of LogChannels to flush", 0,
+                entryLogManager.getRotatedLogChannels().size());
+
+        // create dummy entrylog file with id - (expectedPreAllocatedLogID + 1)
+        String logFileName = Long.toHexString(expectedPreAllocatedLogID + 1) + ".log";
+        File dir = ledgerDirsManager.pickRandomWritableDir();
+        LOG.info("Picked this directory: " + dir);
+        File newLogFile = new File(dir, logFileName);
+        newLogFile.createNewFile();
+
+        /*
+         * since there is already preexisting entrylog file with id -
+         * (expectedPreAllocatedLogID + 1), when new
+         * entrylog is created it should have
+         * (expectedPreAllocatedLogID + 2) id
+         */
+        long rotatedLedger = 1L;
+        entryLogManager.createNewLog(rotatedLedger);
+
+        expectedPreAllocatedLogID = expectedPreAllocatedLogID + 2;
+        Assert.assertEquals("PreallocatedlogId ",
+                expectedPreAllocatedLogID, entryLoggerAllocator.getPreallocatedLogId());
+        Assert.assertEquals("Number of current ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        List<BufferedLogChannel> rotatedLogChannels = entryLogManager.getRotatedLogChannels();
+        Assert.assertEquals("Number of LogChannels rotated", 1, rotatedLogChannels.size());
+        Assert.assertEquals("Rotated logchannel logid", rotatedLedger, rotatedLogChannels.iterator().next().getLogId());
+        entryLogger.flush();
+        /*
+         * when flush is called all the rotatedlogchannels are flushed and
+         * removed from rotatedlogchannels list. But here since entrylogId 0
+         * is not yet rotated and flushed, getLeastUnflushedLogId will still
+         * return 0.
+         */
+        rotatedLogChannels = entryLogManager.getRotatedLogChannels();
+        Assert.assertEquals("Number of LogChannels rotated", 0, rotatedLogChannels.size());
+        Assert.assertEquals("Least UnflushedLoggerId", 0, entryLogger.getLeastUnflushedLogId());
+
+        entryLogManager.createNewLog(0L);
+        rotatedLogChannels = entryLogManager.getRotatedLogChannels();
+        Assert.assertEquals("Number of LogChannels rotated", 1, rotatedLogChannels.size());
+        Assert.assertEquals("Least UnflushedLoggerId", 0, entryLogger.getLeastUnflushedLogId());
+        entryLogger.flush();
+        /*
+         * since both entrylogids 0, 1 are rotated and flushed,
+         * leastunFlushedLogId should be 2
+         */
+        Assert.assertEquals("Least UnflushedLoggerId", 2, entryLogger.getLeastUnflushedLogId());
+        expectedPreAllocatedLogID = expectedPreAllocatedLogID + 1;
+
+        /*
+         * we should be able to get entryLogMetadata from all the active
+         * entrylogs and the logs which are moved to flush list. Since no entry
+         * is added, all the meta should be empty.
+         */
+        for (int i = 0; i <= expectedPreAllocatedLogID; i++) {
+            EntryLogMetadata meta = entryLogger.getEntryLogMetadata(i);
+            Assert.assertTrue("EntryLogMetadata should be empty", meta.isEmpty());
+            Assert.assertTrue("EntryLog usage should be 0", meta.getTotalSize() == 0);
+        }
+    }
+
+    /**
+     * In this testcase entryLogPerLedger is Enabled and entrylogs are created
+     * while ledgerdirs are getting full.
+     */
+    @Test
+    public void testEntryLogCreationWithFilledDirs() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // forceGCAllowWhenNoSpace is disabled
+        conf.setIsForceGCAllowWhenNoSpace(false);
+        // pre-allocation is not enabled
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLoggerAllocator entryLoggerAllocator = entryLogger.entryLoggerAllocator;
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+        // set same thread executor for entryLoggerAllocator's allocatorExecutor
+        setSameThreadExecutorForEntryLoggerAllocator(entryLoggerAllocator);
+
+        int expectedPreAllocatedLogIDDuringInitialization = -1;
+        Assert.assertEquals("PreallocatedlogId after initialization of Entrylogger",
+                expectedPreAllocatedLogIDDuringInitialization, entryLoggerAllocator.getPreallocatedLogId());
+        Assert.assertEquals("Preallocation Future of this slot should be null", null,
+                entryLogger.entryLoggerAllocator.preallocation);
+
+        long ledgerId = 0L;
+
+        entryLogManager.createNewLog(ledgerId);
+
+        /*
+         * pre-allocation is not enabled, so it would not preallocate for next entrylog
+         */
+        Assert.assertEquals("PreallocatedlogId after initialization of Entrylogger",
+                expectedPreAllocatedLogIDDuringInitialization + 1, entryLoggerAllocator.getPreallocatedLogId());
+
+        for (int i = 0; i < numDirs - 1; i++) {
+            ledgerDirsManager.addToFilledDirs(Bookie.getCurrentDirectory(new File(ledgerDirs[i])));
+        }
+
+        /*
+         * this is the only non-filled ledgerDir so it should be used for creating new entryLog
+         */
+        File nonFilledLedgerDir = Bookie.getCurrentDirectory(new File(ledgerDirs[numDirs - 1]));
+
+        entryLogManager.createNewLog(ledgerId);
+        BufferedLogChannel newLogChannel = entryLogManager.getCurrentLogForLedger(ledgerId);
+        Assert.assertEquals("Directory of newly created BufferedLogChannel file", nonFilledLedgerDir.getAbsolutePath(),
+                newLogChannel.getLogFile().getParentFile().getAbsolutePath());
+
+        ledgerDirsManager.addToFilledDirs(Bookie.getCurrentDirectory(new File(ledgerDirs[numDirs - 1])));
+
+        // new entrylog creation should succeed, though there is no writable ledgerDir
+        entryLogManager.createNewLog(ledgerId);
+    }
+
+    /*
+     * In this testcase it is validated if the entryLog is created in the
+     * ledgerDir with least number of current active entrylogs
+     */
+    @Test
+    public void testLedgerDirsUniformityDuringCreation() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // pre-allocation is not enabled
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+
+        for (long i = 0; i < ledgerDirs.length; i++) {
+            entrylogManager.createNewLog(i);
+        }
+
+        int numberOfLedgersCreated = ledgerDirs.length;
+
+        Assert.assertEquals("Highest frequency of entrylogs per ledgerdir", 1,
+                highestFrequencyOfEntryLogsPerLedgerDir(entrylogManager.getCopyOfCurrentLogs()));
+
+        long newLedgerId = numberOfLedgersCreated;
+        entrylogManager.createNewLog(newLedgerId);
+        numberOfLedgersCreated++;
+
+        Assert.assertEquals("Highest frequency of entrylogs per ledgerdir", 2,
+                highestFrequencyOfEntryLogsPerLedgerDir(entrylogManager.getCopyOfCurrentLogs()));
+
+        for (long i = numberOfLedgersCreated; i < 2 * ledgerDirs.length; i++) {
+            entrylogManager.createNewLog(i);
+        }
+
+        Assert.assertEquals("Highest frequency of entrylogs per ledgerdir", 2,
+                highestFrequencyOfEntryLogsPerLedgerDir(entrylogManager.getCopyOfCurrentLogs()));
+    }
+
+
+    int highestFrequencyOfEntryLogsPerLedgerDir(Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo) {
+        Map<File, MutableInt> frequencyOfEntryLogsInLedgerDirs = new HashMap<File, MutableInt>();
+        for (BufferedLogChannelWithDirInfo logChannelWithDirInfo : copyOfCurrentLogsWithDirInfo) {
+            File parentDir = logChannelWithDirInfo.getLogChannel().getLogFile().getParentFile();
+            if (frequencyOfEntryLogsInLedgerDirs.containsKey(parentDir)) {
+                frequencyOfEntryLogsInLedgerDirs.get(parentDir).increment();
+            } else {
+                frequencyOfEntryLogsInLedgerDirs.put(parentDir, new MutableInt(1));
+            }
+        }
+        @SuppressWarnings("unchecked")
+        int highestFreq = ((Entry<File, MutableInt>) (frequencyOfEntryLogsInLedgerDirs.entrySet().stream()
+                .max(Map.Entry.comparingByValue()).get())).getValue().intValue();
+        return highestFreq;
+    }
+
     @Test
     public void testConcurrentCreateNewLogWithEntryLogFilePreAllocationEnabled() throws Exception {
         testConcurrentCreateNewLog(true);
@@ -164,6 +416,9 @@ public void testConcurrentCreateNewLog(boolean entryLogFilePreAllocationEnabled)
 
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
         EntryLogManagerBase entryLogManager = (EntryLogManagerBase) el.getEntryLogManager();
+        // set same thread executor for entryLoggerAllocator's allocatorExecutor
+        setSameThreadExecutorForEntryLoggerAllocator(el.getEntryLoggerAllocator());
+
         Assert.assertEquals("previousAllocatedEntryLogId after initialization", -1,
                 el.getPreviousAllocatedEntryLogId());
         Assert.assertEquals("leastUnflushedLogId after initialization", 0, el.getLeastUnflushedLogId());
@@ -178,8 +433,6 @@ public void testConcurrentCreateNewLog(boolean entryLogFilePreAllocationEnabled)
                 receivedException.set(true);
             }
         });
-        // wait for the pre-allocation to complete
-        Thread.sleep(1000);
 
         Assert.assertFalse("There shouldn't be any exceptions while creating newlog", receivedException.get());
         int expectedPreviousAllocatedEntryLogId = createNewLogNumOfTimes - 1;
@@ -189,7 +442,7 @@ public void testConcurrentCreateNewLog(boolean entryLogFilePreAllocationEnabled)
 
         Assert.assertEquals(
                 "previousAllocatedEntryLogId after " + createNewLogNumOfTimes
-                        + " number of times createNewLog is called",
+                + " number of times createNewLog is called",
                 expectedPreviousAllocatedEntryLogId, el.getPreviousAllocatedEntryLogId());
         Assert.assertEquals("Number of RotatedLogChannels", createNewLogNumOfTimes - 1,
                 entryLogManager.getRotatedLogChannels().size());
@@ -246,6 +499,8 @@ public void testCreateNewLogAndCompactionLog() throws Exception {
         LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
+        // set same thread executor for entryLoggerAllocator's allocatorExecutor
+        setSameThreadExecutorForEntryLoggerAllocator(el.getEntryLoggerAllocator());
         AtomicBoolean receivedException = new AtomicBoolean(false);
 
         IntStream.range(0, 2).parallel().forEach((i) -> {
@@ -260,12 +515,67 @@ public void testCreateNewLogAndCompactionLog() throws Exception {
                 receivedException.set(true);
             }
         });
-        // wait for the pre-allocation to complete
-        Thread.sleep(1000);
 
         Assert.assertFalse("There shouldn't be any exceptions while creating newlog", receivedException.get());
         Assert.assertEquals(
                 "previousAllocatedEntryLogId after 2 times createNewLog is called", 2,
                 el.getPreviousAllocatedEntryLogId());
     }
+
+    /*
+     * In this testcase, creation of entrylogs for ledgers is attempted concurrently.
+     */
+    @Test
+    public void testConcurrentEntryLogCreations() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // pre-allocation is enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+
+        int numOfLedgers = 10;
+        int numOfThreadsForSameLedger = 10;
+        AtomicInteger createdEntryLogs = new AtomicInteger(0);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch createdLatch = new CountDownLatch(numOfLedgers * numOfThreadsForSameLedger);
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            for (int j = 0; j < numOfThreadsForSameLedger; j++) {
+                long ledgerId = i;
+                new Thread(() -> {
+                    try {
+                        startLatch.await();
+                        entrylogManager.createNewLog(ledgerId);
+                        createdEntryLogs.incrementAndGet();
+                    } catch (InterruptedException | IOException e) {
+                        LOG.error("Got exception while trying to createNewLog for Ledger: " + ledgerId, e);
+                    } finally {
+                        createdLatch.countDown();
+                    }
+                }).start();
+            }
+        }
+
+        startLatch.countDown();
+        createdLatch.await(5, TimeUnit.SECONDS);
+        Assert.assertEquals("Created EntryLogs", numOfLedgers * numOfThreadsForSameLedger, createdEntryLogs.get());
+        Assert.assertEquals("Active currentlogs size", numOfLedgers, entrylogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Rotated entrylogs size", (numOfThreadsForSameLedger - 1) * numOfLedgers,
+                entrylogManager.getRotatedLogChannels().size());
+        /*
+         * EntryLogFilePreAllocation is Enabled so
+         * getPreviousAllocatedEntryLogId would be (numOfLedgers *
+         * numOfThreadsForSameLedger) instead of (numOfLedgers *
+         * numOfThreadsForSameLedger - 1)
+         */
+        Assert.assertEquals("PreviousAllocatedEntryLogId", numOfLedgers * numOfThreadsForSameLedger,
+                entryLogger.getPreviousAllocatedEntryLogId());
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index ac6433515..f5e73a167 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -26,27 +26,32 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import com.google.common.collect.Sets;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -814,7 +819,7 @@ public void testEntryLoggersRecentEntryLogsStatus() throws Exception {
      * test for validating if the EntryLog/BufferedChannel flushes/forcewrite if the bytes written to it are more than
      * flushIntervalInBytes
      */
-    @Test(timeout = 60000)
+    @Test
     public void testFlushIntervalInBytes() throws Exception {
         long flushIntervalInBytes = 5000;
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
@@ -867,4 +872,828 @@ public void testFlushIntervalInBytes() throws Exception {
         Assert.assertEquals("LedgerId", ledgerId, readLedgerId);
         Assert.assertEquals("EntryId", 1L, readEntryId);
     }
+
+    /*
+     * tests basic logic of EntryLogManager interface for
+     * EntryLogManagerForEntryLogPerLedger.
+     */
+    @Test
+    public void testEntryLogManagerInterfaceForEntryLogPerLedger() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
+                .getEntryLogManager();
+
+        Assert.assertEquals("Number of current active EntryLogs ", 0, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size());
+
+        int numOfLedgers = 5;
+        int numOfThreadsPerLedger = 10;
+        validateLockAcquireAndRelease(numOfLedgers, numOfThreadsPerLedger, entryLogManager);
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            entryLogManager.setCurrentLogForLedgerAndAddToRotate(i,
+                    createDummyBufferedLogChannel(entryLogger, i, conf));
+        }
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            Assert.assertEquals("LogChannel for ledger: " + i, entryLogManager.getCurrentLogIfPresent(i),
+                    entryLogManager.getCurrentLogForLedger(i));
+        }
+
+        Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size());
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            entryLogManager.setCurrentLogForLedgerAndAddToRotate(i,
+                    createDummyBufferedLogChannel(entryLogger, numOfLedgers + i, conf));
+        }
+
+        /*
+         * since new entryLogs are set for all the ledgers, previous entrylogs would be added to rotatedLogChannels
+         */
+        Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", numOfLedgers,
+                entryLogManager.getRotatedLogChannels().size());
+
+        for (long i = 0; i < numOfLedgers; i++) {
+            entryLogManager.setCurrentLogForLedgerAndAddToRotate(i,
+                    createDummyBufferedLogChannel(entryLogger, 2 * numOfLedgers + i, conf));
+        }
+
+        /*
+         * again since new entryLogs are set for all the ledgers, previous entrylogs would be added to
+         * rotatedLogChannels
+         */
+        Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers,
+                entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of Rotated Logs ", 2 * numOfLedgers,
+                entryLogManager.getRotatedLogChannels().size());
+
+        for (BufferedLogChannel logChannel : entryLogManager.getRotatedLogChannels()) {
+            entryLogManager.getRotatedLogChannels().remove(logChannel);
+        }
+        Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size());
+
+        // entrylogid is sequential
+        for (long i = 0; i < numOfLedgers; i++) {
+            assertEquals("EntryLogid for Ledger " + i, 2 * numOfLedgers + i,
+                    entryLogManager.getCurrentLogForLedger(i).getLogId());
+        }
+
+        for (long i = 2 * numOfLedgers; i < (3 * numOfLedgers); i++) {
+            assertTrue("EntryLog with logId: " + i + " should be present",
+                    entryLogManager.getCurrentLogIfPresent(i) != null);
+        }
+    }
+
+    private EntryLogger.BufferedLogChannel createDummyBufferedLogChannel(EntryLogger entryLogger, long logid,
+            ServerConfiguration servConf) throws IOException {
+        File tmpFile = File.createTempFile("entrylog", logid + "");
+        tmpFile.deleteOnExit();
+        FileChannel fc = FileChannel.open(tmpFile.toPath());
+        EntryLogger.BufferedLogChannel logChannel = new BufferedLogChannel(fc, 10, 10, logid, tmpFile,
+                servConf.getFlushIntervalInBytes());
+        return logChannel;
+    }
+
+    /*
+     * validates the concurrency aspect of entryLogManager's lock
+     *
+     * Executor of fixedThreadPool of size 'numOfLedgers * numOfThreadsPerLedger' is created and the same number
+     * of tasks are submitted to the Executor. In each task, lock of that ledger is acquired and then released.
+     */
+    private void validateLockAcquireAndRelease(int numOfLedgers, int numOfThreadsPerLedger,
+            EntryLogManagerForEntryLogPerLedger entryLogManager) throws InterruptedException {
+        ExecutorService tpe = Executors.newFixedThreadPool(numOfLedgers * numOfThreadsPerLedger);
+        CountDownLatch latchToStart = new CountDownLatch(1);
+        CountDownLatch latchToWait = new CountDownLatch(1);
+        AtomicInteger numberOfThreadsAcquiredLock = new AtomicInteger(0);
+        AtomicBoolean irptExceptionHappened = new AtomicBoolean(false);
+        Random rand = new Random();
+
+        for (int i = 0; i < numOfLedgers * numOfThreadsPerLedger; i++) {
+            long ledgerId = i % numOfLedgers;
+            tpe.submit(() -> {
+                try {
+                    latchToStart.await();
+                    Lock lock = entryLogManager.getLock(ledgerId);
+                    lock.lock();
+                    numberOfThreadsAcquiredLock.incrementAndGet();
+                    latchToWait.await();
+                    lock.unlock();
+                } catch (InterruptedException | IOException e) {
+                    irptExceptionHappened.set(true);
+                }
+            });
+        }
+
+        assertEquals("Number Of Threads acquired Lock", 0, numberOfThreadsAcquiredLock.get());
+        latchToStart.countDown();
+        Thread.sleep(1000);
+        /*
+         * since there are only "numOfLedgers" ledgers, only "numOfLedgers" threads should have been able to acquire
+         * lock. After acquiring the lock they must be waiting on the 'latchToWait' latch
+         */
+        assertEquals("Number Of Threads acquired Lock", numOfLedgers, numberOfThreadsAcquiredLock.get());
+        latchToWait.countDown();
+        Thread.sleep(2000);
+        assertEquals("Number Of Threads acquired Lock", numOfLedgers * numOfThreadsPerLedger,
+                numberOfThreadsAcquiredLock.get());
+    }
+
+    /*
+     * Checks that EntryLogManagerForEntryLogPerLedger evicts a ledger from its
+     * cache map when no entry is added to that ledger and its state is not
+     * accessed for longer than evictionPeriod.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testEntryLogManagerExpiryRemoval() throws Exception {
+        final int evictionPeriod = 1;
+        final long ledgerId = 0L;
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod);
+
+        DiskChecker diskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), diskChecker);
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();
+
+        // register a dummy log channel for the ledger and confirm that it is served back
+        BufferedLogChannel dummyLogChannel = createDummyBufferedLogChannel(entryLogger, 0, conf);
+        entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, dummyLogChannel);
+        BufferedLogChannel logBeforeExpiry = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId + " should match", dummyLogChannel, logBeforeExpiry);
+
+        // stay idle for slightly longer than the eviction period, then trigger the cleanup explicitly
+        Thread.sleep(evictionPeriod * 1000 + 100);
+        entryLogManager.doEntryLogMapCleanup();
+
+        /*
+         * the ledger was idle for more than evictionPeriod and the cache got cleaned up, hence there must be no
+         * mapping left for that ledger
+         */
+        BufferedLogChannel logAfterExpiry = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, logAfterExpiry);
+        Assert.assertEquals("Number of current active EntryLogs ", 0, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getRotatedLogChannels().size());
+        Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel",
+                entryLogManager.getRotatedLogChannels().contains(dummyLogChannel));
+
+        boolean mapEntryEvicted = (entryLogManager.getCacheAsMap().get(ledgerId) == null)
+                || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null);
+        Assert.assertTrue("since mapentry must have been evicted, it should be null", mapEntryEvicted);
+    }
+
+    /*
+     * tests if the maximum size of cache (maximumNumberOfActiveEntryLogs) is
+     * honored in EntryLogManagerForEntryLogPerLedger's cache eviction policy:
+     * more logs than the configured maximum are created, and after each
+     * creation the cache size must not exceed that maximum.
+     */
+    @Test
+    public void testCacheMaximumSizeEvictionPolicy() throws Exception {
+        final int cacheMaximumSize = 20;
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(1));
+        conf.setMaximumNumberOfActiveEntryLogs(cacheMaximumSize);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();
+
+        // create 10 more logs than the cache may hold; the bound has to hold after every single creation
+        for (int i = 0; i < cacheMaximumSize + 10; i++) {
+            entryLogManager.createNewLog(i);
+            int cacheSize = entryLogManager.getCacheAsMap().size();
+            Assert.assertTrue("Cache size is expected to be less than or equal to " + cacheMaximumSize
+                    + " but current cacheSize is " + cacheSize, cacheSize <= cacheMaximumSize);
+        }
+    }
+
+    /**
+     * test EntryLogManager.EntryLogManagerForEntryLogPerLedger doesn't remove
+     * the ledger from its cache map if ledger's corresponding state is accessed
+     * within the evictionPeriod.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testExpiryRemovalByAccessingOnAnotherThread() throws Exception {
+        int evictionPeriod = 1;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();
+
+        long ledgerId = 0L;
+
+        BufferedLogChannel newLogChannel = createDummyBufferedLogChannel(entryLogger, 1, conf);
+        entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
+
+        AtomicBoolean exceptionOccured = new AtomicBoolean(false);
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    // access the ledger's state half way through the eviction period
+                    Thread.sleep((evictionPeriod * 1000) / 2);
+                    entryLogManager.getCurrentLogForLedger(ledgerId);
+                } catch (InterruptedException | IOException e) {
+                    // record the failure instead of swallowing it, so that the test fails for the right reason
+                    LOG.error("Got Exception in thread", e);
+                    exceptionOccured.set(true);
+                }
+            }
+        };
+
+        t.start();
+        Thread.sleep(evictionPeriod * 1000 + 100);
+        // make sure the access from the other thread has really happened before the cleanup is triggered
+        t.join();
+        entryLogManager.doEntryLogMapCleanup();
+        Assert.assertFalse("Exception occured in thread, which is not expected", exceptionOccured.get());
+
+        /*
+         * in this scenario, that ledger is accessed by other thread during
+         * eviction period time, so it should not be evicted.
+         */
+        BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId, newLogChannel, currentLogForLedger);
+        Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of rotated EntryLogs ", 0, entryLogManager.getRotatedLogChannels().size());
+    }
+
+    /**
+     * test EntryLogManager.EntryLogManagerForEntryLogPerLedger removes the
+     * ledger from its cache map if entry is not added to that ledger or its
+     * corresponding state is not accessed for more than evictionPeriod. In this
+     * testcase we try to call unrelated methods or access state of other
+     * ledgers within the eviction period.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testExpiryRemovalByAccessingNonCacheRelatedMethods() throws Exception {
+        int evictionPeriod = 1;
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        conf.setEntrylogMapAccessExpiryTimeInSeconds(evictionPeriod);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager =
+                (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager();
+
+        long ledgerId = 0L;
+
+        BufferedLogChannel newLogChannel = createDummyBufferedLogChannel(entryLogger, 1, conf);
+        entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, newLogChannel);
+
+        AtomicBoolean exceptionOccured = new AtomicBoolean(false);
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(500);
+                    /*
+                     * any of the following operations should not access entry
+                     * of 'ledgerId' in the cache
+                     */
+                    entryLogManager.getCopyOfCurrentLogs();
+                    entryLogManager.getRotatedLogChannels();
+                    entryLogManager.getCurrentLogIfPresent(newLogChannel.getLogId());
+                    entryLogManager.getDirForNextEntryLog(ledgerDirsManager.getWritableLedgerDirs());
+                    long newLedgerId = 100;
+                    BufferedLogChannel logChannelForNewLedger =
+                            createDummyBufferedLogChannel(entryLogger, newLedgerId, conf);
+                    entryLogManager.setCurrentLogForLedgerAndAddToRotate(newLedgerId, logChannelForNewLedger);
+                    entryLogManager.getCurrentLogIfPresent(newLedgerId);
+                } catch (Exception e) {
+                    LOG.error("Got Exception in thread", e);
+                    exceptionOccured.set(true);
+                }
+            }
+        };
+
+        t.start();
+        Thread.sleep(evictionPeriod * 1000 + 100);
+        /*
+         * wait for the other thread to finish, otherwise 'exceptionOccured' and the entrylog created for
+         * 'newLedgerId' could be checked before that thread has completed its work
+         */
+        t.join();
+        entryLogManager.doEntryLogMapCleanup();
+        Assert.assertFalse("Exception occured in thread, which is not expected", exceptionOccured.get());
+
+        /*
+         * since for more than evictionPeriod, that ledger is not accessed and cache is cleaned up, mapping for that
+         * ledger should not be available anymore
+         */
+        BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId);
+        assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, currentLogForLedger);
+        // expected number of current active entryLogs is 1 since we created entrylog for 'newLedgerId'
+        Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size());
+        Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getRotatedLogChannels().size());
+        Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel",
+                entryLogManager.getRotatedLogChannels().contains(newLogChannel));
+
+        Assert.assertTrue("since mapentry must have been evicted, it should be null",
+                (entryLogManager.getCacheAsMap().get(ledgerId) == null)
+                        || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null));
+    }
+
+    /*
+     * Exercises EntryLogger (addEntry/createNewLog/flush) together with its EntryLogManager while
+     * entryLogPerLedger is enabled, checking unpersistedBytes, rotated log channels and leastUnflushedLogId
+     * after every step.
+     */
+    @Test
+    public void testEntryLogManagerForEntryLogPerLedger() throws Exception {
+        final int numOfActiveLedgers = 20;
+        final int numEntries = 5;
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(10000000);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        LedgerDirsManager dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, dirsManager);
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+        Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class,
+                entryLogManager.getClass());
+
+        // first batch of entries: entry ids [0, numEntries) for every ledger
+        for (int entryId = 0; entryId < numEntries; entryId++) {
+            for (long lid = 0; lid < numOfActiveLedgers; lid++) {
+                entryLogger.addEntry(lid, generateEntry(lid, entryId));
+            }
+        }
+
+        for (long lid = 0; lid < numOfActiveLedgers; lid++) {
+            long unpersistedBytes = entryLogManager.getCurrentLogForLedger(lid).getUnpersistedBytes();
+            Assert.assertTrue("unpersistedBytes should be greater than LOGFILE_HEADER_SIZE",
+                    unpersistedBytes > EntryLogger.LOGFILE_HEADER_SIZE);
+        }
+
+        for (long lid = 0; lid < numOfActiveLedgers; lid++) {
+            entryLogManager.createNewLog(lid);
+        }
+
+        /*
+         * a new entrylog was created for each of the active ledgers, so the previous entrylog of every ledger
+         * must have been rotated, i.e. there must be numOfActiveLedgers rotated log channels
+         */
+        Assert.assertEquals("Number of rotated entrylogs", numOfActiveLedgers,
+                entryLogManager.getRotatedLogChannels().size());
+
+        /*
+         * the current logs are the freshly created ones, hence their unpersistedBytes should be just
+         * EntryLogger.LOGFILE_HEADER_SIZE
+         */
+        for (long lid = 0; lid < numOfActiveLedgers; lid++) {
+            BufferedLogChannel freshLog = entryLogManager.getCurrentLogForLedger(lid);
+            Assert.assertEquals("unpersistedBytes should be LOGFILE_HEADER_SIZE", EntryLogger.LOGFILE_HEADER_SIZE,
+                    freshLog.getUnpersistedBytes());
+        }
+
+        // second batch of entries: entry ids [numEntries, 2 * numEntries) for every ledger
+        for (int entryId = numEntries; entryId < 2 * numEntries; entryId++) {
+            for (long lid = 0; lid < numOfActiveLedgers; lid++) {
+                entryLogger.addEntry(lid, generateEntry(lid, entryId));
+            }
+        }
+
+        for (long lid = 0; lid < numOfActiveLedgers; lid++) {
+            long unpersistedBytes = entryLogManager.getCurrentLogForLedger(lid).getUnpersistedBytes();
+            Assert.assertTrue("unpersistedBytes should be greater than LOGFILE_HEADER_SIZE",
+                    unpersistedBytes > EntryLogger.LOGFILE_HEADER_SIZE);
+        }
+
+        Assert.assertEquals("LeastUnflushedloggerID", 0, entryLogger.getLeastUnflushedLogId());
+
+        /*
+         * flush is expected to close the files of all the rotatedLogChannels, leaving no rotatedlogchannel behind,
+         * and to advance leastUnflushedLogId to numOfActiveLedgers
+         */
+        entryLogger.flush();
+        Assert.assertEquals("Number of rotated entrylogs", 0, entryLogManager.getRotatedLogChannels().size());
+        Assert.assertEquals("LeastUnflushedloggerID", numOfActiveLedgers, entryLogger.getLeastUnflushedLogId());
+
+        /*
+         * after flush (flushCurrentLogs) unpersistedBytes should be 0.
+         */
+        for (long lid = 0; lid < numOfActiveLedgers; lid++) {
+            BufferedLogChannel flushedLog = entryLogManager.getCurrentLogForLedger(lid);
+            Assert.assertEquals("unpersistedBytes should be 0", 0L, flushedLog.getUnpersistedBytes());
+        }
+    }
+
+    /*
+     * With entryLogPerLedger enabled, entries of several ledgers are added (one entrylog per ledger) and read
+     * back twice: once right after adding them and once after the entrylogs are rotated and flushed.
+     */
+    @Test
+    public void testReadAddCallsOfMultipleEntryLogs() throws Exception {
+        final int numOfActiveLedgers = 10;
+        final int numEntries = 10;
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        // pre allocation enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        LedgerDirsManager dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, dirsManager);
+        EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+
+        // positions[ledger][entry] holds the location returned by addEntry
+        long[][] positions = new long[numOfActiveLedgers][numEntries];
+
+        /*
+         * add the entries, one entry per ledger in each round
+         */
+        for (int entry = 0; entry < numEntries; entry++) {
+            for (int ledger = 0; ledger < numOfActiveLedgers; ledger++) {
+                long location = entryLogger.addEntry((long) ledger, generateEntry(ledger, entry));
+                positions[ledger][entry] = location;
+                // the entrylog id is carried in the upper 32 bits of the returned location
+                long entryLogId = (location >> 32L);
+                /*
+                 * Though EntryLogFilePreAllocation is enabled, since things are not done concurrently here,
+                 * entryLogIds will be sequential.
+                 */
+                Assert.assertEquals("EntryLogId for ledger: " + ledger, ledger, entryLogId);
+            }
+        }
+
+        /*
+         * read back what has just been written
+         */
+        for (int entry = 0; entry < numEntries; entry++) {
+            for (int ledger = 0; ledger < numOfActiveLedgers; ledger++) {
+                ByteBuf buf = entryLogger.readEntry(ledger, entry, positions[ledger][entry]);
+                long readLedgerId = buf.readLong();
+                long readEntryId = buf.readLong();
+                byte[] payload = new byte[buf.readableBytes()];
+                buf.readBytes(payload);
+                assertEquals("LedgerId ", ledger, readLedgerId);
+                assertEquals("EntryId ", entry, readEntryId);
+                assertEquals("Entry Data ", "ledger-" + ledger + "-" + entry, new String(payload));
+            }
+        }
+
+        // rotate the entrylog of every ledger and flush the rotated ones
+        for (long ledger = 0; ledger < numOfActiveLedgers; ledger++) {
+            entryLogManagerBase.createNewLog(ledger);
+        }
+
+        entryLogManagerBase.flushRotatedLogs();
+
+        // reading after flush of rotatedlogs
+        for (int entry = 0; entry < numEntries; entry++) {
+            for (int ledger = 0; ledger < numOfActiveLedgers; ledger++) {
+                ByteBuf buf = entryLogger.readEntry(ledger, entry, positions[ledger][entry]);
+                long readLedgerId = buf.readLong();
+                long readEntryId = buf.readLong();
+                byte[] payload = new byte[buf.readableBytes()];
+                buf.readBytes(payload);
+                assertEquals("LedgerId ", ledger, readLedgerId);
+                assertEquals("EntryId ", entry, readEntryId);
+                assertEquals("Entry Data ", "ledger-" + ledger + "-" + entry, new String(payload));
+            }
+        }
+    }
+
+    /*
+     * Callable which reads one entry through the given EntryLogger and compares it with the entry generated by
+     * generateEntry for the same ledgerId/entryId. Returns true on a match and throws IOException otherwise.
+     */
+    class ReadTask implements Callable<Boolean> {
+        final long ledgerId;
+        final int entryId;
+        final long position;
+        final EntryLogger entryLogger;
+
+        ReadTask(long ledgerId, int entryId, long position, EntryLogger entryLogger) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.position = position;
+            this.entryLogger = entryLogger;
+        }
+
+        @Override
+        public Boolean call() throws IOException {
+            ByteBuf expectedByteBuf = generateEntry(ledgerId, entryId);
+            ByteBuf actualByteBuf;
+            try {
+                actualByteBuf = entryLogger.readEntry(ledgerId, entryId, position);
+            } catch (IOException e) {
+                LOG.error("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId, e);
+                throw new IOException("Got Exception for GetEntry call. LedgerId: " + ledgerId + " entryId: " + entryId,
+                        e);
+            }
+            /*
+             * the comparison is done outside of the try block, so that a content mismatch is logged and reported
+             * once, as a mismatch, instead of being caught and re-wrapped as a failure of the read call
+             */
+            if (!expectedByteBuf.equals(actualByteBuf)) {
+                LOG.error("Expected Entry: {} Actual Entry: {}", expectedByteBuf.toString(Charset.defaultCharset()),
+                        actualByteBuf.toString(Charset.defaultCharset()));
+                throw new IOException("Expected Entry: " + expectedByteBuf.toString(Charset.defaultCharset())
+                        + " Actual Entry: " + actualByteBuf.toString(Charset.defaultCharset()));
+            }
+            return true;
+        }
+    }
+
+    /*
+     * test concurrent read operations of entries from flushed rotatedlogs with entryLogPerLedgerEnabled
+     */
+    @Test
+    public void testConcurrentReadCallsAfterEntryLogsAreRotated() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setFlushIntervalInBytes(1000 * 25);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(3));
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        int numOfActiveLedgers = 15;
+        int numEntries = 2000;
+        // flat array of entry locations: the location of (ledger i, entry j) is stored at i * numEntries + j
+        final AtomicLongArray positions = new AtomicLongArray(numOfActiveLedgers * numEntries);
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+
+        for (int i = 0; i < numOfActiveLedgers; i++) {
+            for (int j = 0; j < numEntries; j++) {
+                positions.set(i * numEntries + j, entryLogger.addEntry((long) i, generateEntry(i, j)));
+                long entryLogId = (positions.get(i * numEntries + j) >> 32L);
+                /**
+                 *
+                 * Though EntryLogFilePreAllocation is enabled, Since things are not done concurrently here, entryLogIds
+                 * will be sequential.
+                 */
+                Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId);
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            entryLogManager.createNewLog(i);
+        }
+        entryLogManager.flushRotatedLogs();
+
+        // reading after flush of rotatedlogs
+        List<ReadTask> readTasks = new ArrayList<ReadTask>(numOfActiveLedgers * numEntries);
+        for (int i = 0; i < numOfActiveLedgers; i++) {
+            for (int j = 0; j < numEntries; j++) {
+                readTasks.add(new ReadTask(i, j, positions.get(i * numEntries + j), entryLogger));
+            }
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(40);
+        try {
+            executor.invokeAll(readTasks).forEach((future) -> {
+                try {
+                    future.get();
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    LOG.error("Read/Flush task failed because of InterruptedException", ie);
+                    Assert.fail("Read/Flush task interrupted");
+                } catch (Exception ex) {
+                    LOG.error("Read/Flush task failed because of  exception", ex);
+                    Assert.fail("Read/Flush task failed " + ex.getMessage());
+                }
+            });
+        } finally {
+            // do not leave the worker threads of the pool behind, whether the reads succeeded or not
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * testcase to validate when ledgerdirs become full and eventually all
+     * ledgerdirs become full. Later a ledgerdir becomes writable.
+     */
+    @Test
+    public void testEntryLoggerAddEntryWhenLedgerDirsAreFull() throws Exception {
+        int numberOfLedgerDirs = 3;
+        String[] ledgerDirsPath = new String[numberOfLedgerDirs];
+
+        for (int i = 0; i < numberOfLedgerDirs; i++) {
+            File ledgerDir = createTempDir("bkTest", ".dir").getAbsoluteFile();
+            File curDir = Bookie.getCurrentDirectory(ledgerDir);
+            Bookie.checkDirectoryStructure(curDir);
+            ledgerDirsPath[i] = ledgerDir.getPath();
+        }
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        // pre-allocation is disabled
+        conf.setEntryLogFilePreAllocationEnabled(false);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setLedgerDirNames(ledgerDirsPath);
+
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger)
+                entryLogger.getEntryLogManager();
+        Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class,
+                entryLogManager.getClass());
+
+        entryLogger.addEntry(0L, generateEntry(0, 1));
+        entryLogger.addEntry(1L, generateEntry(1, 1));
+        entryLogger.addEntry(2L, generateEntry(2, 1));
+
+        File ledgerDirForLedger0 = entryLogManager.getCurrentLogForLedger(0L).getLogFile().getParentFile();
+        File ledgerDirForLedger1 = entryLogManager.getCurrentLogForLedger(1L).getLogFile().getParentFile();
+        File ledgerDirForLedger2 = entryLogManager.getCurrentLogForLedger(2L).getLogFile().getParentFile();
+
+        Set<File> ledgerDirsSet = new HashSet<File>();
+        ledgerDirsSet.add(ledgerDirForLedger0);
+        ledgerDirsSet.add(ledgerDirForLedger1);
+        ledgerDirsSet.add(ledgerDirForLedger2);
+
+        /*
+         * since there are 3 ledgerdirs, entrylogs for all the 3 ledgers should be in different ledgerdirs. The set
+         * of the actually used dirs is checked here; its size is 3 only if the three dirs are distinct.
+         */
+        Assert.assertEquals("Current active LedgerDirs size", 3, ledgerDirsSet.size());
+        Assert.assertEquals("Number of rotated logchannels", 0, entryLogManager.getRotatedLogChannels().size());
+
+        /*
+         * ledgerDirForLedger0 is added to filledDirs, for ledger0 new entrylog should not be created in
+         * ledgerDirForLedger0
+         */
+        ledgerDirsManager.addToFilledDirs(ledgerDirForLedger0);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 2, ledgerDirForLedger0, false, ledgerDirForLedger1,
+                ledgerDirForLedger2);
+        Assert.assertEquals("Number of rotated logchannels", 1, entryLogManager.getRotatedLogChannels().size());
+
+        /*
+         * ledgerDirForLedger1 is also added to filledDirs, so for all the ledgers new entryLogs should be in
+         * ledgerDirForLedger2
+         */
+        ledgerDirsManager.addToFilledDirs(ledgerDirForLedger1);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 3, ledgerDirForLedger2, true, ledgerDirForLedger2,
+                ledgerDirForLedger2);
+        Assert.assertTrue("Number of rotated logchannels", (2 <= entryLogManager.getRotatedLogChannels().size())
+                && (entryLogManager.getRotatedLogChannels().size() <= 3));
+        int numOfRotatedLogChannels = entryLogManager.getRotatedLogChannels().size();
+
+        /*
+         * since ledgerDirForLedger2 is added to filleddirs, all the dirs are full. If all the dirs are full then it
+         * will continue to use current entrylogs for new entries instead of creating new one. So for all the ledgers
+         * ledgerdirs should be same as before - ledgerDirForLedger2
+         */
+        ledgerDirsManager.addToFilledDirs(ledgerDirForLedger2);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 4, ledgerDirForLedger2, true, ledgerDirForLedger2,
+                ledgerDirForLedger2);
+        Assert.assertEquals("Number of rotated logchannels", numOfRotatedLogChannels,
+                entryLogManager.getRotatedLogChannels().size());
+
+        /*
+         *  ledgerDirForLedger1 is added back to writableDirs, so new entrylog for all the ledgers should be created in
+         *  ledgerDirForLedger1
+         */
+        ledgerDirsManager.addToWritableDirs(ledgerDirForLedger1, true);
+        addEntryAndValidateFolders(entryLogger, entryLogManager, 4, ledgerDirForLedger1, true, ledgerDirForLedger1,
+                ledgerDirForLedger1);
+        Assert.assertEquals("Number of rotated logchannels", numOfRotatedLogChannels + 3,
+                entryLogManager.getRotatedLogChannels().size());
+    }
+
+    /*
+     * Adds one entry (with the given entryId) to each of the ledgers 0, 1 and 2 and then compares the ledgerdir
+     * of each ledger's currentLogForLedger with the expected one. For ledger 0 'equalsForLedger0' selects whether
+     * the dir has to be equal or has to be different from 'expectedDirForLedger0'.
+     */
+    void addEntryAndValidateFolders(EntryLogger entryLogger, EntryLogManagerBase entryLogManager, int entryId,
+            File expectedDirForLedger0, boolean equalsForLedger0, File expectedDirForLedger1,
+            File expectedDirForLedger2) throws IOException {
+        for (long lid = 0L; lid <= 2L; lid++) {
+            entryLogger.addEntry(lid, generateEntry(lid, entryId));
+        }
+
+        File actualDirForLedger0 = entryLogManager.getCurrentLogForLedger(0L).getLogFile().getParentFile();
+        String messageForLedger0 = "LedgerDir for ledger 0 after adding entry " + entryId;
+        if (!equalsForLedger0) {
+            Assert.assertNotEquals(messageForLedger0, expectedDirForLedger0, actualDirForLedger0);
+        } else {
+            Assert.assertEquals(messageForLedger0, expectedDirForLedger0, actualDirForLedger0);
+        }
+
+        File actualDirForLedger1 = entryLogManager.getCurrentLogForLedger(1L).getLogFile().getParentFile();
+        Assert.assertEquals("LedgerDir for ledger 1 after adding entry " + entryId, expectedDirForLedger1,
+                actualDirForLedger1);
+
+        File actualDirForLedger2 = entryLogManager.getCurrentLogForLedger(2L).getLogFile().getParentFile();
+        Assert.assertEquals("LedgerDir for ledger 2 after adding entry " + entryId, expectedDirForLedger2,
+                actualDirForLedger2);
+    }
+
+    /*
+     * entries added using entrylogger with entryLogPerLedger enabled and the same entries are read using entrylogger
+     * with entryLogPerLedger disabled. Delegates to testSwappingEntryLogManager with
+     * initialEntryLogPerLedgerEnabled = true and laterEntryLogPerLedgerEnabled = false.
+     */
+    @Test
+    public void testSwappingEntryLogManagerFromEntryLogPerLedgerToSingle() throws Exception {
+        testSwappingEntryLogManager(true, false);
+    }
+
+    /*
+     * entries added using entrylogger with entryLogPerLedger disabled and the same entries are read using entrylogger
+     * with entryLogPerLedger enabled. Delegates to testSwappingEntryLogManager with
+     * initialEntryLogPerLedgerEnabled = false and laterEntryLogPerLedgerEnabled = true.
+     */
+    @Test
+    public void testSwappingEntryLogManagerFromSingleToEntryLogPerLedger() throws Exception {
+        testSwappingEntryLogManager(false, true);
+    }
+
+    public void testSwappingEntryLogManager(boolean initialEntryLogPerLedgerEnabled,
+            boolean laterEntryLogPerLedgerEnabled) throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setEntryLogPerLedgerEnabled(initialEntryLogPerLedgerEnabled);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(2));
+        // pre allocation enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        EntryLogManagerBase entryLogManager = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+        Assert.assertEquals(
+                "EntryLogManager class type", initialEntryLogPerLedgerEnabled
+                        ? EntryLogManagerForEntryLogPerLedger.class : EntryLogManagerForSingleEntryLog.class,
+                entryLogManager.getClass());
+
+        int numOfActiveLedgers = 10;
+        int numEntries = 10;
+        long[][] positions = new long[numOfActiveLedgers][];
+        for (int i = 0; i < numOfActiveLedgers; i++) {
+            positions[i] = new long[numEntries];
+        }
+
+        /*
+         * addentries to the ledgers
+         */
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                positions[i][j] = entryLogger.addEntry((long) i, generateEntry(i, j));
+                long entryLogId = (positions[i][j] >> 32L);
+                if (initialEntryLogPerLedgerEnabled) {
+                    Assert.assertEquals("EntryLogId for ledger: " + i, i, entryLogId);
+                } else {
+                    Assert.assertEquals("EntryLogId for ledger: " + i, 0, entryLogId);
+                }
+            }
+        }
+
+        for (long i = 0; i < numOfActiveLedgers; i++) {
+            entryLogManager.createNewLog(i);
+        }
+
+        /**
+         * since new entrylog is created for all the ledgers, the previous
+         * entrylogs must be rotated and with the following flushRotatedLogs
+         * call they should be forcewritten and file should be closed.
+         */
+        entryLogManager.flushRotatedLogs();
+
+        /*
+         * new entrylogger and entryLogManager are created with
+         * 'laterEntryLogPerLedgerEnabled' conf
+         */
+        conf.setEntryLogPerLedgerEnabled(laterEntryLogPerLedgerEnabled);
+        LedgerDirsManager newLedgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger newEntryLogger = new EntryLogger(conf, newLedgerDirsManager);
+        EntryLogManager newEntryLogManager = newEntryLogger.getEntryLogManager();
+        Assert.assertEquals("EntryLogManager class type",
+                laterEntryLogPerLedgerEnabled ? EntryLogManagerForEntryLogPerLedger.class
+                        : EntryLogManagerForSingleEntryLog.class,
+                newEntryLogManager.getClass());
+
+        /*
+         * read the entries (which are written with previous entrylogger) with
+         * new entrylogger
+         */
+        for (int j = 0; j < numEntries; j++) {
+            for (int i = 0; i < numOfActiveLedgers; i++) {
+                String expectedValue = "ledger-" + i + "-" + j;
+                ByteBuf buf = newEntryLogger.readEntry(i, j, positions[i][j]);
+                long ledgerId = buf.readLong();
+                long entryId = buf.readLong();
+                byte[] data = new byte[buf.readableBytes()];
+                buf.readBytes(data);
+                assertEquals("LedgerId ", i, ledgerId);
+                assertEquals("EntryId ", j, entryId);
+                assertEquals("Entry Data ", expectedValue, new String(data));
+            }
+        }
+    }
+
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
index 604099cc3..921d31018 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.java
@@ -28,9 +28,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.Enumeration;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -42,6 +40,7 @@
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
+import org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo;
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.Journal.LastLogMark;
 import org.apache.bookkeeper.client.BKException;
@@ -494,44 +493,33 @@ public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exce
         BookKeeper bkClient = new BookKeeper(clientConf);
         InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
         EntryLogger entryLogger = ledgerStorage.entryLogger;
-        EntryLogManagerBase entryLogManagerBase = (EntryLogManagerBase) entryLogger.getEntryLogManager();
+        EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
+                .getEntryLogManager();
 
+        Random rand = new Random();
         int numOfEntries = 5;
         byte[] dataBytes = "data".getBytes();
 
-        long ledgerId = 10;
-        LedgerHandle handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
-        for (int j = 0; j < numOfEntries; j++) {
-            handle.addEntry(j, dataBytes);
-        }
-        handle.close();
-        // simulate rolling entrylog
-        entryLogManagerBase.createNewLog(ledgerId);
-
-        ledgerId = 20;
-        handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
-        for (int j = 0; j < numOfEntries; j++) {
-            handle.addEntry(j, dataBytes);
-        }
-        handle.close();
-        // simulate rolling entrylog
-        entryLogManagerBase.createNewLog(ledgerId);
-
-        ledgerId = 30;
-        handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
-        for (int j = 0; j < numOfEntries; j++) {
-            handle.addEntry(j, dataBytes);
+        int numOfLedgers = 3;
+        long[] ledgerIds = new long[numOfLedgers];
+        LedgerHandle handle;
+        for (int i = 0; i < numOfLedgers; i++) {
+            ledgerIds[i] = rand.nextInt(100000) + 1;
+            handle = bkClient.createLedgerAdv(ledgerIds[i], 1, 1, 1, DigestType.CRC32, "passwd".getBytes(), null);
+            for (int j = 0; j < numOfEntries; j++) {
+                handle.addEntry(j, dataBytes);
+            }
+            // simulate rolling entrylog
+            entryLogManager.createNewLog(ledgerIds[i]);
         }
-        handle.close();
 
-        Set<BufferedLogChannel> copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
-                Arrays.asList(entryLogManagerBase.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
-        for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+        Set<BufferedLogChannelWithDirInfo> copyOfCurrentLogsWithDirInfo = entryLogManager.getCopyOfCurrentLogs();
+        for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : copyOfCurrentLogsWithDirInfo) {
             Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0,
-                    currentLog.getUnpersistedBytes());
+                    currentLogWithDirInfo.getLogChannel().getUnpersistedBytes());
         }
         Assert.assertNotEquals("There should be logChannelsToFlush", 0,
-                entryLogManagerBase.getRotatedLogChannels().size());
+                entryLogManager.getRotatedLogChannels().size());
 
         /*
          * wait for at least flushInterval period, so that checkpoint can happen.
@@ -542,15 +530,14 @@ public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exce
          * since checkpoint happened, there shouldn't be any logChannelsToFlush
          * and bytesWrittenSinceLastFlush should be zero.
          */
-        List<BufferedLogChannel> copyOfRotatedLogChannels = entryLogManagerBase.getRotatedLogChannels();
+        List<BufferedLogChannel> copyOfRotatedLogChannels = entryLogManager.getRotatedLogChannels();
         Assert.assertTrue("There shouldn't be logChannelsToFlush",
                 ((copyOfRotatedLogChannels == null) || (copyOfRotatedLogChannels.size() == 0)));
 
-        copyOfCurrentLogs = new HashSet<BufferedLogChannel>(
-                Arrays.asList(entryLogManagerBase.getCurrentLogForLedger(EntryLogger.UNASSIGNED_LEDGERID)));
-        for (BufferedLogChannel currentLog : copyOfCurrentLogs) {
+        copyOfCurrentLogsWithDirInfo = entryLogManager.getCopyOfCurrentLogs();
+        for (BufferedLogChannelWithDirInfo currentLogWithDirInfo : copyOfCurrentLogsWithDirInfo) {
             Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0,
-                    currentLog.getUnpersistedBytes());
+                    currentLogWithDirInfo.getLogChannel().getUnpersistedBytes());
         }
     }
 
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index ad73a01ef..23cd06405 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -21,10 +21,10 @@
 # - `Bookie Server`     : bookie server generic settings, including network related settings.
 # - `Bookie Storage`    : bookie storage related settings, such as journal, entrylogger, gc and ledger storages.
 # - `Security`          : security related settings
-# - `Metadata Services` : metadata service related settings 
+# - `Metadata Services` : metadata service related settings
 # - `Stats Providers`   : stats providers related settings
 # - `Auto Recovery`     : auto recovery related settings
-# 
+#
 
 ############################################## Bookie Server ##############################################
 
@@ -359,7 +359,7 @@ ledgerDirectories=/tmp/bk-data
 # This parameter allows creating entry log files when there are enough disk spaces, even when
 # the bookie is running at readonly mode because of the disk usage is exceeding `diskUsageThreshold`.
 # Because compaction, journal replays can still write data to disks when a bookie is readonly.
-# 
+#
 # Default value is 1.2 * `logSizeLimit`.
 #
 # minUsableSizeForEntryLogCreation=
@@ -374,8 +374,8 @@ ledgerDirectories=/tmp/bk-data
 # When entryLogPerLedgerEnabled is enabled, checkpoint doesn't happen
 # when a new active entrylog is created / previous one is rolled over.
 # Instead SyncThread checkpoints periodically with 'flushInterval' delay
-# (in milliseconds) in between executions. Checkpoint flushes both ledger 
-# entryLogs and ledger index pages to disk. 
+# (in milliseconds) in between executions. Checkpoint flushes both ledger
+# entryLogs and ledger index pages to disk.
 # Flushing entrylog and index files will introduce much random disk I/O.
 # If separating journal dir and ledger dirs each on different devices,
 # flushing would not affect performance. But if putting journal dir
@@ -414,15 +414,22 @@ ledgerDirectories=/tmp/bk-data
 # The number of bytes used as capacity for the write buffer. Default is 64KB.
 # writeBufferSizeBytes=65536
 
-# Specifies if entryLog per ledger is enabled/disabled. If it is enabled, then there would be a 
-# active entrylog for each ledger. It would be ideal to enable this feature if the underlying 
-# storage device has multiple DiskPartitions or SSD and if in a given moment, entries of fewer 
+# Specifies if entryLog per ledger is enabled/disabled. If it is enabled, then there would be an
+# active entrylog for each ledger. It would be ideal to enable this feature if the underlying
+# storage device has multiple DiskPartitions or SSD and if in a given moment, entries of fewer
 # number of active ledgers are written to a bookie.
 # entryLogPerLedgerEnabled=false
 
 # In the case of multiple entrylogs, multiple threads can be used to flush the memtable
 # numOfMemtableFlushThreads=8
 
+# In the entryLogPerLedger feature, the time duration (in seconds) used for the last-access eviction policy of the entrylog cache
+# entrylogMapAccessExpiryTimeInSeconds=300
+
+# In the entryLogPerLedger feature, this specifies the maximum number of entrylogs that can be
+# active at a given point in time
+# maximumNumberOfActiveEntryLogs=500
+
 #############################################################################
 ## Entry log compaction settings
 #############################################################################
@@ -719,7 +726,7 @@ zkEnableSecurity=false
 #   - Twitter Science   : org.apache.bookkeeper.stats.twitter.science.TwitterStatsProvider
 # Default value is:
 #   org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider
-# 
+#
 # For configuring corresponding stats provider, see details at each section below.
 #
 # statsProviderClass=org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider
@@ -818,7 +825,7 @@ zkEnableSecurity=false
 # the following settings take effects when `autoRecoveryDaemonEnabled` is true.
 
 # The ensemble placement policy used for re-replicating entries.
-# 
+#
 # Options:
 #   - org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy
 #   - org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 481d82826..ef2804735 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -308,6 +308,12 @@ groups:
   - param: entryLogPerLedgerEnabled
     description: Specifies if entryLog per ledger is enabled/disabled. If it is enabled, then there would be an active entrylog for each ledger. It would be ideal to enable this feature if the underlying storage device has multiple DiskPartitions or SSD and if in a given moment, entries of fewer number of active ledgers are written to the bookie.
     default: false
+  - param: entrylogMapAccessExpiryTimeInSeconds
+    description: When entryLogPerLedger is enabled, the amount of time (in seconds) EntryLogManagerForEntryLogPerLedger should wait after the last addEntry call for a ledger before closing the entrylog file of that ledger, if an explicit writeclose for that ledger is not received.
+    default: 300
+  - param: maximumNumberOfActiveEntryLogs
+    description: In the entryLogPerLedger feature, this specifies the maximum number of entrylogs that can be active at a given point in time. If the number of active entrylogs exceeds maximumNumberOfActiveEntryLogs, then entrylogs will be evicted from the cache.
+    default: 500
 
 - name: Entry log compaction settings
   params:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services