You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/05 21:45:40 UTC

[bookkeeper] branch master updated: Metrics for EntryLogManagerForEntryLogPerLedger.

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new bfa4f9d  Metrics for EntryLogManagerForEntryLogPerLedger.
bfa4f9d is described below

commit bfa4f9d5f10897ea854865d260248af9de6e10be
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Tue Jun 5 14:45:33 2018 -0700

    Metrics for EntryLogManagerForEntryLogPerLedger.
    
    Descriptions of the changes in this PR:
    
    Add counters/statsloggers for EntryLogManagerForEntryLogPerLedger.
    
    Author: cguttapalem <cg...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1460 from reddycharan/elmmetrics
---
 .../src/main/resources/LICENSE-all.bin.txt         |   4 +-
 .../src/main/resources/LICENSE-server.bin.txt      |   4 +-
 .../bookkeeper/bookie/BookKeeperServerStats.java   |   8 +
 .../EntryLogManagerForEntryLogPerLedger.java       | 153 +++++++++++++++--
 .../org/apache/bookkeeper/bookie/EntryLogger.java  |  12 +-
 .../bookie/InterleavedLedgerStorage.java           |   3 +-
 .../ldb/SingleDirectoryDbLedgerStorage.java        |   2 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  26 +++
 .../apache/bookkeeper/bookie/CreateNewLogTest.java | 191 +++++++++++++++++++++
 .../bookie/SlowInterleavedLedgerStorage.java       |   8 +-
 .../apache/bookkeeper/test/TestStatsProvider.java  |   4 +
 conf/bk_server.conf                                |   4 +
 pom.xml                                            |   2 +-
 site/_data/config/bk_server.yaml                   |   3 +
 .../cluster/BookKeeperClusterTestBase.java         |   9 +-
 .../integration/stream/StreamClusterTestBase.java  |  10 +-
 16 files changed, 415 insertions(+), 28 deletions(-)

diff --git a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
index 98159e7..e0c19cc 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
@@ -261,7 +261,7 @@ Apache Software License, Version 2.
 - lib/log4j-log4j-1.2.17.jar [29]
 - lib/net.java.dev.jna-jna-3.2.7.jar [30]
 - lib/org.apache.commons-commons-collections4-4.1.jar [31]
-- lib/org.apache.commons-commons-lang3-3.3.2.jar [32]
+- lib/org.apache.commons-commons-lang3-3.6.jar [32]
 - lib/org.apache.zookeeper-zookeeper-3.5.3-beta.jar [33]
 - lib/org.eclipse.jetty-jetty-http-9.4.5.v20170502.jar [34]
 - lib/org.eclipse.jetty-jetty-io-9.4.5.v20170502.jar [34]
@@ -403,7 +403,7 @@ Base64 Encoder and Decoder, which can be obtained at:
   * HOMEPAGE:
     * http://iharder.sourceforge.net/current/java/base64/
 
-lib/io.netty-netty-all-4.1.22.Final.jar contains a modified portion of 'Webbit', an event based  
+lib/io.netty-netty-all-4.1.22.Final.jar contains a modified portion of 'Webbit', an event based
 WebSocket and HTTP server, which can be obtained at:
 
   * LICENSE:
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
index e970577..22abe3a 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
@@ -226,7 +226,7 @@ Apache Software License, Version 2.
 - lib/log4j-log4j-1.2.17.jar [16]
 - lib/net.java.dev.jna-jna-3.2.7.jar [17]
 - lib/org.apache.commons-commons-collections4-4.1.jar [18]
-- lib/org.apache.commons-commons-lang3-3.3.2.jar [19]
+- lib/org.apache.commons-commons-lang3-3.6.jar [19]
 - lib/org.apache.zookeeper-zookeeper-3.5.3-beta.jar [20]
 - lib/org.eclipse.jetty-jetty-http-9.4.5.v20170502.jar [21]
 - lib/org.eclipse.jetty-jetty-io-9.4.5.v20170502.jar [21]
@@ -321,7 +321,7 @@ Base64 Encoder and Decoder, which can be obtained at:
   * HOMEPAGE:
     * http://iharder.sourceforge.net/current/java/base64/
 
