You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/06 23:39:23 UTC

[pulsar] branch master updated: Extracted interface for EntryCacheManager (#15933)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c7faf6248b1 Extracted interface for EntryCacheManager (#15933)
c7faf6248b1 is described below

commit c7faf6248b1413fc71465fc12571886db323193c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 6 16:39:14 2022 -0700

    Extracted interface for EntryCacheManager (#15933)
    
    * Extracted interface for EntryCacheManager
    
    * Fixed references
    
    * added more methods to the interface
    
    * Fixed mocked test
    
    * Removed unused import
    
    * Fixed wrong casting in reflection access
---
 .../bookkeeper/mledger/ManagedLedgerFactory.java   |   2 +-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   2 +-
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   6 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  25 ++--
 .../mledger/impl/{ => cache}/EntryCache.java       |   4 +-
 .../EntryCacheDefaultEvictionPolicy.java           |   4 +-
 .../mledger/impl/cache/EntryCacheDisabled.java     | 147 +++++++++++++++++++++
 .../impl/{ => cache}/EntryCacheEvictionPolicy.java |   2 +-
 .../EntryCacheManager.java}                        |  34 ++---
 .../RangeEntryCacheImpl.java}                      |  21 +--
 .../RangeEntryCacheManagerImpl.java}               | 145 +++-----------------
 .../package-info.java}                             |  20 +--
 .../mledger/impl/EntryCacheManagerTest.java        | 126 +++++++++---------
 .../bookkeeper/mledger/impl/EntryCacheTest.java    |   4 +-
 .../mledger/impl/ManagedLedgerBkTest.java          |  11 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  18 ++-
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +-
 .../broker/stats/AllocatorStatsGenerator.java      |   4 +-
 .../stats/metrics/ManagedLedgerCacheMetrics.java   |   4 +-
 .../broker/service/PersistentTopicE2ETest.java     |   4 +-
 .../client/api/SimpleProducerConsumerTest.java     |   4 +-
 .../client/api/v1/V1_ProducerConsumerTest.java     |   4 +-
 22 files changed, 314 insertions(+), 279 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index ea78987eb3e..e42c2581ba1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -26,7 +26,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
