You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/02 14:57:01 UTC

[pulsar] branch master updated: Allow to configure the managed ledger cache eviction frequency (#4066)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f5c7b22  Allow to configure the managed ledger cache eviction frequency (#4066)
f5c7b22 is described below

commit f5c7b22f7d2224e28a81daed6b96c117007e2821
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 2 07:56:55 2019 -0700

    Allow to configure the managed ledger cache eviction frequency (#4066)
    
    * Allow to configure the managed ledger cache eviction frequency
    
    * Fixed test
    
    * Simplified the cache eviction to make it predictable at the configured frequency
    
    * Address comments
    
    * Apply eviction on slowest active reader by preference
    
    * Re-introduced backlogged subscriptions test
    
    * Addressed comments
    
    * Use config option
    
    * Fixed active/inactive logic and read position
    
    * Use dedicated thread for cache evictions
    
    * Added config options in docs
    
    * Fixed tests
    
    * Added time triggered eviction test
    
    * Fixed flaky test
    
    * Fixed tests
---
 conf/broker.conf                                   |  10 ++
 conf/standalone.conf                               |  10 ++
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  16 +--
 .../mledger/ManagedLedgerFactoryConfig.java        |  17 ++-
 .../apache/bookkeeper/mledger/impl/EntryCache.java |   4 +-
 .../bookkeeper/mledger/impl/EntryCacheImpl.java    |  21 ++--
 .../bookkeeper/mledger/impl/EntryCacheManager.java |   4 +
 .../apache/bookkeeper/mledger/impl/EntryImpl.java  |  11 ++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  58 ++++++++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 131 ++++++++-------------
 .../apache/bookkeeper/mledger/util/RangeCache.java |  43 ++++++-
 .../mledger/impl/EntryCacheManagerTest.java        |  51 +++++++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  89 +++++++-------
 .../bookkeeper/mledger/util/RangeCacheTest.java    |  23 +++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  10 ++
 .../pulsar/broker/ManagedLedgerClientFactory.java  |   3 +
 .../broker/service/persistent/PersistentTopic.java |   3 +
 .../pulsar/client/api/ConsumerRedeliveryTest.java  |   5 +-
 .../client/api/SimpleProducerConsumerTest.java     |  21 +---
 .../client/api/v1/V1_ProducerConsumerTest.java     |  90 --------------
 site2/docs/reference-configuration.md              |   3 +
 21 files changed, 352 insertions(+), 271 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 856d173..fb2cb1e 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -440,6 +440,16 @@ managedLedgerCacheSizeMB=
 # Threshold to which bring down the cache level when eviction is triggered
 managedLedgerCacheEvictionWatermark=0.9
 
+# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
+managedLedgerCacheEvictionFrequency=100.0
+
+# All entries that have stayed in the cache for longer than the configured time will be evicted
+managedLedgerCacheEvictionTimeThresholdMillis=1000
+
+# Configure the threshold (in number of entries) at or above which a cursor is considered 'backlogged'
+# and is therefore set as inactive.
+managedLedgerCursorBackloggedThreshold=1000
+
 # Rate limit the amount of writes per second generated by consumer acking the messages
 managedLedgerDefaultMarkDeleteRateLimit=1.0
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 4cbf931..b8f13c5 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -301,6 +301,16 @@ managedLedgerCacheSizeMB=
 # Threshold to which bring down the cache level when eviction is triggered
 managedLedgerCacheEvictionWatermark=0.9
 
+# Configure the cache eviction frequency for the managed ledger cache (evictions/sec)
+managedLedgerCacheEvictionFrequency=100.0
+
+# All entries that have stayed in the cache for longer than the configured time will be evicted
+managedLedgerCacheEvictionTimeThresholdMillis=1000
+
+# Configure the threshold (in number of entries) at or above which a cursor is considered 'backlogged'
+# and is therefore set as inactive.
+managedLedgerCursorBackloggedThreshold=1000
+
 # Rate limit the amount of writes generated by consumer acking the messages
 managedLedgerDefaultMarkDeleteRateLimit=0.1
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 8f89050..4a57055 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -515,9 +515,9 @@ public class ManagedLedgerConfig {
     }
 
     /**
-     * 
+     *
      * Ledger-Op (Create/Delete) timeout
-     * 
+     *
      * @return
      */
     public long getMetadataOperationsTimeoutSeconds() {
@@ -526,17 +526,17 @@ public class ManagedLedgerConfig {
 
     /**
      * Ledger-Op (Create/Delete) timeout after which callback will be completed with failure
-     * 
+     *
      * @param metadataOperationsTimeoutSeconds
      */
     public ManagedLedgerConfig setMetadataOperationsTimeoutSeconds(long metadataOperationsTimeoutSeconds) {
         this.metadataOperationsTimeoutSeconds = metadataOperationsTimeoutSeconds;
         return this;
     }
-    
+
     /**
      * Ledger read-entry timeout
-     * 
+     *
      * @return
      */
     public long getReadEntryTimeoutSeconds() {
@@ -546,7 +546,7 @@ public class ManagedLedgerConfig {
     /**
      * Ledger read entry timeout after which callback will be completed with failure. (disable timeout by setting
      * readTimeoutSeconds <= 0)
-     * 
+     *
      * @param readTimeoutSeconds
      * @return
      */
@@ -554,14 +554,14 @@ public class ManagedLedgerConfig {
         this.readEntryTimeoutSeconds = readEntryTimeoutSeconds;
         return this;
     }
-    
+
     public long getAddEntryTimeoutSeconds() {
         return addEntryTimeoutSeconds;
     }
 
     /**
      * Add-entry timeout after which add-entry callback will be failed if add-entry is not succeeded.
-     * 
+     *
      * @param addEntryTimeoutSeconds
      */
     public ManagedLedgerConfig setAddEntryTimeoutSeconds(long addEntryTimeoutSeconds) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 81673f4..40b815d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -37,7 +37,18 @@ public class ManagedLedgerFactoryConfig {
     private int numManagedLedgerWorkerThreads = Runtime.getRuntime().availableProcessors();
     private int numManagedLedgerSchedulerThreads = Runtime.getRuntime().availableProcessors();
 
-    public long getMaxCacheSize() {
-        return maxCacheSize;
-    }
+    /**
+     * Frequency of cache eviction triggering. Default is 100 times per second.
+     */
+    private double cacheEvictionFrequency = 100;
+
+    /**
+     * All entries that have stayed in the cache for longer than the configured time will be evicted
+     */
+    private long cacheEvictionTimeThresholdMillis = 1000;
+
+    /**
+     * Threshold to consider a cursor as "backlogged"
+     */
+    private long thresholdBackloggedCursor = 1000;
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
index 0c99650..0e20a0f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
@@ -50,10 +50,12 @@ public interface EntryCache extends Comparable<EntryCache> {
      * Remove from cache all the entries related to a ledger up to lastPosition included.
      *
      * @param lastPosition
-     *            the position of the last entry to be invalidated (inclusive)
+     *            the position of the last entry to be invalidated (non-inclusive)
      */
     void invalidateEntries(PositionImpl lastPosition);
 
+    void invalidateEntriesBeforeTimestamp(long timestamp);
+
     /**
      * Remove from the cache all the entries belonging to a specific ledger.
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index a435c14..053e900 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -21,15 +21,17 @@ package org.apache.bookkeeper.mledger.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
-import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -37,7 +39,6 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.util.RangeCache;
-import org.apache.bookkeeper.mledger.util.RangeCache.Weighter;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,12 +54,10 @@ public class EntryCacheImpl implements EntryCache {
 
     private static final double MB = 1024 * 1024;
 
-    private static final Weighter<EntryImpl> entryWeighter = EntryImpl::getLength;
-
     public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml) {
         this.manager = manager;
         this.ml = ml;
-        this.entries = new RangeCache<>(entryWeighter);
+        this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Initialized managed-ledger entry cache", ml.getName());
@@ -132,7 +131,7 @@ public class EntryCacheImpl implements EntryCache {
     public void invalidateEntries(final PositionImpl lastPosition) {
         final PositionImpl firstPosition = PositionImpl.get(-1, 0);
 
-        Pair<Integer, Long> removed = entries.removeRange(firstPosition, lastPosition, true);
+        Pair<Integer, Long> removed = entries.removeRange(firstPosition, lastPosition, false);
         int entriesRemoved = removed.getLeft();
         long sizeRemoved = removed.getRight();
         if (log.isDebugEnabled()) {
@@ -173,7 +172,7 @@ public class EntryCacheImpl implements EntryCache {
             callback.readEntryFailed(createManagedLedgerException(t), ctx);
         }
     }
-    
+
     private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEntryCallback callback,
             final Object ctx) {
         if (log.isDebugEnabled()) {
@@ -229,7 +228,7 @@ public class EntryCacheImpl implements EntryCache {
             callback.readEntriesFailed(createManagedLedgerException(t), ctx);
         }
     }
-    
+
     @SuppressWarnings({ "unchecked", "rawtypes" })
     private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
             final ReadEntriesCallback callback, Object ctx) {
@@ -341,5 +340,11 @@ public class EntryCacheImpl implements EntryCache {
         return evicted;
     }
 
+    @Override
+    public void invalidateEntriesBeforeTimestamp(long timestamp) {
+        long evictedSize = entries.evictLEntriesBeforeTimestamp(timestamp);
+        manager.entriesRemoved(evictedSize);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(EntryCacheImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index c551002..526796e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -190,6 +190,10 @@ public class EntryCacheManager {
         }
 
         @Override
+        public void invalidateEntriesBeforeTimestamp(long timestamp) {
+        }
+
+        @Override
         public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
                 final ReadEntriesCallback callback, Object ctx) {
             lh.readAsync(firstEntry, lastEntry).whenComplete(
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index eeccfe7..58ea099 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -40,12 +40,14 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr
     };
 
     private final Handle<EntryImpl> recyclerHandle;
+    private long timestamp;
     private long ledgerId;
     private long entryId;
     ByteBuf data;
 
     public static EntryImpl create(LedgerEntry ledgerEntry) {
         EntryImpl entry = RECYCLER.get();
+        entry.timestamp = System.nanoTime();
         entry.ledgerId = ledgerEntry.getLedgerId();
         entry.entryId = ledgerEntry.getEntryId();
         entry.data = ledgerEntry.getEntryBuffer();
@@ -57,6 +59,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr
     // Used just for tests
     public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
         EntryImpl entry = RECYCLER.get();
+        entry.timestamp = System.nanoTime();
         entry.ledgerId = ledgerId;
         entry.entryId = entryId;
         entry.data = Unpooled.wrappedBuffer(data);
@@ -66,6 +69,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr
 
     public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
         EntryImpl entry = RECYCLER.get();
+        entry.timestamp = System.nanoTime();
         entry.ledgerId = ledgerId;
         entry.entryId = entryId;
         entry.data = data;
@@ -76,6 +80,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr
 
     public static EntryImpl create(PositionImpl position, ByteBuf data) {
         EntryImpl entry = RECYCLER.get();
+        entry.timestamp = System.nanoTime();
         entry.ledgerId = position.getLedgerId();
         entry.entryId = position.getEntryId();
         entry.data = data;
@@ -86,6 +91,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr
 
     public static EntryImpl create(EntryImpl other) {
         EntryImpl entry = RECYCLER.get();
+        entry.timestamp = System.nanoTime();
         entry.ledgerId = other.ledgerId;
         entry.entryId = other.entryId;
         entry.data = other.data.retainedDuplicate();
@@ -97,6 +103,10 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr
         this.recyclerHandle = recyclerHandle;
     }
 
+    public long getTimestamp() {
+        return timestamp;
+    }
+
     @Override
     public ByteBuf getDataBuffer() {
         return data;
@@ -152,6 +162,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr
         // This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it
         data.release();
         data = null;
+        timestamp = -1;
         ledgerId = -1;
         entryId = -1;
         recyclerHandle.recycle(this);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index b9f63a8..7399233 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -24,6 +24,8 @@ import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLed
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +34,8 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -82,6 +86,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     protected final OrderedScheduler scheduledExecutor;
     private final OrderedExecutor orderedExecutor;
 
+    private final ExecutorService cacheEvictionExecutor;
+
     protected final ManagedLedgerFactoryMBeanImpl mbean;
 
     protected final ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap<>();
@@ -89,6 +95,9 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
     private long lastStatTimestamp = System.nanoTime();
     private final ScheduledFuture<?> statsTask;
+
+    private final long cacheEvictionTimeThresholdNanos;
+
     private static final int StatsPeriodSeconds = 60;
 
     public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration) throws Exception {
@@ -130,6 +139,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                 .numThreads(config.getNumManagedLedgerWorkerThreads())
                 .name("bookkeeper-ml-workers")
                 .build();
+        cacheEvictionExecutor = Executors
+                .newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
 
         this.bookKeeper = bookKeeper;
         this.isBookkeeperManaged = isBookkeeperManaged;
@@ -139,6 +150,13 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
         this.entryCacheManager = new EntryCacheManager(this);
         this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
+
+
+        this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS
+                .toNanos(config.getCacheEvictionTimeThresholdMillis());
+
+
+        cacheEvictionExecutor.execute(this::cacheEvictionTask);
     }
 
     private synchronized void refreshStats() {
@@ -147,15 +165,48 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         mbean.refreshStats(period, TimeUnit.NANOSECONDS);
         ledgers.values().forEach(mlfuture -> {
-            ManagedLedgerImpl ml = mlfuture.getNow(null);
-            if (ml != null) {
-                ml.mbean.refreshStats(period, TimeUnit.NANOSECONDS);
+            if (mlfuture.isDone() && !mlfuture.isCompletedExceptionally()) {
+                ManagedLedgerImpl ml = mlfuture.getNow(null);
+                if (ml != null) {
+                    ml.mbean.refreshStats(period, TimeUnit.NANOSECONDS);
+                }
             }
         });
 
         lastStatTimestamp = now;
     }
 
+    private void cacheEvictionTask() {
+        double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001);
+        long waitTimeMillis = (long) (1000 / evictionFrequency);
+
+        while (true) {
+            try {
+                doCacheEviction();
+
+                Thread.sleep(waitTimeMillis);
+            } catch (InterruptedException e) {
+                // Factory is shutting down
+                return;
+            } catch (Throwable t) {
+                log.warn("Exception while performing cache eviction: {}", t.getMessage(), t);
+            }
+        }
+    }
+
+    private synchronized void doCacheEviction() {
+        long maxTimestamp = System.nanoTime() - cacheEvictionTimeThresholdNanos;
+
+        ledgers.values().forEach(mlfuture -> {
+            if (mlfuture.isDone() && !mlfuture.isCompletedExceptionally()) {
+                ManagedLedgerImpl ml = mlfuture.getNow(null);
+                if (ml != null) {
+                    ml.doCacheEviction(maxTimestamp);
+                }
+            }
+        });
+    }
+
     /**
      * Helper for getting stats.
      *
@@ -366,6 +417,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         scheduledExecutor.shutdown();
         orderedExecutor.shutdown();
+        cacheEvictionExecutor.shutdownNow();
 
         entryCacheManager.clear();
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 940fb7f..6b9f529 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -20,10 +20,25 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.Math.min;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
+import com.google.common.collect.BoundType;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Range;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
 import java.time.Clock;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -62,13 +77,13 @@ import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.Retries;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -99,35 +114,15 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext;
 import org.apache.bookkeeper.mledger.util.CallbackMutex;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.BoundType;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Range;
-import com.google.common.util.concurrent.RateLimiter;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import java.util.HashMap;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
-
 public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private final static long MegaByte = 1024 * 1024;
 
     protected final static int AsyncOperationTimeoutSeconds = 30;
-    private final static long maxActiveCursorBacklogEntries = 100;
-    private static long maxMessageCacheRetentionTimeMillis = 10 * 1000;
 
     protected final BookKeeper bookKeeper;
     protected final String name;
@@ -160,8 +155,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     @SuppressWarnings("unused")
     private volatile long totalSize = 0;
 
-    private RateLimiter updateCursorRateLimit;
-
     // Cursors that are waiting to be notified when new entries are persisted
     final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;
 
@@ -235,6 +228,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private static final AtomicLongFieldUpdater<ManagedLedgerImpl> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ManagedLedgerImpl.class, "readOpCount");
     private volatile long readOpCount = 0;
+
+    private final long backloggedCursorThresholdEntries;
+
     // last read-operation's callback to check read-timeout on it.
     private volatile ReadEntryCallbackWrapper lastReadCallback = null;
 
@@ -267,8 +263,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
         this.waitingCursors = Queues.newConcurrentLinkedQueue();
         this.uninitializedCursors = Maps.newHashMap();
-        this.updateCursorRateLimit = RateLimiter.create(1);
         this.clock = config.getClock();
+        this.backloggedCursorThresholdEntries = factory.getConfig().getThresholdBackloggedCursor();
 
         // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
         this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
@@ -904,63 +900,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     @Override
     public void checkBackloggedCursors() {
-
         // activate caught up cursors
         cursors.forEach(cursor -> {
-            if (cursor.getNumberOfEntries() < maxActiveCursorBacklogEntries) {
+            if (cursor.getNumberOfEntries() < backloggedCursorThresholdEntries) {
                 cursor.setActive();
+            } else {
+                cursor.setInactive();
             }
         });
-
-        // deactivate backlog cursors
-        Iterator<ManagedCursor> cursors = activeCursors.iterator();
-        while (cursors.hasNext()) {
-            ManagedCursor cursor = cursors.next();
-            long backlogEntries = cursor.getNumberOfEntries();
-            if (backlogEntries > maxActiveCursorBacklogEntries) {
-                PositionImpl readPosition = (PositionImpl) cursor.getReadPosition();
-                readPosition = isValidPosition(readPosition) ? readPosition : getNextValidPosition(readPosition);
-                if (readPosition == null) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Couldn't find valid read position [{}] {}", name, cursor.getName(),
-                                cursor.getReadPosition());
-                    }
-                    continue;
-                }
-                try {
-                    asyncReadEntry(readPosition, new ReadEntryCallback() {
-
-                        @Override
-                        public void readEntryFailed(ManagedLedgerException e, Object ctx) {
-                            log.warn("[{}] Failed while reading entries on [{}] {}", name, cursor.getName(),
-                                    e.getMessage(), e);
-
-                        }
-
-                        @Override
-                        public void readEntryComplete(Entry entry, Object ctx) {
-                            MessageMetadata msgMetadata = null;
-                            try {
-                                msgMetadata = Commands.parseMessageMetadata(entry.getDataBuffer());
-                                long msgTimeSincePublish = (clock.millis() - msgMetadata.getPublishTime());
-                                if (msgTimeSincePublish > maxMessageCacheRetentionTimeMillis) {
-                                    cursor.setInactive();
-                                }
-                            } finally {
-                                if (msgMetadata != null) {
-                                    msgMetadata.recycle();
-                                }
-                                entry.release();
-                            }
-
-                        }
-                    }, null);
-                } catch (Exception e) {
-                    log.warn("[{}] Failed while reading entries from cache on [{}] {}", name, cursor.getName(),
-                            e.getMessage(), e);
-                }
-            }
-        }
     }
 
     @Override
@@ -1578,13 +1525,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     lastEntry);
         }
         asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx);
-
-        if (updateCursorRateLimit.tryAcquire()) {
-            if (isCursorActive(cursor)) {
-                final PositionImpl lastReadPosition = PositionImpl.get(ledger.getId(), lastEntry);
-                discardEntriesFromCache(cursor, lastReadPosition);
-            }
-        }
     }
 
     protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
@@ -1780,6 +1720,31 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    void doCacheEviction(long maxTimestamp) {
+        // Entries behind the slowest active reader were already read by the active cursors: drop them first
+        final PositionImpl slowestActiveReadPos = getEarlierReadPositionForActiveCursors();
+        if (slowestActiveReadPos != null) {
+            entryCache.invalidateEntries(slowestActiveReadPos);
+        }
+
+        // Then apply the time-based cutoff, which is driven only by the given timestamp
+        entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp);
+    }
+
+    /** Returns the lowest read position among {@code activeCursors}, or null when that container is empty. */
+    private PositionImpl getEarlierReadPositionForActiveCursors() {
+        PositionImpl earliest = null;
+        for (ManagedCursor active : activeCursors) {
+            PositionImpl readPosition = (PositionImpl) active.getReadPosition();
+            // The first cursor seen seeds the minimum; afterwards keep whichever position is lower
+            boolean isLower = earliest == null || readPosition.compareTo(earliest) < 0;
+            if (isLower) {
+                earliest = readPosition;
+            }
+        }
+        return earliest;
+    }
+
     void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
         Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor, newPosition);
         if (pair == null) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index b9b3ace..e64a40d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -43,12 +43,13 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
     private final ConcurrentNavigableMap<Key, Value> entries;
     private AtomicLong size; // Total size of values stored in cache
     private final Weighter<Value> weighter; // Weighter object used to extract the size from values
+    private final TimestampExtractor<Value> timestampExtractor; // Extract the timestamp associated with a value
 
     /**
      * Construct a new RangeLruCache with default Weighter.
      */
     public RangeCache() {
-        this(new DefaultWeighter<Value>());
+        this(new DefaultWeighter<Value>(), (x) -> System.nanoTime());
     }
 
     /**
@@ -57,10 +58,11 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
      * @param weighter
      *            a custom weighter to compute the size of each stored value
      */
-    public RangeCache(Weighter<Value> weighter) {
+    public RangeCache(Weighter<Value> weighter, TimestampExtractor<Value> timestampExtractor) {
         this.size = new AtomicLong(0);
         this.entries = new ConcurrentSkipListMap<>();
         this.weighter = weighter;
+        this.timestampExtractor = timestampExtractor;
     }
 
     /**
@@ -176,6 +178,34 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
     }
 
     /**
+    * Evict entries from the lowest key upwards, stopping at the first one newer than {@code maxTimestamp}.
+    *
+    * @param maxTimestamp the max timestamp of the entries to be evicted
+    * @return the total size of the evicted entries, as computed by the weighter
+    */
+   public long evictLEntriesBeforeTimestamp(long maxTimestamp) {
+       long removedSize = 0;
+
+       while (true) {
+           Map.Entry<Key, Value> entry = entries.firstEntry();
+           if (entry == null || timestampExtractor.getTimestamp(entry.getValue()) > maxTimestamp) {
+               break;
+           }
+
+           // Remove only the entry inspected above: the head may have changed since firstEntry()
+           Value value = entry.getValue();
+           if (!entries.remove(entry.getKey(), value)) {
+               continue; // lost a race with a concurrent update, look at the new head
+           }
+           removedSize += weighter.getSize(value);
+           value.release();
+       }
+
+       size.addAndGet(-removedSize);
+       return removedSize;
+   }
+
+    /**
      * Just for testing. Getting the number of entries is very expensive on the conncurrent map
      */
     protected long getNumberOfEntries() {
@@ -218,6 +248,15 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
     }
 
     /**
+     * Interface of an object that is able to extract the "timestamp" of the cached values.
+     * @param <ValueT> the type of the cached values
+     */
+    @FunctionalInterface
+    public interface TimestampExtractor<ValueT> {
+        long getTimestamp(ValueT value);
+    }
+
+    /**
      * Default cache weighter, every value is assumed the same cost.
      *
      * @param <Value>
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 47c515d..69c23e3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -28,6 +28,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.testng.annotations.BeforeClass;
@@ -99,15 +102,15 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(cacheManager.getSize(), 3);
         assertEquals(cache2.getSize(), 3);
 
-        // Should remove 2 entries
+        // Should remove 1 entry
         cache2.invalidateEntries(new PositionImpl(2, 1));
-        assertEquals(cacheManager.getSize(), 1);
-        assertEquals(cache2.getSize(), 1);
+        assertEquals(cacheManager.getSize(), 2);
+        assertEquals(cache2.getSize(), 2);
 
         cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
 
         assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 10);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 1);
+        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 2);
         assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
         assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
         assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
@@ -207,6 +210,7 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
         config.setMaxCacheSize(7 * 10);
         config.setCacheEvictionWatermark(0.8);
+        config.setCacheEvictionFrequency(1);
 
         factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), config);
 
@@ -263,10 +267,47 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         entries.forEach(e -> e.release());
 
         cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 0);
+        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 7);
         assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
         assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
         assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
         assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
     }
+
+    @Test
+    void verifyTimeBasedEviction() throws Exception {
+        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
+        config.setMaxCacheSize(1000);
+        config.setCacheEvictionFrequency(100);
+        config.setCacheEvictionTimeThresholdMillis(100);
+
+        // Dedicated factory for this test; shut down in finally so a failed assertion cannot leak it
+        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), config);
+        try {
+            ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test");
+            ManagedCursor c1 = ledger.openCursor("c1");
+            c1.setActive();
+            ManagedCursor c2 = ledger.openCursor("c2");
+            c2.setActive();
+
+            EntryCacheManager cacheManager = factory.getEntryCacheManager();
+            assertEquals(cacheManager.getSize(), 0);
+            EntryCache cache = cacheManager.getEntryCache(ledger);
+            assertEquals(cache.getSize(), 0);
+
+            ledger.addEntry(new byte[4]);
+            ledger.addEntry(new byte[3]);
+
+            // Cache eviction should happen every 10 millis and clean all the entries older than 100ms
+            Thread.sleep(1000);
+            c1.close();
+            c2.close();
+
+            assertEquals(cacheManager.getSize(), 0);
+            assertEquals(cache.getSize(), 0);
+        } finally {
+            factory.shutdown();
+        }
+    }
+
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d0abbbb..35725e6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -22,7 +22,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -31,6 +30,13 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
@@ -38,7 +44,6 @@ import java.nio.charset.Charset;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -60,13 +65,10 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.api.LedgerEntries;
-import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.ReadHandle;
-import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -86,6 +88,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedE
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
@@ -105,13 +108,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Charsets;
-import com.google.common.collect.Sets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-
 public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class);
@@ -1356,21 +1352,21 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
         c1.setReadPosition(p2);
-        ledger.discardEntriesFromCache(c1, p1);
+        ledger.discardEntriesFromCache(c1, p2);
         assertEquals(entryCache.getSize(), 7 * 3);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
         c1.setReadPosition(p3);
-        ledger.discardEntriesFromCache(c1, p2);
-        assertEquals(entryCache.getSize(), 7 * 2);
+        ledger.discardEntriesFromCache(c1, p3);
+        assertEquals(entryCache.getSize(), 7 * 3);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
         ledger.deactivateCursor(c1);
-        assertEquals(entryCache.getSize(), 7 * 2); // as c2.readPosition=p3 => Cache contains p3,p4
+        assertEquals(entryCache.getSize(), 7 * 3); // as c2.readPosition=p3 => Cache contains p3,p4
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
         c2.setReadPosition(p4);
-        ledger.discardEntriesFromCache(c2, p3);
+        ledger.discardEntriesFromCache(c2, p4);
         assertEquals(entryCache.getSize(), 7);
         assertEquals(cacheManager.getSize(), entryCache.getSize());
 
@@ -1903,6 +1899,9 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
      */
     @Test
     public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Exception {
+        ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
+        conf.setCacheEvictionFrequency(0.1);
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("cache_eviction_ledger");
 
         // Open Cursor also adds cursor into activeCursor-container
@@ -1954,13 +1953,14 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         }
 
         // (3) Validate: cache should remove all entries read by both active cursors
-		log.info("expected, found : {}, {}", (5 * (totalInsertedEntries - readEntries)), entryCache.getSize());
-        assertEquals((5 * (totalInsertedEntries - readEntries)), entryCache.getSize());
+		log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize());
+        assertEquals((5 * totalInsertedEntries), entryCache.getSize());
 
         final int remainingEntries = totalInsertedEntries - readEntries;
         entries1 = cursor1.readEntries(remainingEntries);
         // Acknowledge only on last entry
         cursor1.markDelete(entries1.get(entries1.size() - 1).getPosition());
+
         for (Entry entry : entries1) {
             log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
             entry.release();
@@ -1968,16 +1968,18 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // (4) Validate: cursor2 is active cursor and has not read these entries yet: so, cache should not remove these
         // entries
-        assertEquals((5 * (remainingEntries)), entryCache.getSize());
+        assertEquals((5 * totalInsertedEntries), entryCache.getSize());
 
+        ledger.deactivateCursor(cursor1);
         ledger.deactivateCursor(cursor2);
 
         // (5) Validate: cursor2 is not active cursor now: cache should have removed all entries read by active cursor1
-        assertEquals(0, entryCache.getSize());
+        assertEquals(entryCache.getSize(), 0);
 
         log.info("Finished reading entries");
 
         ledger.close();
+        factory.shutdown();
     }
 
     @Test
@@ -2017,7 +2019,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
             entry.release();
         }
 
-        // (3) Validate: cache discards all entries as read by active cursor
+        // (3) Validate: cache discards all entries after all cursors are deactivated
+        ledger.deactivateCursor(cursor1);
         assertEquals(0, entryCache.getSize());
 
         ledger.close();
@@ -2043,26 +2046,18 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
     @Test
     public void testBacklogCursor() throws Exception {
+        int backloggedThreshold = 10;
+        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+        factoryConf.setThresholdBackloggedCursor(backloggedThreshold);
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("cache_backlog_ledger");
 
-        final long maxMessageCacheRetentionTimeMillis = 100;
-        Field field = ManagedLedgerImpl.class.getDeclaredField("maxMessageCacheRetentionTimeMillis");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(ledger, maxMessageCacheRetentionTimeMillis);
-        Field backlogThresholdField = ManagedLedgerImpl.class.getDeclaredField("maxActiveCursorBacklogEntries");
-        backlogThresholdField.setAccessible(true);
-        final long maxActiveCursorBacklogEntries = (long) backlogThresholdField.get(ledger);
-
         // Open Cursor also adds cursor into activeCursor-container
         ManagedCursor cursor1 = ledger.openCursor("c1");
         ManagedCursor cursor2 = ledger.openCursor("c2");
 
-        final int totalBacklogSizeEntries = (int) maxActiveCursorBacklogEntries;
-        CountDownLatch latch = new CountDownLatch(totalBacklogSizeEntries);
-        for (int i = 0; i < totalBacklogSizeEntries + 1; i++) {
+        CountDownLatch latch = new CountDownLatch(backloggedThreshold);
+        for (int i = 0; i < backloggedThreshold + 1; i++) {
             String content = "entry"; // 5 bytes
             ByteBuf entry = getMessageWithMetadata(content.getBytes());
             ledger.asyncAddEntry(entry, new AddEntryCallback() {
@@ -2086,12 +2081,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertTrue(cursor1.isActive());
         assertTrue(cursor2.isActive());
 
-        // it allows message to be older enough to be considered in backlog
-        Thread.sleep(maxMessageCacheRetentionTimeMillis * 2);
-
         // deactivate backlog cursors
         ledger.checkBackloggedCursors();
-        Thread.sleep(100);
 
         // both cursors have to be inactive
         assertFalse(cursor1.isActive());
@@ -2114,6 +2105,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertFalse(cursor2.isActive());
 
         ledger.close();
+
+        factory.shutdown();
     }
 
     @Test
@@ -2240,12 +2233,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         assertFalse(factory.getManagedLedgers().containsKey("testManagedLedgerWithoutAutoCreate"));
     }
-    
+
     @Test
     public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig().setMetadataOperationsTimeoutSeconds(3);
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("timeout_ledger_test", config);
-        
+
         BookKeeper bk = mock(BookKeeper.class);
         doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
         AtomicInteger response = new AtomicInteger(0);
@@ -2260,13 +2253,13 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
         assertEquals(response.get(), BKException.Code.TimeoutException);
-        
+
         ledger.close();
     }
-    
+
     /**
      * It verifies that asyncRead timesout if it doesn't receive response from bk-client in configured timeout
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -2339,8 +2332,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     /**
      * It verifies that if bk-client doesn't complete the add-entry in given time out then broker is resilient enought
      * to create new ledger and add entry successfully.
-     * 
-     * 
+     *
+     *
      * @throws Exception
      */
     @Test(timeOut = 20000)
@@ -2405,7 +2398,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         field.setAccessible(true);
         field.set(classObj, fieldValue);
     }
-    
+
     public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
             throws Exception {
         for (int i = 0; i < retryCount; i++) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
index d1d2e5d..27e6381 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
@@ -111,7 +111,7 @@ public class RangeCacheTest {
 
     @Test
     void customWeighter() {
-        RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length());
+        RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
 
         cache.put(0, new RefString("zero"));
         cache.put(1, new RefString("one"));
@@ -121,6 +121,25 @@ public class RangeCacheTest {
     }
 
     @Test
+    void customTimeExtraction() {
+        // Weight and "timestamp" of a value are both its string length: 1, 2, 3 and 4
+        RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> x.s.length());
+        String[] contents = { "1", "22", "333", "4444" };
+        for (int key = 1; key <= contents.length; key++) {
+            cache.put(key, new RefString(contents[key - 1]));
+        }
+
+        assertEquals(cache.getSize(), 10);
+        assertEquals(cache.getNumberOfEntries(), 4);
+
+        // Timestamps 1, 2 and 3 are <= 3, so three entries weighing 1 + 2 + 3 = 6 get evicted
+        final long evictedSize = cache.evictLEntriesBeforeTimestamp(3);
+        assertEquals(evictedSize, 6);
+        assertEquals(cache.getSize(), 4);
+        assertEquals(cache.getNumberOfEntries(), 1);
+    }
+
+    @Test
     void doubleInsert() {
         RangeCache<Integer, RefString> cache = new RangeCache<>();
 
@@ -172,7 +191,7 @@ public class RangeCacheTest {
 
     @Test
     void eviction() {
-        RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length());
+        RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
 
         cache.put(0, new RefString("zero"));
         cache.put(1, new RefString("one"));
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b600ac7..3825a86 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -768,6 +768,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Threshold to which bring down the cache level when eviction is triggered"
     )
     private double managedLedgerCacheEvictionWatermark = 0.9f;
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
+    private double managedLedgerCacheEvictionFrequency = 100.0;
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
+    private long managedLedgerCacheEvictionTimeThresholdMillis = 1000;
+    @FieldContext(category = CATEGORY_STORAGE_ML,
+            doc = "Configure the threshold (in number of entries) from where a cursor should be considered 'backlogged'"
+                    + " and thus should be set as inactive.")
+    private long managedLedgerCursorBackloggedThreshold = 1000;
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
         doc = "Rate limit the amount of writes per second generated by consumer acking the messages"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 2b82204..e3a52d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -47,6 +47,9 @@ public class ManagedLedgerClientFactory implements Closeable {
         managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
         managedLedgerFactoryConfig.setNumManagedLedgerWorkerThreads(conf.getManagedLedgerNumWorkerThreads());
         managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(conf.getManagedLedgerNumSchedulerThreads());
+        managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
+        managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
+        managedLedgerFactoryConfig.setThresholdBackloggedCursor(conf.getManagedLedgerCursorBackloggedThreshold());
 
         this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkClient, zkClient, managedLedgerFactoryConfig);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 23ca67a..ee4b05f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -568,9 +568,12 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
         subscriptionFuture.thenAccept(subscription -> {
             try {
+                ledger.checkBackloggedCursors();
+
                 Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
                                                  maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition);
                 subscription.addConsumer(consumer);
+
                 if (!cnx.isActive()) {
                     consumer.close();
                     if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
index fc05d98..63c974d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java
@@ -36,6 +36,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
     @BeforeClass
     @Override
     protected void setup() throws Exception {
+        conf.setManagedLedgerCacheEvictionFrequency(0.1);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -62,7 +63,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
 
         conf.setManagedLedgerMaxEntriesPerLedger(2);
         conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
-        
+
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic)
                 .producerName("my-producer-name");
         Producer<byte[]> producer = producerBuilder.create();
@@ -99,7 +100,7 @@ public class ConsumerRedeliveryTest extends ProducerConsumerBase {
             Message<byte[]> message = consumer1.receive(5, TimeUnit.SECONDS);
             MessageIdImpl msgId = (MessageIdImpl) message.getMessageId();
             if (lastMsgId != null) {
-                assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId());
+                assertTrue(lastMsgId.getLedgerId() <= msgId.getLedgerId(), "lastMsgId: " + lastMsgId + " -- msgId: " + msgId);
             }
             lastMsgId = msgId;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 74dd59d..7192e7f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -796,9 +796,6 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         producer.send("message".getBytes());
         msg = subscriber1.receive(5, TimeUnit.SECONDS);
 
-        // Verify: cache has to be cleared as there is no message needs to be consumed by active subscriber
-        assertEquals(entryCache.getSize(), 0, 1);
-
         /************ usecase-2: *************/
         // 1.b Subscriber slower-subscriber
         Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
@@ -869,16 +866,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
 
         // reflection to set/get cache-backlog fields value:
-        final long maxMessageCacheRetentionTimeMillis = 100;
-        Field backlogThresholdField = ManagedLedgerImpl.class.getDeclaredField("maxActiveCursorBacklogEntries");
-        backlogThresholdField.setAccessible(true);
-        Field field = ManagedLedgerImpl.class.getDeclaredField("maxMessageCacheRetentionTimeMillis");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(ledger, maxMessageCacheRetentionTimeMillis);
-        final long maxActiveCursorBacklogEntries = (long) backlogThresholdField.get(ledger);
+        final long maxMessageCacheRetentionTimeMillis = conf.getManagedLedgerCacheEvictionTimeThresholdMillis();
+        final long maxActiveCursorBacklogEntries = conf.getManagedLedgerCursorBackloggedThreshold();
 
         Message<byte[]> msg = null;
         final int totalMsgs = (int) maxActiveCursorBacklogEntries + receiverSize + 1;
@@ -2911,16 +2900,16 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
     /**
      * This test verifies that broker activates fail-over consumer by considering priority-level as well.
-     * 
+     *
      * <pre>
      * 1. Start two failover consumer with same priority level, broker selects consumer based on name-sorting (consumer1).
      * 2. Switch non-active consumer to active (consumer2): by giving it higher priority
      * Partitioned-topic with 9 partitions:
      * 1. C1 (priority=1)
      * 2. C2,C3,C4 (priority=0)
-     * So, broker should evenly distribute C2,C3,C4 active consumers among 9 partitions. 
+     * So, broker should evenly distribute C2,C3,C4 active consumers among 9 partitions.
      * </pre>
-     * 
+     *
      * @throws Exception
      */
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 161c374..fecef79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -687,9 +687,6 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         producer.send("message".getBytes());
         msg = subscriber1.receive(5, TimeUnit.SECONDS);
 
-        // Verify: cache has to be cleared as there is no message needs to be consumed by active subscriber
-        assertEquals(entryCache.getSize(), 0, 1);
-
         /************ usecase-2: *************/
         // 1.b Subscriber slower-subscriber
         Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
@@ -732,93 +729,6 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
-    @Test
-    public void testDeactivatingBacklogConsumer() throws Exception {
-        log.info("-- Starting {} test --", methodName);
-
-        final long batchMessageDelayMs = 100;
-        final int receiverSize = 10;
-        final String topicName = "cache-topic";
-        final String topic = "persistent://my-property/use/my-ns/" + topicName;
-        final String sub1 = "faster-sub1";
-        final String sub2 = "slower-sub2";
-
-        // 1. Subscriber Faster subscriber: let it consume all messages immediately
-        Consumer<byte[]> subscriber1 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/" + topicName)
-                .subscriptionName(sub1)
-                .subscriptionType(SubscriptionType.Shared)
-                .receiverQueueSize(receiverSize)
-                .subscribe();
-        // 1.b. Subscriber Slow subscriber:
-        Consumer<byte[]> subscriber2 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/" + topicName)
-                .subscriptionName(sub2)
-                .subscriptionType(SubscriptionType.Shared)
-                .receiverQueueSize(receiverSize)
-                .subscribe();
-        Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic(topic)
-                .enableBatching(batchMessageDelayMs != 0)
-                .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS)
-                .batchingMaxMessages(5)
-                .create();
-
-        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
-
-        // reflection to set/get cache-backlog fields value:
-        final long maxMessageCacheRetentionTimeMillis = 100;
-        Field backlogThresholdField = ManagedLedgerImpl.class.getDeclaredField("maxActiveCursorBacklogEntries");
-        backlogThresholdField.setAccessible(true);
-        Field field = ManagedLedgerImpl.class.getDeclaredField("maxMessageCacheRetentionTimeMillis");
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(ledger, maxMessageCacheRetentionTimeMillis);
-        final long maxActiveCursorBacklogEntries = (long) backlogThresholdField.get(ledger);
-
-        Message<byte[]>msg = null;
-        final int totalMsgs = (int) maxActiveCursorBacklogEntries + receiverSize + 1;
-        // 2. Produce messages
-        for (int i = 0; i < totalMsgs; i++) {
-            String message = "my-message-" + i;
-            producer.send(message.getBytes());
-        }
-        // 3. Consume messages: at Faster subscriber
-        for (int i = 0; i < totalMsgs; i++) {
-            msg = subscriber1.receive(100, TimeUnit.MILLISECONDS);
-            subscriber1.acknowledge(msg);
-        }
-
-        // wait : so message can be eligible to to be evict from cache
-        Thread.sleep(maxMessageCacheRetentionTimeMillis);
-
-        // 4. deactivate subscriber which has built the backlog
-        ledger.checkBackloggedCursors();
-        Thread.sleep(100);
-
-        // 5. verify: active subscribers
-        Set<String> activeSubscriber = Sets.newHashSet();
-        ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName()));
-        assertTrue(activeSubscriber.contains(sub1));
-        assertFalse(activeSubscriber.contains(sub2));
-
-        // 6. consume messages : at slower subscriber
-        for (int i = 0; i < totalMsgs; i++) {
-            msg = subscriber2.receive(100, TimeUnit.MILLISECONDS);
-            subscriber2.acknowledge(msg);
-        }
-
-        ledger.checkBackloggedCursors();
-
-        activeSubscriber.clear();
-        ledger.getActiveCursors().forEach(c -> activeSubscriber.add(c.getName()));
-
-        assertTrue(activeSubscriber.contains(sub1));
-        assertTrue(activeSubscriber.contains(sub2));
-    }
 
     @Test(timeOut = 2000)
     public void testAsyncProducerAndConsumer() throws Exception {
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 959f546..1f828ad 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -176,6 +176,9 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |managedLedgerDefaultAckQuorum| Number of guaranteed copies (acks to wait before write is complete) |2|
 |managedLedgerCacheSizeMB|  Amount of memory to use for caching data payload in managed ledger. This memory is allocated from JVM direct memory and it’s shared across all the topics running in the same broker. By default, uses 1/5th of available direct memory ||
 |managedLedgerCacheEvictionWatermark| Threshold to which bring down the cache level when eviction is triggered  |0.9|
+|managedLedgerCacheEvictionFrequency| Configure the cache eviction frequency for the managed ledger cache (evictions/sec) | 100.0 |
+|managedLedgerCacheEvictionTimeThresholdMillis| All entries that have stayed in the cache for more than the configured time will be evicted | 1000 |
+|managedLedgerCursorBackloggedThreshold| Configure the threshold (in number of entries) above which a cursor is considered 'backlogged' and is therefore set as inactive. | 1000|
 |managedLedgerDefaultMarkDeleteRateLimit| Rate limit the amount of writes per second generated by consumer acking the messages  |1.0|
 |managedLedgerMaxEntriesPerLedger|  Max number of entries to append to a ledger before triggering a rollover. A ledger rollover is triggered on these conditions: <ul><li>Either the max rollover time has been reached</li><li>or max entries have been written to the ledged and at least min-time has passed</li></ul>|50000|
 |managedLedgerMinLedgerRolloverTimeMinutes| Minimum time between ledger rollover for a topic  |10|