-lib/io.netty-netty-all-4.1.22.Final.jar contains a modified portion of 'Webbit', an event based  
+lib/io.netty-netty-all-4.1.22.Final.jar contains a modified portion of 'Webbit', an event based
 WebSocket and HTTP server, which can be obtained at:
 
   * LICENSE:
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 19db7e9..9afd8a5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -164,4 +164,12 @@ public interface BookKeeperServerStats {
     String LD_LEDGER_SCOPE = "ledger";
     String LD_INDEX_SCOPE = "index";
     String LD_WRITABLE_DIRS = "writable_dirs";
+
+    // EntryLogManagerForEntryLogPerLedger Stats
+    String ENTRYLOGGER_SCOPE = "entrylogger";
+    String NUM_OF_WRITE_ACTIVE_LEDGERS = "NUM_OF_WRITE_ACTIVE_LEDGERS";
+    String NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY = "NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY";
+    String NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE = "NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE";
+    String NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS = "NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS";
+    String ENTRYLOGS_PER_LEDGER = "ENTRYLOGS_PER_LEDGER";
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
index c1ec237..41e66d9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerForEntryLogPerLedger.java
@@ -21,13 +21,22 @@
 
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRYLOGS_PER_LEDGER;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OF_WRITE_ACTIVE_LEDGERS;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalCause;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
+
 import io.netty.buffer.ByteBuf;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
@@ -42,12 +51,17 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
-import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableInt;
 
 @Slf4j
 class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
@@ -56,19 +70,19 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
         private final BufferedLogChannel logChannel;
         volatile boolean ledgerDirFull = false;
 
-        BufferedLogChannelWithDirInfo(BufferedLogChannel logChannel) {
+        private BufferedLogChannelWithDirInfo(BufferedLogChannel logChannel) {
             this.logChannel = logChannel;
         }
 
-        public boolean isLedgerDirFull() {
+        private boolean isLedgerDirFull() {
             return ledgerDirFull;
         }
 
-        public void setLedgerDirFull(boolean ledgerDirFull) {
+        private void setLedgerDirFull(boolean ledgerDirFull) {
             this.ledgerDirFull = ledgerDirFull;
         }
 
-        public BufferedLogChannel getLogChannel() {
+        BufferedLogChannel getLogChannel() {
             return logChannel;
         }
     }
@@ -77,23 +91,127 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
         private final Lock ledgerLock;
         private BufferedLogChannelWithDirInfo entryLogWithDirInfo;
 
-        public EntryLogAndLockTuple() {
+        private EntryLogAndLockTuple() {
             ledgerLock = new ReentrantLock();
         }
 
-        public Lock getLedgerLock() {
+        private Lock getLedgerLock() {
             return ledgerLock;
         }
 
-        public BufferedLogChannelWithDirInfo getEntryLogWithDirInfo() {
+        BufferedLogChannelWithDirInfo getEntryLogWithDirInfo() {
             return entryLogWithDirInfo;
         }
 
-        public void setEntryLogWithDirInfo(BufferedLogChannelWithDirInfo entryLogWithDirInfo) {
+        private void setEntryLogWithDirInfo(BufferedLogChannelWithDirInfo entryLogWithDirInfo) {
             this.entryLogWithDirInfo = entryLogWithDirInfo;
         }
     }
 
+    class EntryLogsPerLedgerCounter {
+        private final Counter numOfWriteActiveLedgers;
+        private final Counter numOfWriteLedgersRemovedCacheExpiry;
+        private final Counter numOfWriteLedgersRemovedCacheMaxSize;
+        private final Counter numLedgersHavingMultipleEntrylogs;
+        private final OpStatsLogger entryLogsPerLedger;
+        /*
+         * ledgerIdEntryLogCounterCacheMap cache will be used to store count of
+         * entrylogs as value for its ledgerid key. This cacheMap limits -
+         * 'expiry duration' and 'maximumSize' will be set to
+         * entryLogPerLedgerCounterLimitsMultFactor times of
+         * 'ledgerIdEntryLogMap' cache limits. This is needed because entries
+         * from 'ledgerIdEntryLogMap' can be removed from cache becasue of
+         * accesstime expiry or cache size limits, but to know the actual number
+         * of entrylogs per ledger, we should maintain this count for long time.
+         */
+        private final LoadingCache<Long, MutableInt> ledgerIdEntryLogCounterCacheMap;
+
+        EntryLogsPerLedgerCounter(StatsLogger statsLogger) {
+            this.numOfWriteActiveLedgers = statsLogger.getCounter(NUM_OF_WRITE_ACTIVE_LEDGERS);
+            this.numOfWriteLedgersRemovedCacheExpiry = statsLogger
+                    .getCounter(NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY);
+            this.numOfWriteLedgersRemovedCacheMaxSize = statsLogger
+                    .getCounter(NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE);
+            this.numLedgersHavingMultipleEntrylogs = statsLogger.getCounter(NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS);
+            this.entryLogsPerLedger = statsLogger.getOpStatsLogger(ENTRYLOGS_PER_LEDGER);
+
+            ledgerIdEntryLogCounterCacheMap = CacheBuilder.newBuilder()
+                    .expireAfterAccess(entrylogMapAccessExpiryTimeInSeconds * entryLogPerLedgerCounterLimitsMultFactor,
+                            TimeUnit.SECONDS)
+                    .maximumSize(maximumNumberOfActiveEntryLogs * entryLogPerLedgerCounterLimitsMultFactor)
+                    .removalListener(new RemovalListener<Long, MutableInt>() {
+                        @Override
+                        public void onRemoval(RemovalNotification<Long, MutableInt> removedEntryFromCounterMap) {
+                            if ((removedEntryFromCounterMap != null)
+                                    && (removedEntryFromCounterMap.getValue() != null)) {
+                                synchronized (EntryLogsPerLedgerCounter.this) {
+                                    entryLogsPerLedger
+                                            .registerSuccessfulValue(removedEntryFromCounterMap.getValue().intValue());
+                                }
+                            }
+                        }
+                    }).build(new CacheLoader<Long, MutableInt>() {
+                        @Override
+                        public MutableInt load(Long key) throws Exception {
+                            synchronized (EntryLogsPerLedgerCounter.this) {
+                                return new MutableInt();
+                            }
+                        }
+                    });
+        }
+
+        private synchronized void openNewEntryLogForLedger(Long ledgerId, boolean newLedgerInEntryLogMapCache) {
+            int numOfEntrylogsForThisLedger = ledgerIdEntryLogCounterCacheMap.getUnchecked(ledgerId).incrementAndGet();
+            if (numOfEntrylogsForThisLedger == 2) {
+                numLedgersHavingMultipleEntrylogs.inc();
+            }
+            if (newLedgerInEntryLogMapCache) {
+                numOfWriteActiveLedgers.inc();
+            }
+        }
+
+        private synchronized void removedLedgerFromEntryLogMapCache(Long ledgerId, RemovalCause cause) {
+            numOfWriteActiveLedgers.dec();
+            if (cause.equals(RemovalCause.EXPIRED)) {
+                numOfWriteLedgersRemovedCacheExpiry.inc();
+            } else if (cause.equals(RemovalCause.SIZE)) {
+                numOfWriteLedgersRemovedCacheMaxSize.inc();
+            }
+        }
+
+        /*
+         * this is for testing purpose only. guava's cache doesnt cleanup
+         * completely (including calling expiry removal listener) automatically
+         * when access timeout elapses.
+         *
+         * https://google.github.io/guava/releases/19.0/api/docs/com/google/
+         * common/cache/CacheBuilder.html
+         *
+         * If expireAfterWrite or expireAfterAccess is requested entries may be
+         * evicted on each cache modification, on occasional cache accesses, or
+         * on calls to Cache.cleanUp(). Expired entries may be counted by
+         * Cache.size(), but will never be visible to read or write operations.
+         *
+         * Certain cache configurations will result in the accrual of periodic
+         * maintenance tasks which will be performed during write operations, or
+         * during occasional read operations in the absence of writes. The
+         * Cache.cleanUp() method of the returned cache will also perform
+         * maintenance, but calling it should not be necessary with a high
+         * throughput cache. Only caches built with removalListener,
+         * expireAfterWrite, expireAfterAccess, weakKeys, weakValues, or
+         * softValues perform periodic maintenance.
+         */
+        @VisibleForTesting
+        void doCounterMapCleanup() {
+            ledgerIdEntryLogCounterCacheMap.cleanUp();
+        }
+
+        @VisibleForTesting
+        ConcurrentMap<Long, MutableInt> getCounterMap() {
+            return ledgerIdEntryLogCounterCacheMap.asMap();
+        }
+    }
+
     private final LoadingCache<Long, EntryLogAndLockTuple> ledgerIdEntryLogMap;
     /*
      * every time active logChannel is accessed from ledgerIdEntryLogMap
@@ -107,16 +225,24 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
     private final EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus;
     private final int entrylogMapAccessExpiryTimeInSeconds;
     private final int maximumNumberOfActiveEntryLogs;
+    private final int entryLogPerLedgerCounterLimitsMultFactor;
+
+    // Expose Stats
+    private final StatsLogger statsLogger;
+    final EntryLogsPerLedgerCounter entryLogsPerLedgerCounter;
 
     EntryLogManagerForEntryLogPerLedger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager,
             EntryLoggerAllocator entryLoggerAllocator, List<EntryLogger.EntryLogListener> listeners,
-            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus) throws IOException {
+            EntryLogger.RecentEntryLogsStatus recentlyCreatedEntryLogsStatus, StatsLogger statsLogger)
+            throws IOException {
         super(conf, ledgerDirsManager, entryLoggerAllocator, listeners);
         this.recentlyCreatedEntryLogsStatus = recentlyCreatedEntryLogsStatus;
         this.rotatedLogChannels = new CopyOnWriteArrayList<BufferedLogChannel>();
         this.replicaOfCurrentLogChannels = new ConcurrentLongHashMap<BufferedLogChannelWithDirInfo>();
         this.entrylogMapAccessExpiryTimeInSeconds = conf.getEntrylogMapAccessExpiryTimeInSeconds();
         this.maximumNumberOfActiveEntryLogs = conf.getMaximumNumberOfActiveEntryLogs();
+        this.entryLogPerLedgerCounterLimitsMultFactor = conf.getEntryLogPerLedgerCounterLimitsMultFactor();
+
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
         this.entryLogAndLockTupleCacheLoader = new CacheLoader<Long, EntryLogAndLockTuple>() {
             @Override
@@ -147,6 +273,9 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
                         onCacheEntryRemoval(expiredLedgerEntryLogMapEntry);
                     }
                 }).build(entryLogAndLockTupleCacheLoader);
+
+        this.statsLogger = statsLogger;
+        this.entryLogsPerLedgerCounter = new EntryLogsPerLedgerCounter(this.statsLogger);
     }
 
     /*
@@ -194,6 +323,8 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
             }
             replicaOfCurrentLogChannels.remove(logChannel.getLogId());
             rotatedLogChannels.add(logChannel);
+            entryLogsPerLedgerCounter.removedLedgerFromEntryLogMapCache(ledgerId,
+                    removedLedgerEntryLogMapEntry.getCause());
         } finally {
             lock.unlock();
         }
@@ -244,9 +375,11 @@ class EntryLogManagerForEntryLogPerLedger extends EntryLogManagerBase {
         lock.lock();
         try {
             BufferedLogChannel hasToRotateLogChannel = getCurrentLogForLedger(ledgerId);
+            boolean newLedgerInEntryLogMapCache = (hasToRotateLogChannel == null);
             logChannel.setLedgerIdAssigned(ledgerId);
             BufferedLogChannelWithDirInfo logChannelWithDirInfo = new BufferedLogChannelWithDirInfo(logChannel);
             ledgerIdEntryLogMap.get(ledgerId).setEntryLogWithDirInfo(logChannelWithDirInfo);
+            entryLogsPerLedgerCounter.openNewEntryLogForLedger(ledgerId, newLedgerInEntryLogMapCache);
             replicaOfCurrentLogChannels.put(logChannel.getLogId(), logChannelWithDirInfo);
             if (hasToRotateLogChannel != null) {
                 replicaOfCurrentLogChannels.remove(hasToRotateLogChannel.getLogId());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index d300e0b..f8eacbd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -56,6 +56,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.BiConsumerLong;
@@ -78,6 +80,9 @@ public class EntryLogger {
     @VisibleForTesting
     static final int UNINITIALIZED_LOG_ID = -0xDEAD;
 
+    // Expose Stats
+    private final StatsLogger statsLogger;
+
     static class BufferedLogChannel extends BufferedChannel {
         private final long logId;
         private final EntryLogMetadata entryLogMetadata;
@@ -323,11 +328,11 @@ public class EntryLogger {
      */
     public EntryLogger(ServerConfiguration conf,
             LedgerDirsManager ledgerDirsManager) throws IOException {
-        this(conf, ledgerDirsManager, null);
+        this(conf, ledgerDirsManager, null, NullStatsLogger.INSTANCE);
     }
 
     public EntryLogger(ServerConfiguration conf,
-            LedgerDirsManager ledgerDirsManager, EntryLogListener listener)
+            LedgerDirsManager ledgerDirsManager, EntryLogListener listener, StatsLogger statsLogger)
                     throws IOException {
         //We reserve 500 bytes as overhead for the protocol.  This is not 100% accurate
         // but the protocol varies so an exact value is difficult to determine
@@ -363,9 +368,10 @@ public class EntryLogger {
         this.recentlyCreatedEntryLogsStatus = new RecentEntryLogsStatus(logId + 1);
         this.entryLoggerAllocator = new EntryLoggerAllocator(conf, ledgerDirsManager, recentlyCreatedEntryLogsStatus,
                 logId);
+        this.statsLogger = statsLogger;
         if (entryLogPerLedgerEnabled) {
             this.entryLogManager = new EntryLogManagerForEntryLogPerLedger(conf, ledgerDirsManager,
-                    entryLoggerAllocator, listeners, recentlyCreatedEntryLogsStatus);
+                    entryLoggerAllocator, listeners, recentlyCreatedEntryLogsStatus, statsLogger);
         } else {
             this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, ledgerDirsManager, entryLoggerAllocator,
                     listeners, recentlyCreatedEntryLogsStatus);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 2c576d7..08e7f4e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRYLOGGER_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
 
@@ -101,7 +102,7 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
         checkNotNull(checkpointer, "invalid null checkpointer");
         this.checkpointSource = checkpointSource;
         this.checkpointer = checkpointer;
-        entryLogger = new EntryLogger(conf, ledgerDirsManager, this);
+        entryLogger = new EntryLogger(conf, ledgerDirsManager, this, statsLogger.scope(ENTRYLOGGER_SCOPE));
         ledgerCache = new LedgerCacheImpl(conf, activeLedgers,
                 null == indexDirsManager ? ledgerDirsManager : indexDirsManager, statsLogger);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger.scope("gc"));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index c8e784a..b71c55c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -176,7 +176,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
                 TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES,
                 TransientLedgerInfo.LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
 
-        entryLogger = new EntryLogger(conf, ledgerDirsManager);
+        entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
 
         stats.registerGauge("write-cache-size", new Gauge<Long>() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index faf671d..d913dbb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -217,6 +217,13 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
      */
     protected static final String MAXIMUM_NUMBER_OF_ACTIVE_ENTRYLOGS = "maximumNumberOfActiveEntryLogs";
 
+    /*
+     * in EntryLogManagerForEntryLogPerLedger, this config value specifies the
+     * metrics cache size limits in multiples of entrylogMap cache size limits.
+     */
+    protected static final String ENTRY_LOG_PER_LEDGER_COUNTER_LIMITS_MULT_FACTOR =
+            "entryLogPerLedgerCounterLimitsMultFactor";
+
     /**
      * Construct a default configuration object.
      */
@@ -3002,4 +3009,23 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
                 Integer.toString(maximumNumberOfActiveEntryLogs));
         return this;
     }
+
+    /*
+     * in EntryLogManagerForEntryLogPerLedger, this config value specifies the
+     * metrics cache size limits in multiples of entrylogMap cache size limits.
+     */
+    public int getEntryLogPerLedgerCounterLimitsMultFactor() {
+        return this.getInt(ENTRY_LOG_PER_LEDGER_COUNTER_LIMITS_MULT_FACTOR, 10);
+    }
+
+    /*
+     * in EntryLogManagerForEntryLogPerLedger, this config value specifies the
+     * metrics cache size limits in multiples of entrylogMap cache size limits.
+     */
+    public ServerConfiguration setEntryLogPerLedgerCounterLimitsMultFactor(
+            int entryLogPerLedgerCounterLimitsMultFactor) {
+        this.setProperty(ENTRY_LOG_PER_LEDGER_COUNTER_LIMITS_MULT_FACTOR,
+                Integer.toString(entryLogPerLedgerCounterLimitsMultFactor));
+        return this;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 611a68a..3cdfd9f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -40,6 +40,10 @@ import org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger.Buffered
 import org.apache.bookkeeper.bookie.EntryLogger.BufferedLogChannel;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.junit.After;
@@ -578,4 +582,191 @@ public class CreateNewLogTest {
         Assert.assertEquals("PreviousAllocatedEntryLogId", numOfLedgers * numOfThreadsForSameLedger,
                 entryLogger.getPreviousAllocatedEntryLogId());
     }
+
+    /*
+     * In this testcase metrics of EntryLogManagerForEntryLogPerLedger are
+     * validated.
+     */
+    @Test
+    public void testEntryLogManagerMetrics() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE);
+        int maximumNumberOfActiveEntryLogs = 3;
+        int entryLogPerLedgerCounterLimitsMultFactor = 2;
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // pre-allocation is enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setMaximumNumberOfActiveEntryLogs(maximumNumberOfActiveEntryLogs);
+        conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+        EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
+                .getEntryLogManager();
+        // set same thread executor for entryLoggerAllocator's allocatorExecutor
+        setSameThreadExecutorForEntryLoggerAllocator(entryLogger.getEntryLoggerAllocator());
+
+        Counter numOfWriteActiveLedgers = statsLogger.getCounter(BookKeeperServerStats.NUM_OF_WRITE_ACTIVE_LEDGERS);
+        Counter numOfWriteLedgersRemovedCacheExpiry = statsLogger
+                .getCounter(BookKeeperServerStats.NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY);
+        Counter numOfWriteLedgersRemovedCacheMaxSize = statsLogger
+                .getCounter(BookKeeperServerStats.NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE);
+        Counter numLedgersHavingMultipleEntrylogs = statsLogger
+                .getCounter(BookKeeperServerStats.NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS);
+        TestOpStatsLogger entryLogsPerLedger = (TestOpStatsLogger) statsLogger
+                .getOpStatsLogger(BookKeeperServerStats.ENTRYLOGS_PER_LEDGER);
+        // initially all the counters should be 0
+        Assert.assertEquals("NUM_OF_WRITE_ACTIVE_LEDGERS", 0, numOfWriteActiveLedgers.get().intValue());
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY", 0,
+                numOfWriteLedgersRemovedCacheExpiry.get().intValue());
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE", 0,
+                numOfWriteLedgersRemovedCacheMaxSize.get().intValue());
+        Assert.assertEquals("NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS", 0,
+                numLedgersHavingMultipleEntrylogs.get().intValue());
+        Assert.assertEquals("ENTRYLOGS_PER_LEDGER SuccessCount", 0, entryLogsPerLedger.getSuccessCount());
+
+        // lid-1 : 3 entrylogs, lid-2 : 2 entrylogs, lid-3 : 1 entrylog
+        int numOfEntrylogsForLedger1 = 3;
+        createNewLogs(entrylogManager, 1L, numOfEntrylogsForLedger1);
+        int numOfEntrylogsForLedger2 = 2;
+        createNewLogs(entrylogManager, 2L, numOfEntrylogsForLedger2);
+        createNewLogs(entrylogManager, 3L, 1);
+
+        Assert.assertEquals("NUM_OF_WRITE_ACTIVE_LEDGERS", 3, numOfWriteActiveLedgers.get().intValue());
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY", 0,
+                numOfWriteLedgersRemovedCacheExpiry.get().intValue());
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE", 0,
+                numOfWriteLedgersRemovedCacheMaxSize.get().intValue());
+        Assert.assertEquals("NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS", 2,
+                numLedgersHavingMultipleEntrylogs.get().intValue());
+        Assert.assertEquals("ENTRYLOGS_PER_LEDGER SuccessCount", 0, entryLogsPerLedger.getSuccessCount());
+
+        /*
+         * since entrylog for lid-4 is created and entrylogmap cachesize is 3,
+         * lid-1 will be removed from entrylogmap cache
+         */
+        createNewLogs(entrylogManager, 4L, 1);
+        Assert.assertEquals("NUM_OF_WRITE_ACTIVE_LEDGERS", maximumNumberOfActiveEntryLogs,
+                numOfWriteActiveLedgers.get().intValue());
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE", 1,
+                numOfWriteLedgersRemovedCacheMaxSize.get().intValue());
+        Assert.assertEquals("ENTRYLOGS_PER_LEDGER SuccessCount", 0, entryLogsPerLedger.getSuccessCount());
+
+        /*
+         * entrylog for lid-5, lid-6, lid-7 are created. Since
+         * maximumNumberOfActiveEntryLogs = 3 and
+         * entryLogPerLedgerCounterLimitsMultFactor = 2, when the entrylog for
+         * lid-7 is created, count of lid-1 should be removed from countermap.
+         */
+        createNewLogs(entrylogManager, 5L, 1);
+        createNewLogs(entrylogManager, 6L, 1);
+        createNewLogs(entrylogManager, 7L, 1);
+        Assert.assertEquals("NUM_OF_WRITE_ACTIVE_LEDGERS", maximumNumberOfActiveEntryLogs,
+                numOfWriteActiveLedgers.get().intValue());
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE", 4,
+                numOfWriteLedgersRemovedCacheMaxSize.get().intValue());
+        Assert.assertEquals("ENTRYLOGS_PER_LEDGER SuccessCount", 1, entryLogsPerLedger.getSuccessCount());
+        Assert.assertTrue("ENTRYLOGS_PER_LEDGER average value",
+                Double.compare(numOfEntrylogsForLedger1, entryLogsPerLedger.getSuccessAverage()) == 0);
+
+        /*
+         * entrylog for new lid-8 is created so one more entry from countermap
+         * should be removed.
+         */
+        createNewLogs(entrylogManager, 8L, 4);
+        Assert.assertEquals("NUM_OF_WRITE_ACTIVE_LEDGERS", maximumNumberOfActiveEntryLogs,
+                numOfWriteActiveLedgers.get().intValue());
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE", 5,
+                numOfWriteLedgersRemovedCacheMaxSize.get().intValue());
+        Assert.assertEquals("NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS", 3,
+                numLedgersHavingMultipleEntrylogs.get().intValue());
+        Assert.assertEquals("ENTRYLOGS_PER_LEDGER SuccessCount", 2, entryLogsPerLedger.getSuccessCount());
+        Assert.assertTrue("ENTRYLOGS_PER_LEDGER average value",
+                Double.compare((numOfEntrylogsForLedger1 + numOfEntrylogsForLedger2) / 2.0,
+                        entryLogsPerLedger.getSuccessAverage()) == 0);
+
+        /*
+         * lid-3 is still in countermap. So when new entrylogs are created for
+         * lid-3, no new entry from counter should be removed. so
+         * entryLogsPerLedger.getSuccessCount() should be still old value. Also,
+         * since lid-3 is still in countermap, these new 4 entrylogs should be
+         * added to previous value 1 and hence the EntryLogsPerLedger for ledger
+         * - 3l should be updated to 5.
+         */
+        createNewLogs(entrylogManager, 3L, 4);
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE", 6,
+                numOfWriteLedgersRemovedCacheMaxSize.get().intValue());
+        Assert.assertEquals("NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS", 4,
+                numLedgersHavingMultipleEntrylogs.get().intValue());
+        Assert.assertEquals("Numofentrylogs for ledger: 3l", 5,
+                entrylogManager.entryLogsPerLedgerCounter.getCounterMap().get(3L).intValue());
+        Assert.assertEquals("ENTRYLOGS_PER_LEDGER SuccessCount", 2, entryLogsPerLedger.getSuccessCount());
+    }
+
+    /*
+     * In this testcase metrics of EntryLogManagerForEntryLogPerLedger are
+     * validated.
+     */
+    @Test
+    public void testEntryLogManagerMetricsFromExpiryAspect() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger(BookKeeperServerStats.ENTRYLOGGER_SCOPE);
+
+        int entrylogMapAccessExpiryTimeInSeconds = 1;
+        int entryLogPerLedgerCounterLimitsMultFactor = 2;
+
+        // Creating a new configuration with a number of ledger directories.
+        conf.setLedgerDirNames(ledgerDirs);
+        // pre-allocation is enabled
+        conf.setEntryLogFilePreAllocationEnabled(true);
+        conf.setEntryLogPerLedgerEnabled(true);
+        conf.setEntrylogMapAccessExpiryTimeInSeconds(entrylogMapAccessExpiryTimeInSeconds);
+        conf.setEntryLogPerLedgerCounterLimitsMultFactor(entryLogPerLedgerCounterLimitsMultFactor);
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
+        EntryLogger entryLogger = new EntryLogger(conf, ledgerDirsManager, null, statsLogger);
+        EntryLogManagerForEntryLogPerLedger entrylogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger
+                .getEntryLogManager();
+        // set same thread executor for entryLoggerAllocator's allocatorExecutor
+        setSameThreadExecutorForEntryLoggerAllocator(entryLogger.getEntryLoggerAllocator());
+
+        Counter numOfWriteLedgersRemovedCacheExpiry = statsLogger
+                .getCounter(BookKeeperServerStats.NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY);
+        TestOpStatsLogger entryLogsPerLedger = (TestOpStatsLogger) statsLogger
+                .getOpStatsLogger(BookKeeperServerStats.ENTRYLOGS_PER_LEDGER);
+
+        int numOfEntrylogsForLedger1 = 3;
+        createNewLogs(entrylogManager, 1L, numOfEntrylogsForLedger1);
+        Assert.assertEquals("ENTRYLOGS_PER_LEDGER SuccessCount", 0, entryLogsPerLedger.getSuccessCount());
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY", 0,
+                numOfWriteLedgersRemovedCacheExpiry.get().intValue());
+
+        Thread.sleep(entrylogMapAccessExpiryTimeInSeconds * 1000 + 100);
+        entrylogManager.doEntryLogMapCleanup();
+        entrylogManager.entryLogsPerLedgerCounter.doCounterMapCleanup();
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY", 1,
+                numOfWriteLedgersRemovedCacheExpiry.get().intValue());
+        Assert.assertEquals("ENTRYLOGS_PER_LEDGER SuccessCount", 0, entryLogsPerLedger.getSuccessCount());
+
+        Thread.sleep(entrylogMapAccessExpiryTimeInSeconds * 1000 + 100);
+        entrylogManager.doEntryLogMapCleanup();
+        entrylogManager.entryLogsPerLedgerCounter.doCounterMapCleanup();
+        Assert.assertEquals("NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_EXPIRY", 1,
+                numOfWriteLedgersRemovedCacheExpiry.get().intValue());
+        Assert.assertEquals("ENTRYLOGS_PER_LEDGER SuccessCount", 1, entryLogsPerLedger.getSuccessCount());
+        Assert.assertTrue("ENTRYLOGS_PER_LEDGER average value",
+                Double.compare(numOfEntrylogsForLedger1, entryLogsPerLedger.getSuccessAverage()) == 0);
+    }
+
+    private static void createNewLogs(EntryLogManagerForEntryLogPerLedger entrylogManager, long ledgerId,
+            int numOfTimes) throws IOException {
+        for (int i = 0; i < numOfTimes; i++) {
+            entrylogManager.createNewLog(ledgerId);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
index c3a8f63..645af9c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/SlowInterleavedLedgerStorage.java
@@ -46,9 +46,9 @@ public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
         public volatile long addDelay = 0;
         public volatile long flushDelay = 0;
 
-        public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener)
-                throws IOException {
-            super(conf, ledgerDirsManager, listener);
+        public SlowEntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager, EntryLogListener listener,
+                StatsLogger statsLogger) throws IOException {
+            super(conf, ledgerDirsManager, listener, statsLogger);
         }
 
         public SlowEntryLogger setAddDelay(long delay) {
@@ -119,7 +119,7 @@ public class SlowInterleavedLedgerStorage extends InterleavedLedgerStorage {
         long addDelay = conf.getLong(PROP_SLOW_STORAGE_ADD_DELAY, 0);
         long flushDelay = conf.getLong(PROP_SLOW_STORAGE_FLUSH_DELAY, 0);
 
-        entryLogger = new SlowEntryLogger(conf, ledgerDirsManager, this)
+        entryLogger = new SlowEntryLogger(conf, ledgerDirsManager, this, statsLogger)
                 .setAddDelay(addDelay)
                 .setGetDelay(getDelay)
                 .setFlushDelay(flushDelay);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
index ac6bfaf..9b9fdb5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestStatsProvider.java
@@ -177,6 +177,10 @@ public class TestStatsProvider implements StatsProvider {
             return TestStatsProvider.this.getOrCreateCounter(getSubPath(name));
         }
 
+        public Gauge<? extends Number> getGauge(String name) {
+            return gaugeMap.get(path);
+        }
+
         @Override
         public <T extends Number> void registerGauge(String name, Gauge<T> gauge) {
             TestStatsProvider.this.registerGauge(getSubPath(name), gauge);
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 1db0a1c..ab9e99a 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -441,6 +441,10 @@ ledgerDirectories=/tmp/bk-data
 # active at a given point in time
 # maximumNumberOfActiveEntryLogs=500
 
+# in EntryLogManagerForEntryLogPerLedger, this config value specifies the metrics cache size 
+# limits in multiples of entrylogMap cache size limits.
+# entryLogPerLedgerCounterLimitsMultFactor=10
+
 #############################################################################
 ## Entry log compaction settings
 #############################################################################
diff --git a/pom.xml b/pom.xml
index a18146d..2e5d0c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,7 +114,7 @@
     <commons-configuration.version>1.10</commons-configuration.version>
     <commons-compress.version>1.15</commons-compress.version>
     <commons-lang.version>2.6</commons-lang.version>
-    <commons-lang3.version>3.3.2</commons-lang3.version>
+    <commons-lang3.version>3.6</commons-lang3.version>
     <commons-io.version>2.4</commons-io.version>
     <curator.version>4.0.1</curator.version>
     <dropwizard.version>3.1.0</dropwizard.version>
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 9a872e6..39f2719 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -317,6 +317,9 @@ groups:
   - param: maximumNumberOfActiveEntryLogs
     description: in entryLogPerLedger feature, this specifies the maximum number of entrylogs that can be active at a given point in time. If there are more number of active entryLogs then the maximumNumberOfActiveEntryLogs then the entrylog will be evicted from the cache.
     default: 500
+  - param: entryLogPerLedgerCounterLimitsMultFactor
+    description: in EntryLogManagerForEntryLogPerLedger, this config value specifies the metrics cache size limits in multiples of entrylogMap cache size limits.
+    default: 10
 
 - name: Entry log compaction settings
   params:
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
index ebc0c87..9070789 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/cluster/BookKeeperClusterTestBase.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.Random;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -34,7 +35,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.tests.integration.topologies.BKCluster;
 import org.apache.bookkeeper.tests.integration.topologies.BKClusterSpec;
-import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -48,11 +48,16 @@ public abstract class BookKeeperClusterTestBase {
     protected static URI metadataServiceUri;
     protected static MetadataClientDriver metadataClientDriver;
     protected static ScheduledExecutorService executor;
+    protected static Random rand = new Random();
 
     @BeforeClass
     public static void setupCluster() throws Exception {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 8; i++) {
+            sb.append((char) (rand.nextInt(26) + 'a'));
+        }
         BKClusterSpec spec = BKClusterSpec.builder()
-            .clusterName(RandomStringUtils.randomAlphabetic(8))
+            .clusterName(sb.toString())
             .numBookies(0)
             .build();
 
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
index a7d8549..a8a3e6e 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.tests.integration.stream;
 
 import java.util.List;
+import java.util.Random;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.StorageClient;
@@ -29,7 +30,6 @@ import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase;
 import org.apache.bookkeeper.tests.integration.topologies.BKClusterSpec;
-import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -40,10 +40,16 @@ import org.junit.BeforeClass;
 @Slf4j
 public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase {
 
+    protected static Random rand = new Random();
+
     @BeforeClass
     public static void setupCluster() throws Exception {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 8; i++) {
+            sb.append((char) (rand.nextInt(26) + 'a'));
+        }
         BKClusterSpec spec = BKClusterSpec.builder()
-            .clusterName(RandomStringUtils.randomAlphabetic(8))
+            .clusterName(sb.toString())
             .numBookies(3)
             .extraServerComponents("org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent")
             .build();

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.