-import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
 
 /**
  * A factory to open/create managed ledgers and delete them.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 3d558a231db..10d8d19eb33 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2832,7 +2832,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     boolean shouldCloseLedger(LedgerHandle lh) {
         long now = clock.millis();
-        if (ledger.factory.isMetadataServiceAvailable()
+        if (ledger.getFactory().isMetadataServiceAvailable()
                 && (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
                 || lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
                 && (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index b6b1bcf3e8d..48e85423e84 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -68,6 +68,8 @@ import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
@@ -90,10 +92,12 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     private final BookkeeperFactoryForCustomEnsemblePlacementPolicy bookkeeperFactory;
     private final boolean isBookkeeperManaged;
     private final ManagedLedgerFactoryConfig config;
+    @Getter
     protected final OrderedScheduler scheduledExecutor;
 
     private final ExecutorService cacheEvictionExecutor;
 
+    @Getter
     protected final ManagedLedgerFactoryMBeanImpl mbean;
 
     protected final ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap<>();
@@ -189,7 +193,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                 config.getManagedCursorInfoCompressionType());
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
-        this.entryCacheManager = new EntryCacheManager(this);
+        this.entryCacheManager = new RangeEntryCacheManagerImpl(this);
         this.statsTask = scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
                 0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
         this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 56399f74fd6..6b7eb90e0d8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -69,6 +69,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import lombok.Getter;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -116,6 +117,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import org.apache.bookkeeper.mledger.offload.OffloadUtils;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
@@ -262,9 +264,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
     protected volatile State state = null;
 
+    @Getter
     private final OrderedScheduler scheduledExecutor;
+
+    @Getter
     private final OrderedExecutor executor;
-    final ManagedLedgerFactoryImpl factory;
+
+    @Getter
+    private final ManagedLedgerFactoryImpl factory;
+
+    @Getter
     protected final ManagedLedgerMBeanImpl mbean;
     protected final Clock clock;
 
@@ -1865,7 +1874,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    void invalidateLedgerHandle(ReadHandle ledgerHandle) {
+    public void invalidateLedgerHandle(ReadHandle ledgerHandle) {
         long ledgerId = ledgerHandle.getId();
         LedgerHandle currentLedger = this.currentLedger;
 
@@ -3564,14 +3573,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return ledgers;
     }
 
-    OrderedScheduler getScheduledExecutor() {
-        return scheduledExecutor;
-    }
-
-    OrderedExecutor getExecutor() {
-        return executor;
-    }
-
     private ManagedLedgerInfo getManagedLedgerInfo() {
         return buildManagedLedgerInfo(ledgers);
     }
@@ -3679,10 +3680,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return STATE_UPDATER.get(this);
     }
 
-    public ManagedLedgerMBeanImpl getMBean() {
-        return mbean;
-    }
-
     public long getCacheSize() {
         return entryCache.getSize();
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
similarity index 96%
rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
index 0e20a0facff..8f5b3e9b19e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCache.java
@@ -16,11 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
 
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
 
 /**
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java
similarity index 96%
rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java
rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java
index 4c26a93514b..f2a3cd4e51e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Collections.reverseOrder;
@@ -89,7 +89,7 @@ public class EntryCacheDefaultEvictionPolicy implements EntryCacheEvictionPolicy
         }
 
         log.info("Completed cache eviction. Removed {} entries from {} caches. ({} Mb)", evictedEntries,
-                cachesToEvict.size(), evictedSize / EntryCacheManager.MB);
+                cachesToEvict.size(), evictedSize / RangeEntryCacheManagerImpl.MB);
     }
 
     private static final Logger log = LoggerFactory.getLogger(EntryCacheDefaultEvictionPolicy.class);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
new file mode 100644
index 00000000000..a09b8ba27fc
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Implementation of cache that always read from BookKeeper.
+ */
+public class EntryCacheDisabled implements EntryCache {
+    private final ManagedLedgerImpl ml;
+    private final ManagedLedgerInterceptor interceptor;
+
+    public EntryCacheDisabled(ManagedLedgerImpl ml) {
+        this.ml = ml;
+        this.interceptor = ml.getManagedLedgerInterceptor();
+    }
+
+    @Override
+    public String getName() {
+        return ml.getName();
+    }
+
+    @Override
+    public boolean insert(EntryImpl entry) {
+        return false;
+    }
+
+    @Override
+    public void invalidateEntries(PositionImpl lastPosition) {
+    }
+
+    @Override
+    public void invalidateAllEntries(long ledgerId) {
+    }
+
+    @Override
+    public void clear() {
+    }
+
+    @Override
+    public Pair<Integer, Long> evictEntries(long sizeToFree) {
+        return Pair.of(0, (long) 0);
+    }
+
+    @Override
+    public void invalidateEntriesBeforeTimestamp(long timestamp) {
+    }
+
+    @Override
+    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+                               final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+        lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
+                ledgerEntries -> {
+                    List<Entry> entries = Lists.newArrayList();
+                    long totalSize = 0;
+                    try {
+                        for (LedgerEntry e : ledgerEntries) {
+                            // Insert the entries at the end of the list (they will be unsorted for now)
+                            EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
+                            entries.add(entry);
+                            totalSize += entry.getLength();
+                        }
+                    } finally {
+                        ledgerEntries.close();
+                    }
+                    ml.getFactory().getMbean().recordCacheMiss(entries.size(), totalSize);
+                    ml.getMbean().addReadEntriesSample(entries.size(), totalSize);
+
+                    callback.readEntriesComplete(entries, ctx);
+                }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+            callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
+            return null;
+        });
+    }
+
+    @Override
+    public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback,
+                               Object ctx) {
+        lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
+                (ledgerEntries, exception) -> {
+                    if (exception != null) {
+                        ml.invalidateLedgerHandle(lh);
+                        callback.readEntryFailed(createManagedLedgerException(exception), ctx);
+                        return;
+                    }
+
+                    try {
+                        Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
+                        if (iterator.hasNext()) {
+                            LedgerEntry ledgerEntry = iterator.next();
+                            EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
+
+                            ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength());
+                            ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
+                            callback.readEntryComplete(returnEntry, ctx);
+                        } else {
+                            callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
+                                    ctx);
+                        }
+                    } finally {
+                        ledgerEntries.close();
+                    }
+                }, ml.getExecutor().chooseThread(ml.getName()));
+    }
+
+    @Override
+    public long getSize() {
+        return 0;
+    }
+
+    @Override
+    public int compareTo(EntryCache other) {
+        return Longs.compare(getSize(), other.getSize());
+    }
+
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheEvictionPolicy.java
similarity index 96%
copy from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
copy to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheEvictionPolicy.java
index 341c5c328ad..8c55ce7cf7f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheEvictionPolicy.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
 
 import java.util.List;
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java
similarity index 61%
copy from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
copy to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java
index 341c5c328ad..12cbb023f86 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheManager.java
@@ -16,22 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
 
-import java.util.List;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 
-/**
- * Cache eviction policy abstraction interface.
- *
- */
-public interface EntryCacheEvictionPolicy {
-    /**
-     * Perform the cache eviction of at least sizeToFree bytes on the supplied list of caches.
-     *
-     * @param caches
-     *            the list of caches to consider
-     * @param sizeToFree
-     *            the minimum size in bytes to be freed
-     */
-    void doEviction(List<EntryCache> caches, long sizeToFree);
+public interface EntryCacheManager {
+    EntryCache getEntryCache(ManagedLedgerImpl ml);
+
+    void removeEntryCache(String name);
+
+    long getSize();
+
+    long getMaxSize();
+
+    void clear();
+
+    void updateCacheSizeAndThreshold(long maxSize);
+
+    void updateCacheEvictionWatermark(double cacheEvictionWatermark);
+
+    double getCacheEvictionWatermark();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
similarity index 94%
rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 1a6986a7405..d37676b8d65 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -34,6 +34,9 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import org.apache.bookkeeper.mledger.util.RangeCache;
 import org.apache.commons.lang3.tuple.Pair;
@@ -43,9 +46,9 @@ import org.slf4j.LoggerFactory;
 /**
  * Cache data payload for entries of all ledgers.
  */
-public class EntryCacheImpl implements EntryCache {
+public class RangeEntryCacheImpl implements EntryCache {
 
-    private final EntryCacheManager manager;
+    private final RangeEntryCacheManagerImpl manager;
     private final ManagedLedgerImpl ml;
     private ManagedLedgerInterceptor interceptor;
     private final RangeCache<PositionImpl, EntryImpl> entries;
@@ -53,7 +56,7 @@ public class EntryCacheImpl implements EntryCache {
 
     private static final double MB = 1024 * 1024;
 
-    public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml, boolean copyEntries) {
+    public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
         this.manager = manager;
         this.ml = ml;
         this.interceptor = ml.getManagedLedgerInterceptor();
@@ -214,10 +217,10 @@ public class EntryCacheImpl implements EntryCache {
                             Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
                             if (iterator.hasNext()) {
                                 LedgerEntry ledgerEntry = iterator.next();
-                                EntryImpl returnEntry = EntryCacheManager.create(ledgerEntry, interceptor);
+                                EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, interceptor);
 
                                 manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
-                                ml.mbean.addReadEntriesSample(1, returnEntry.getLength());
+                                ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
                                 callback.readEntryComplete(returnEntry, ctx);
                             } else {
                                 // got an empty sequence
@@ -299,14 +302,14 @@ public class EntryCacheImpl implements EntryCache {
                             long totalSize = 0;
                             final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
                             for (LedgerEntry e : ledgerEntries) {
-                                EntryImpl entry = EntryCacheManager.create(e, interceptor);
+                                EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
 
                                 entriesToReturn.add(entry);
                                 totalSize += entry.getLength();
                             }
 
                             manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-                            ml.getMBean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+                            ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
 
                             callback.readEntriesComplete((List) entriesToReturn, ctx);
                         } finally {
@@ -364,5 +367,5 @@ public class EntryCacheImpl implements EntryCache {
         manager.entriesRemoved(evictedSize);
     }
 
-    private static final Logger log = LoggerFactory.getLogger(EntryCacheImpl.class);
+    private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
similarity index 56%
rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index 132feca6de8..4c27781b1f0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -16,34 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.mledger.impl;
+package org.apache.bookkeeper.mledger.impl.cache;
 
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.primitives.Longs;
 import io.netty.buffer.ByteBuf;
-import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.client.api.LedgerEntry;
-import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
-import org.apache.bookkeeper.mledger.AsyncCallbacks;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
-import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("checkstyle:javadoctype")
-public class EntryCacheManager {
+public class RangeEntryCacheManagerImpl implements EntryCacheManager {
 
     private volatile long maxSize;
     private volatile long evictionTriggerThreshold;
@@ -62,13 +57,13 @@ public class EntryCacheManager {
     private static final double evictionTriggerThresholdPercent = 0.98;
 
 
-    public EntryCacheManager(ManagedLedgerFactoryImpl factory) {
+    public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
         this.maxSize = factory.getConfig().getMaxCacheSize();
         this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent);
         this.cacheEvictionWatermark = factory.getConfig().getCacheEvictionWatermark();
         this.evictionPolicy = new EntryCacheDefaultEvictionPolicy();
         this.mlFactory = factory;
-        this.mlFactoryMBean = factory.mbean;
+        this.mlFactoryMBean = factory.getMbean();
 
         log.info("Initialized managed-ledger entry cache of {} Mb", maxSize / MB);
     }
@@ -79,7 +74,7 @@ public class EntryCacheManager {
             return new EntryCacheDisabled(ml);
         }
 
-        EntryCache newEntryCache = new EntryCacheImpl(this, ml, mlFactory.getConfig().isCopyEntriesInCache());
+        EntryCache newEntryCache = new RangeEntryCacheImpl(this, ml, mlFactory.getConfig().isCopyEntriesInCache());
         EntryCache currentEntryCache = caches.putIfAbsent(ml.getName(), newEntryCache);
         if (currentEntryCache != null) {
             return currentEntryCache;
@@ -88,16 +83,19 @@ public class EntryCacheManager {
         }
     }
 
+    @Override
     public void updateCacheSizeAndThreshold(long maxSize) {
         this.maxSize = maxSize;
         this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent);
     }
 
+    @Override
     public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
         this.cacheEvictionWatermark = cacheEvictionWatermark;
     }
 
-    void removeEntryCache(String name) {
+    @Override
+    public void removeEntryCache(String name) {
         EntryCache entryCache = caches.remove(name);
         if (entryCache == null) {
             return;
@@ -116,7 +114,7 @@ public class EntryCacheManager {
 
         // Trigger a single eviction in background. While the eviction is running we stop inserting entries in the cache
         if (currentSize > evictionTriggerThreshold && evictionInProgress.compareAndSet(false, true)) {
-            mlFactory.scheduledExecutor.execute(safeRun(() -> {
+            mlFactory.getScheduledExecutor().execute(safeRun(() -> {
                 // Trigger a new cache eviction cycle to bring the used memory below the cacheEvictionWatermark
                 // percentage limit
                 long sizeToEvict = currentSize - (long) (maxSize * cacheEvictionWatermark);
@@ -150,131 +148,26 @@ public class EntryCacheManager {
         currentSize.addAndGet(-size);
     }
 
+    @Override
     public long getSize() {
         return currentSize.get();
     }
 
+    @Override
     public long getMaxSize() {
         return maxSize;
     }
 
+    @Override
     public double getCacheEvictionWatermark() {
         return cacheEvictionWatermark;
     }
 
+    @Override
     public void clear() {
         caches.values().forEach(EntryCache::clear);
     }
 
-    protected class EntryCacheDisabled implements EntryCache {
-        private final ManagedLedgerImpl ml;
-        private final ManagedLedgerInterceptor interceptor;
-
-        public EntryCacheDisabled(ManagedLedgerImpl ml) {
-            this.ml = ml;
-            this.interceptor = ml.getManagedLedgerInterceptor();
-        }
-
-        @Override
-        public String getName() {
-            return ml.getName();
-        }
-
-        @Override
-        public boolean insert(EntryImpl entry) {
-            return false;
-        }
-
-        @Override
-        public void invalidateEntries(PositionImpl lastPosition) {
-        }
-
-        @Override
-        public void invalidateAllEntries(long ledgerId) {
-        }
-
-        @Override
-        public void clear() {
-        }
-
-        @Override
-        public Pair<Integer, Long> evictEntries(long sizeToFree) {
-            return Pair.of(0, (long) 0);
-        }
-
-        @Override
-        public void invalidateEntriesBeforeTimestamp(long timestamp) {
-        }
-
-        @Override
-        public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
-                final ReadEntriesCallback callback, Object ctx) {
-            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
-                    ledgerEntries -> {
-                        List<Entry> entries = Lists.newArrayList();
-                        long totalSize = 0;
-                        try {
-                            for (LedgerEntry e : ledgerEntries) {
-                                // Insert the entries at the end of the list (they will be unsorted for now)
-                                EntryImpl entry = create(e, interceptor);
-                                entries.add(entry);
-                                totalSize += entry.getLength();
-                            }
-                        } finally {
-                            ledgerEntries.close();
-                        }
-                        mlFactoryMBean.recordCacheMiss(entries.size(), totalSize);
-                        ml.mbean.addReadEntriesSample(entries.size(), totalSize);
-
-                        callback.readEntriesComplete(entries, ctx);
-                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
-                        callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
-                        return null;
-            });
-        }
-
-        @Override
-        public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback,
-                Object ctx) {
-            lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
-                    (ledgerEntries, exception) -> {
-                        if (exception != null) {
-                            ml.invalidateLedgerHandle(lh);
-                            callback.readEntryFailed(createManagedLedgerException(exception), ctx);
-                            return;
-                        }
-
-                        try {
-                            Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
-                            if (iterator.hasNext()) {
-                                LedgerEntry ledgerEntry = iterator.next();
-                                EntryImpl returnEntry = create(ledgerEntry, interceptor);
-
-                                mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
-                                ml.getMBean().addReadEntriesSample(1, returnEntry.getLength());
-                                callback.readEntryComplete(returnEntry, ctx);
-                            } else {
-                                callback.readEntryFailed(new ManagedLedgerException("Could not read given position"),
-                                        ctx);
-                            }
-                        } finally {
-                            ledgerEntries.close();
-                        }
-                    }, ml.getExecutor().chooseThread(ml.getName()));
-        }
-
-        @Override
-        public long getSize() {
-            return 0;
-        }
-
-        @Override
-        public int compareTo(EntryCache other) {
-            return Longs.compare(getSize(), other.getSize());
-        }
-
-    }
-
     public static Entry create(long ledgerId, long entryId, ByteBuf data) {
         return EntryImpl.create(ledgerId, entryId, data);
     }
@@ -300,5 +193,5 @@ public class EntryCacheManager {
         return returnEntry;
     }
 
-    private static final Logger log = LoggerFactory.getLogger(EntryCacheManager.class);
+    private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheManagerImpl.class);
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/package-info.java
similarity index 61%
rename from managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
rename to managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/package-info.java
index 341c5c328ad..898d250aa7f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/package-info.java
@@ -16,22 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bookkeeper.mledger.impl;
-
-import java.util.List;
-
-/**
- * Cache eviction policy abstraction interface.
- *
- */
-public interface EntryCacheEvictionPolicy {
-    /**
-     * Perform the cache eviction of at least sizeToFree bytes on the supplied list of caches.
-     *
-     * @param caches
-     *            the list of caches to consider
-     * @param sizeToFree
-     *            the minimum size in bytes to be freed
-     */
-    void doEviction(List<EntryCache> caches, long sizeToFree);
-}
+package org.apache.bookkeeper.mledger.impl.cache;
\ No newline at end of file
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index 0e53b1a9254..55f58ecd11c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -40,6 +40,9 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheDisabled;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -56,8 +59,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         ml1 = mock(ManagedLedgerImpl.class);
         when(ml1.getScheduledExecutor()).thenReturn(executor);
         when(ml1.getName()).thenReturn("cache1");
-        when(ml1.getMBean()).thenReturn(new ManagedLedgerMBeanImpl(ml1));
+        when(ml1.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml1));
         when(ml1.getExecutor()).thenReturn(super.executor);
+        when(ml1.getFactory()).thenReturn(factory);
 
         ml2 = mock(ManagedLedgerImpl.class);
         when(ml2.getScheduledExecutor()).thenReturn(executor);
@@ -83,13 +87,13 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(cache1.getSize(), 7);
         assertEquals(cacheManager.getSize(), 7);
 
-        cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 10);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 7);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+        factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+        assertEquals(factory2.getMbean().getCacheMaxSize(), 10);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 7);
+        assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+        assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
 
         cache2.insert(EntryImpl.create(2, 0, new byte[1]));
         cache2.insert(EntryImpl.create(2, 1, new byte[1]));
@@ -117,14 +121,14 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(cacheManager.getSize(), 2);
         assertEquals(cache2.getSize(), 2);
 
-        cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
+        factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
 
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 10);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 2);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 1);
+        assertEquals(factory2.getMbean().getCacheMaxSize(), 10);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 2);
+        assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+        assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 1);
     }
 
     @Test
@@ -197,8 +201,8 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         EntryCache cache1 = cacheManager.getEntryCache(ml1);
         EntryCache cache2 = cacheManager.getEntryCache(ml2);
 
-        assertTrue(cache1 instanceof EntryCacheManager.EntryCacheDisabled);
-        assertTrue(cache2 instanceof EntryCacheManager.EntryCacheDisabled);
+        assertTrue(cache1 instanceof EntryCacheDisabled);
+        assertTrue(cache2 instanceof EntryCacheDisabled);
 
         cache1.insert(EntryImpl.create(1, 1, new byte[4]));
         cache1.insert(EntryImpl.create(1, 0, new byte[3]));
@@ -206,13 +210,13 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(cache1.getSize(), 0);
         assertEquals(cacheManager.getSize(), 0);
 
-        cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+        factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+        assertEquals(factory2.getMbean().getCacheMaxSize(), 0);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 0);
+        assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+        assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
 
         cache2.insert(EntryImpl.create(2, 0, new byte[1]));
         cache2.insert(EntryImpl.create(2, 1, new byte[1]));
@@ -242,13 +246,13 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(cache1.getSize(), 0);
         assertEquals(cacheManager.getSize(), 0);
 
-        cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMaxSize(), 7 * 10);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+        factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+        assertEquals(factory2.getMbean().getCacheMaxSize(), 7 * 10);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 0);
+        assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+        assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
     }
 
     @Test
@@ -271,54 +275,54 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
             ledger.addEntry(("entry-" + i).getBytes());
         }
 
-        cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+        factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 70);
+        assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+        assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
 
         List<Entry> entries = c1.readEntries(10);
         assertEquals(entries.size(), 10);
         entries.forEach(Entry::release);
 
-        cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 10.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 70.0);
-        assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+        factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 70);
+        assertEquals(factory2.getMbean().getCacheHitsRate(), 10.0);
+        assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheHitsThroughput(), 70.0);
+        assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
 
         ledger.deactivateCursor(c1);
 
-        cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+        factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 70);
+        assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+        assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
 
         entries = c2.readEntries(10);
         assertEquals(entries.size(), 10);
 
-        cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 10.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 70.0);
-        assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+        factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 70);
+        assertEquals(factory2.getMbean().getCacheHitsRate(), 10.0);
+        assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheHitsThroughput(), 70.0);
+        assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
 
         PositionImpl pos = (PositionImpl) entries.get(entries.size() - 1).getPosition();
         c2.setReadPosition(pos);
         ledger.discardEntriesFromCache(c2, pos);
         entries.forEach(Entry::release);
 
-        cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 7);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+        factory2.getMbean().refreshStats(1, TimeUnit.SECONDS);
+        assertEquals(factory2.getMbean().getCacheUsedSize(), 7);
+        assertEquals(factory2.getMbean().getCacheHitsRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
+        assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
+        assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
     }
 
     @Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
index 6f177c9ac92..fc7f0bbb058 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
@@ -41,6 +41,8 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -55,7 +57,7 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
         ml = mock(ManagedLedgerImpl.class);
         when(ml.getName()).thenReturn("name");
         when(ml.getExecutor()).thenReturn(executor);
-        when(ml.getMBean()).thenReturn(new ManagedLedgerMBeanImpl(ml));
+        when(ml.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml));
     }
 
     @Test(timeOut = 5000)
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index b5358987d6e..511a5f9149a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlready
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.testng.annotations.Test;
@@ -216,12 +217,12 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
             future.get();
         }
 
-        cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
+        factory.getMbean().refreshStats(1, TimeUnit.SECONDS);
 
-        assertTrue(cacheManager.mlFactoryMBean.getCacheHitsRate() > 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getCacheMissesRate(), 0.0);
-        assertTrue(cacheManager.mlFactoryMBean.getCacheHitsThroughput() > 0.0);
-        assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
+        assertTrue(factory.getMbean().getCacheHitsRate() > 0.0);
+        assertEquals(factory.getMbean().getCacheMissesRate(), 0.0);
+        assertTrue(factory.getMbean().getCacheHitsThroughput() > 0.0);
+        assertEquals(factory.getMbean().getNumberOfCacheEvictions(), 0);
     }
 
     @Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index ded0f4990e9..32265cd49ac 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -108,23 +108,26 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
-import org.apache.pulsar.metadata.api.extended.SessionEvent;
-import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -2585,9 +2588,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         Set<ManagedCursor> activeCursors = Sets.newHashSet();
         activeCursors.add(cursor1);
         activeCursors.add(cursor2);
-        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
-        cacheField.setAccessible(true);
-        EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger);
+        EntryCache entryCache = Whitebox.getInternalState(ledger, "entryCache");
 
         Iterator<ManagedCursor> activeCursor = ledger.getActiveCursors().iterator();
 
@@ -2659,10 +2660,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     @Test
     public void testActiveDeactiveCursor() throws Exception {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("cache_eviction_ledger");
-
-        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
-        cacheField.setAccessible(true);
-        EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger);
+        EntryCache entryCache = Whitebox.getInternalState(ledger, "entryCache");
 
         final int totalInsertedEntries = 20;
         for (int i = 0; i < totalInsertedEntries; i++) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b8f41fa4c40..2dee32f12c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.service.nonpersistent;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
+import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create;
 import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.carrotsearch.hppc.ObjectObjectHashMap;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java
index 43dbd642f80..5dd4c0b0402 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java
@@ -25,7 +25,7 @@ import io.netty.buffer.PoolChunkMetric;
 import io.netty.buffer.PoolSubpageMetric;
 import io.netty.buffer.PooledByteBufAllocator;
 import java.util.stream.Collectors;
-import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
 import org.apache.pulsar.common.stats.AllocatorStats;
 import org.apache.pulsar.common.stats.AllocatorStats.PoolArenaStats;
 import org.apache.pulsar.common.stats.AllocatorStats.PoolChunkListStats;
@@ -38,7 +38,7 @@ public class AllocatorStatsGenerator {
         if ("default".equals(allocatorName)) {
             allocator = PooledByteBufAllocator.DEFAULT;
         } else if ("ml-cache".equals(allocatorName)) {
-            allocator = EntryCacheImpl.ALLOCATOR;
+            allocator = RangeEntryCacheImpl.ALLOCATOR;
         } else {
             throw new IllegalArgumentException("Invalid allocator name : " + allocatorName);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
index 2474ce2b8de..8e46b4cf254 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
@@ -25,7 +25,7 @@ import io.netty.buffer.PoolChunkMetric;
 import io.netty.buffer.PooledByteBufAllocator;
 import java.util.List;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
-import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.stats.Metrics;
 
@@ -54,7 +54,7 @@ public class ManagedLedgerCacheMetrics extends AbstractMetrics {
         m.put("brk_ml_cache_hits_throughput", mlCacheStats.getCacheHitsThroughput());
         m.put("brk_ml_cache_misses_throughput", mlCacheStats.getCacheMissesThroughput());
 
-        PooledByteBufAllocator allocator = EntryCacheImpl.ALLOCATOR;
+        PooledByteBufAllocator allocator = RangeEntryCacheImpl.ALLOCATOR;
         long activeAllocations = 0;
         long activeAllocationsSmall = 0;
         long activeAllocationsNormal = 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 7a90b10fa7e..c3f6a634a11 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -48,9 +48,9 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
@@ -339,7 +339,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
         Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
         cacheField.setAccessible(true);
-        EntryCacheImpl entryCache = (EntryCacheImpl) cacheField.get(ledger);
+        EntryCache entryCache = (EntryCache) cacheField.get(ledger);
 
         /************* Validation on non-empty active-cursor **************/
         // (4) Get ActiveCursor : which is list of active subscription
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index f4845c08c55..8205dbe935b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -75,8 +75,8 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.avro.Schema.Parser;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -1058,7 +1058,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
-        EntryCacheImpl entryCache = spy((EntryCacheImpl) Whitebox.getInternalState(ledger, "entryCache"));
+        EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache"));
         Whitebox.setInternalState(ledger, "entryCache", entryCache);
 
         Message<byte[]> msg;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index a36dd359e59..9679620c357 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -47,8 +47,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
-import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -648,7 +648,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger();
 
-        EntryCacheImpl entryCache = spy((EntryCacheImpl) Whitebox.getInternalState(ledger, "entryCache"));
+        EntryCache entryCache = spy((EntryCache) Whitebox.getInternalState(ledger, "entryCache"));
         Whitebox.setInternalState(ledger, "entryCache", entryCache);
 
         Message<byte[]>msg = null;