You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/13 02:25:56 UTC

[pulsar] branch branch-2.11 updated (2e85be99747 -> 1fc4ccf8240)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 2e85be99747 [improve][client] Refactor SchemaHash to reduce call of hashFunction in SchemaHash (#17948)
     new 44d45af6d21 Apply modernizer-maven-plugin to managed-ledger module. (#16363)
     new df33f59cb4f [enh][broker] Add metrics for entry cache insertion, eviction (#17248)
     new 55a0cb4841e [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241)
     new 1fc4ccf8240 [bug][broker] fix memory leak in case of error conditions in PendingReadsManager (#17995)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 managed-ledger/pom.xml                             |  18 +
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |   6 +-
 .../mledger/ManagedLedgerFactoryMXBean.java        |  15 +
 .../mledger/impl/LedgerMetadataUtils.java          |  11 +-
 .../mledger/impl/ManagedCursorContainer.java       |   7 +-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  34 +-
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   5 +-
 .../impl/ManagedLedgerFactoryMBeanImpl.java        |  27 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  24 +-
 .../bookkeeper/mledger/impl/OpReadEntry.java       |   4 +-
 .../bookkeeper/mledger/impl/RangeSetWrapper.java   |   4 +-
 .../cache/EntryCacheDefaultEvictionPolicy.java     |   4 +-
 .../mledger/impl/cache/EntryCacheDisabled.java     |   7 +-
 .../mledger/impl/cache/PendingReadsManager.java    | 452 ++++++++++++++++++++
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 125 +++---
 .../impl/cache/RangeEntryCacheManagerImpl.java     |   8 +-
 .../bookkeeper/mledger/offload/OffloadUtils.java   |   8 +-
 .../apache/bookkeeper/mledger/util/RangeCache.java |  16 +-
 .../bookkeeper/mledger/SimpleBookKeeperTest.java   |   4 +-
 .../mledger/impl/EntryCacheManagerTest.java        |  18 +
 .../bookkeeper/mledger/impl/EntryCacheTest.java    |   4 +-
 .../mledger/impl/ManagedCursorConcurrencyTest.java |  10 +-
 .../mledger/impl/ManagedCursorContainerTest.java   |   4 +-
 ...ManagedCursorIndividualDeletedMessagesTest.java |   6 +-
 .../mledger/impl/ManagedCursorListAckTest.java     |   4 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  13 +-
 .../mledger/impl/ManagedLedgerBkTest.java          |  17 +-
 .../ManagedLedgerFactoryChangeLedgerPathTest.java  |   9 +-
 .../impl/ManagedLedgerSingleBookieTest.java        |   4 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  13 +-
 .../mledger/impl/NonDurableCursorTest.java         |  11 +-
 .../mledger/impl/OffloadPrefixReadTest.java        |  10 +-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java |  19 +-
 .../mledger/impl/RangeSetWrapperTest.java          |   5 +-
 .../impl/cache/PendingReadsManagerTest.java        | 462 +++++++++++++++++++++
 .../bookkeeper/mledger/util/RangeCacheTest.java    |   5 +-
 .../stats/metrics/ManagedLedgerCacheMetrics.java   |   3 +
 .../client/api/MessageDispatchThrottlingTest.java  |   8 +
 site2/docs/reference-metrics.md                    |   3 +
 39 files changed, 1225 insertions(+), 182 deletions(-)
 create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
 create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java


[pulsar] 01/04: Apply modernizer-maven-plugin to managed-ledger module. (#16363)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 44d45af6d21b4b4de8dfbdfeda148f7ab85219f5
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Mon Aug 8 05:59:03 2022 -0700

    Apply modernizer-maven-plugin to managed-ledger module. (#16363)
    
    To encourage the use of modern, JDK built-in APIs.
---
 managed-ledger/pom.xml                             | 18 ++++++++++++
 .../bookkeeper/mledger/ManagedLedgerConfig.java    |  6 ++--
 .../mledger/impl/LedgerMetadataUtils.java          | 11 ++++---
 .../mledger/impl/ManagedCursorContainer.java       |  7 ++---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 34 +++++++++++-----------
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  5 ++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 24 +++++++--------
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  4 +--
 .../bookkeeper/mledger/impl/RangeSetWrapper.java   |  4 +--
 .../cache/EntryCacheDefaultEvictionPolicy.java     |  4 +--
 .../mledger/impl/cache/EntryCacheDisabled.java     |  7 ++---
 .../mledger/impl/cache/RangeEntryCacheImpl.java    |  9 +++---
 .../impl/cache/RangeEntryCacheManagerImpl.java     |  4 +--
 .../bookkeeper/mledger/offload/OffloadUtils.java   |  8 ++---
 .../apache/bookkeeper/mledger/util/RangeCache.java |  4 +--
 .../bookkeeper/mledger/SimpleBookKeeperTest.java   |  4 +--
 .../bookkeeper/mledger/impl/EntryCacheTest.java    |  4 +--
 .../mledger/impl/ManagedCursorConcurrencyTest.java | 10 ++++---
 .../mledger/impl/ManagedCursorContainerTest.java   |  4 ++-
 ...ManagedCursorIndividualDeletedMessagesTest.java |  6 ++--
 .../mledger/impl/ManagedCursorListAckTest.java     |  4 +--
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 13 ++++-----
 .../mledger/impl/ManagedLedgerBkTest.java          | 17 ++++++-----
 .../ManagedLedgerFactoryChangeLedgerPathTest.java  |  9 +++---
 .../impl/ManagedLedgerSingleBookieTest.java        |  4 +--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 13 +++++----
 .../mledger/impl/NonDurableCursorTest.java         | 11 ++++---
 .../mledger/impl/OffloadPrefixReadTest.java        | 10 +++----
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 19 ++++++------
 .../mledger/impl/RangeSetWrapperTest.java          |  5 ++--
 30 files changed, 149 insertions(+), 133 deletions(-)

diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index dfa1ffeecde..39cfce5d6aa 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -124,6 +124,24 @@
 
   <build>
     <plugins>
+      <plugin>
+        <groupId>org.gaul</groupId>
+        <artifactId>modernizer-maven-plugin</artifactId>
+        <configuration>
+          <failOnViolations>true</failOnViolations>
+          <javaVersion>17</javaVersion>
+        </configuration>
+        <executions>
+          <execution>
+            <id>modernizer</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>modernizer</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 788732e763a..92c9c911981 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -19,7 +19,7 @@
 package org.apache.bookkeeper.mledger;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.base.Charsets;
+import java.nio.charset.StandardCharsets;
 import java.time.Clock;
 import java.util.Arrays;
 import java.util.Map;
@@ -68,7 +68,7 @@ public class ManagedLedgerConfig {
     private long readEntryTimeoutSeconds = 120;
     private long addEntryTimeoutSeconds = 120;
     private DigestType digestType = DigestType.CRC32C;
-    private byte[] password = "".getBytes(Charsets.UTF_8);
+    private byte[] password = "".getBytes(StandardCharsets.UTF_8);
     private boolean unackedRangesOpenCacheSetEnabled = true;
     private Class<? extends EnsemblePlacementPolicy>  bookKeeperEnsemblePlacementPolicyClassName;
     private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
@@ -271,7 +271,7 @@ public class ManagedLedgerConfig {
      *            the password to set
      */
     public ManagedLedgerConfig setPassword(String password) {
-        this.password = password.getBytes(Charsets.UTF_8);
+        this.password = password.getBytes(StandardCharsets.UTF_8);
         return this;
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
index 8049220e964..b32b81b2349 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerMetadataUtils.java
@@ -18,7 +18,6 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import com.google.common.collect.ImmutableMap;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
@@ -53,7 +52,7 @@ public final class LedgerMetadataUtils {
      * @return an immutable map which describes a ManagedLedger
      */
     static Map<String, byte[]> buildBaseManagedLedgerMetadata(String name) {
-        return ImmutableMap.of(
+        return Map.of(
                 METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
                 METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_MANAGED_LEDGER,
                 METADATA_PROPERTY_MANAGED_LEDGER_NAME, name.getBytes(StandardCharsets.UTF_8));
@@ -67,7 +66,7 @@ public final class LedgerMetadataUtils {
      * @see #buildBaseManagedLedgerMetadata(java.lang.String)
      */
     static Map<String, byte[]> buildAdditionalMetadataForCursor(String name) {
-        return ImmutableMap.of(METADATA_PROPERTY_CURSOR_NAME, name.getBytes(StandardCharsets.UTF_8));
+        return Map.of(METADATA_PROPERTY_CURSOR_NAME, name.getBytes(StandardCharsets.UTF_8));
     }
 
     /**
@@ -79,7 +78,7 @@ public final class LedgerMetadataUtils {
      */
     public static Map<String, byte[]> buildMetadataForCompactedLedger(String compactedTopic,
                                                                       byte[] compactedToMessageId) {
-        return ImmutableMap.of(
+        return Map.of(
                 METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
                 METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_COMPACTED_LEDGER,
                 METADATA_PROPERTY_COMPACTEDTOPIC, compactedTopic.getBytes(StandardCharsets.UTF_8),
@@ -94,7 +93,7 @@ public final class LedgerMetadataUtils {
      * @return an immutable map which describes the schema
      */
     public static Map<String, byte[]> buildMetadataForSchema(String schemaId) {
-        return ImmutableMap.of(
+        return Map.of(
                 METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
                 METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_SCHEMA,
                 METADATA_PROPERTY_SCHEMAID, schemaId.getBytes(StandardCharsets.UTF_8)
@@ -117,7 +116,7 @@ public final class LedgerMetadataUtils {
         Class<? extends EnsemblePlacementPolicy> className, Map<String, Object> properties)
         throws EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException {
         EnsemblePlacementPolicyConfig config = new EnsemblePlacementPolicyConfig(className, properties);
-        return ImmutableMap.of(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, config.encode());
+        return Map.of(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, config.encode());
     }
 
     private LedgerMetadataUtils() {}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index f9591d9ee6a..5a96ee08de9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -18,8 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.collect.Lists;
+import static java.util.Objects.requireNonNull;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
@@ -75,7 +74,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
     private final CursorType cursorType;
 
     // Used to keep track of slowest cursor. Contains all of all active cursors.
-    private final ArrayList<Item> heap = Lists.newArrayList();
+    private final ArrayList<Item> heap = new ArrayList();
 
     // Maps a cursor to its position in the heap
     private final ConcurrentMap<String, Item> cursors = new ConcurrentSkipListMap<>();
@@ -157,7 +156,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
      *         update).
      */
     public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Position newPosition) {
-        checkNotNull(cursor);
+        requireNonNull(cursor);
 
         long stamp = rwLock.writeLock();
         try {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 25f1d8760b9..e610a0a35c9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -19,7 +19,7 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
@@ -31,9 +31,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.time.Clock;
@@ -41,6 +39,8 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -370,7 +370,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         if (lastMarkDeleteEntry != null) {
             LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
                 Map<String, Long> properties = last.properties;
-                Map<String, Long> newProperties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties);
+                Map<String, Long> newProperties = properties == null ? new HashMap<>() : new HashMap<>(properties);
                 newProperties.put(key, value);
 
                 MarkDeleteEntry newLastMarkDeleteEntry = new MarkDeleteEntry(last.newPosition, newProperties,
@@ -417,7 +417,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 Map<String, String> recoveredCursorProperties = Collections.emptyMap();
                 if (info.getCursorPropertiesCount() > 0) {
                     // Recover properties map
-                    recoveredCursorProperties = Maps.newHashMap();
+                    recoveredCursorProperties = new HashMap<>();
                     for (int i = 0; i < info.getCursorPropertiesCount(); i++) {
                         StringProperty property = info.getCursorProperties(i);
                         recoveredCursorProperties.put(property.getName(), property.getValue());
@@ -437,7 +437,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     Map<String, Long> recoveredProperties = Collections.emptyMap();
                     if (info.getPropertiesCount() > 0) {
                         // Recover properties map
-                        recoveredProperties = Maps.newHashMap();
+                        recoveredProperties = new HashMap<>();
                         for (int i = 0; i < info.getPropertiesCount(); i++) {
                             LongProperty property = info.getProperties(i);
                             recoveredProperties.put(property.getName(), property.getValue());
@@ -525,7 +525,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 Map<String, Long> recoveredProperties = Collections.emptyMap();
                 if (positionInfo.getPropertiesCount() > 0) {
                     // Recover properties map
-                    recoveredProperties = Maps.newHashMap();
+                    recoveredProperties = new HashMap<>();
                     for (int i = 0; i < positionInfo.getPropertiesCount(); i++) {
                         LongProperty property = positionInfo.getProperties(i);
                         recoveredProperties.put(property.getName(), property.getValue());
@@ -1379,7 +1379,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
 
         // filters out messages which are already acknowledged
-        Set<Position> alreadyAcknowledgedPositions = Sets.newHashSet();
+        Set<Position> alreadyAcknowledgedPositions = new HashSet<>();
         lock.readLock().lock();
         try {
             positions.stream()
@@ -1499,7 +1499,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     @Override
     public void markDelete(Position position, Map<String, Long> properties)
             throws InterruptedException, ManagedLedgerException {
-        checkNotNull(position);
+        requireNonNull(position);
         checkArgument(position instanceof PositionImpl);
 
         class Result {
@@ -1799,7 +1799,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     @Override
     public void asyncMarkDelete(final Position position, Map<String, Long> properties,
             final MarkDeleteCallback callback, final Object ctx) {
-        checkNotNull(position);
+        requireNonNull(position);
         checkArgument(position instanceof PositionImpl);
 
         if (isClosed()) {
@@ -2039,7 +2039,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     @Override
     public void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
-        checkNotNull(positions);
+        requireNonNull(positions);
 
         class Result {
             ManagedLedgerException exception = null;
@@ -2105,7 +2105,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             }
 
             for (Position pos : positions) {
-                PositionImpl position  = (PositionImpl) checkNotNull(pos);
+                PositionImpl position  = (PositionImpl) requireNonNull(pos);
                 if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
                     if (log.isDebugEnabled()) {
                         log.debug(
@@ -2726,7 +2726,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             return Collections.emptyList();
         }
 
-        List<LongProperty> longProperties = Lists.newArrayList();
+        List<LongProperty> longProperties = new ArrayList<>();
         properties.forEach((name, value) -> {
             LongProperty lp = LongProperty.newBuilder().setName(name).setValue(value).build();
             longProperties.add(lp);
@@ -2740,7 +2740,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             return Collections.emptyList();
         }
 
-        List<StringProperty> stringProperties = Lists.newArrayList();
+        List<StringProperty> stringProperties = new ArrayList<>();
         properties.forEach((name, value) -> {
             StringProperty sp = StringProperty.newBuilder().setName(name).setValue(value).build();
             stringProperties.add(sp);
@@ -2760,7 +2760,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     .newBuilder();
             MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange.newBuilder();
             AtomicInteger acksSerializedSize = new AtomicInteger(0);
-            List<MessageRange> rangeList = Lists.newArrayList();
+            List<MessageRange> rangeList = new ArrayList<>();
             individualDeletedMessages.forEach((positionRange) -> {
                 PositionImpl p = positionRange.lowerEndpoint();
                 nestedPositionBuilder.setLedgerId(p.getLedgerId());
@@ -2794,7 +2794,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     .newBuilder();
             MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats
                     .BatchedEntryDeletionIndexInfo.newBuilder();
-            List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = Lists.newArrayList();
+            List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = new ArrayList<>();
             Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
             while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) {
                 Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();
@@ -2830,7 +2830,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     position);
         }
 
-        checkNotNull(lh);
+        requireNonNull(lh);
         byte[] data = pi.toByteArray();
         lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
             if (rc == BKException.Code.OK) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index eddf97558c2..d7596a7468a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -693,7 +694,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                 }
 
                 if (pbInfo.getPropertiesCount() > 0) {
-                    info.properties = Maps.newTreeMap();
+                    info.properties = new TreeMap();
                     for (int i = 0; i < pbInfo.getPropertiesCount(); i++) {
                         MLDataFormats.KeyValue property = pbInfo.getProperties(i);
                         info.properties.put(property.getKey(), property.getValue());
@@ -744,7 +745,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                                             }
 
                                             if (pbCursorInfo.getPropertiesCount() > 0) {
-                                                cursorInfo.properties = Maps.newTreeMap();
+                                                cursorInfo.properties = new TreeMap();
                                                 for (int i = 0; i < pbCursorInfo.getPropertiesCount(); i++) {
                                                     LongProperty property = pbCursorInfo.getProperties(i);
                                                     cursorInfo.properties.put(property.getName(), property.getValue());
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 5380ab77516..263a3612ceb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -25,9 +25,7 @@ import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsExce
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BoundType;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Range;
 import io.netty.buffer.ByteBuf;
@@ -335,13 +333,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
         this.waitingCursors = Queues.newConcurrentLinkedQueue();
         this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
-        this.uninitializedCursors = Maps.newHashMap();
+        this.uninitializedCursors = new HashMap();
         this.clock = config.getClock();
 
         // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
         this.maximumRolloverTimeMs = getMaximumRolloverTimeMs(config);
         this.mlOwnershipChecker = mlOwnershipChecker;
-        this.propertiesMap = Maps.newHashMap();
+        this.propertiesMap = new HashMap();
         this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs();
         if (config.getManagedLedgerInterceptor() != null) {
             this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
@@ -371,7 +369,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 }
 
                 if (mlInfo.getPropertiesCount() > 0) {
-                    propertiesMap = Maps.newHashMap();
+                    propertiesMap = new HashMap();
                     for (int i = 0; i < mlInfo.getPropertiesCount(); i++) {
                         MLDataFormats.KeyValue property = mlInfo.getProperties(i);
                         propertiesMap.put(property.getKey(), property.getValue());
@@ -1443,7 +1441,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private void closeAllCursors(CloseCallback callback, final Object ctx) {
         // Close all cursors in parallel
-        List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        List<CompletableFuture<Void>> futures = new ArrayList();
         for (ManagedCursor cursor : cursors) {
             Futures.CloseFuture closeFuture = new Futures.CloseFuture();
             cursor.asyncClose(closeFuture, null);
@@ -2454,8 +2452,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             return;
         }
 
-        List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
-        List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList();
+        List<LedgerInfo> ledgersToDelete = new ArrayList();
+        List<LedgerInfo> offloadedLedgersToDelete = new ArrayList();
         Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
                 && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
                 ? config.getLedgerOffloader().getOffloadPolicies()
@@ -2988,7 +2986,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         } else {
             long ledgerId = info.getLedgerId();
             UUID uuid = UUID.randomUUID();
-            Map<String, String> extraMetadata = ImmutableMap.of("ManagedLedgerName", name);
+            Map<String, String> extraMetadata = Map.of("ManagedLedgerName", name);
 
             String driverName = config.getLedgerOffloader().getOffloadDriverName();
             Map<String, String> driverMetadata = config.getLedgerOffloader().getOffloadDriverMetadata();
@@ -3203,7 +3201,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             Map<String, String> offloadDriverMetadata, String cleanupReason) {
         log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.",
                 name, ledgerId, uuid.toString(), cleanupReason);
-        Map<String, String> metadataMap = Maps.newHashMap();
+        Map<String, String> metadataMap = new HashMap();
         metadataMap.putAll(offloadDriverMetadata);
         metadataMap.put("ManagedLedgerName", name);
 
@@ -4011,7 +4009,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     @Override
     public CompletableFuture<Void> asyncTruncate() {
 
-        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        final List<CompletableFuture<Void>> futures = new ArrayList();
         for (ManagedCursor cursor : cursors) {
             final CompletableFuture<Void> future = new CompletableFuture<>();
             cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {
@@ -4059,7 +4057,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         stats.lastConfirmedEntry = this.getLastConfirmedEntry().toString();
         stats.state = this.getState().toString();
 
-        stats.cursors = Maps.newTreeMap();
+        stats.cursors = new HashMap();
         this.getCursors().forEach(c -> {
             ManagedCursorImpl cursor = (ManagedCursorImpl) c;
             PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats();
@@ -4097,7 +4095,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         // wait until metadata has been retrieved
         FutureUtil.waitForAll(ledgerMetadataFutures.values()).thenAccept(__ -> {
-            stats.ledgers = Lists.newArrayList();
+            stats.ledgers = new ArrayList();
             ledgersInfos.forEach(li -> {
                 ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo();
                 info.ledgerId = li.getLedgerId();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index a805802e633..2fd2e02b26e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -19,9 +19,9 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-import com.google.common.collect.Lists;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -52,7 +52,7 @@ class OpReadEntry implements ReadEntriesCallback {
         op.cursor = cursor;
         op.count = count;
         op.callback = callback;
-        op.entries = Lists.newArrayList();
+        op.entries = new ArrayList<>();
         if (maxPosition == null) {
             maxPosition = PositionImpl.LATEST;
         }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
index 02d7967f9fc..e3957269846 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
@@ -18,7 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Range;
 import java.util.ArrayList;
@@ -48,7 +48,7 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
             (LongPairConsumer<Long>) (key, value) -> key);
 
     public RangeSetWrapper(LongPairConsumer<T> rangeConverter, ManagedCursorImpl managedCursor) {
-        checkNotNull(managedCursor);
+        requireNonNull(managedCursor);
         this.config = managedCursor.getConfig();
         this.rangeConverter = rangeConverter;
         this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java
index f2a3cd4e51e..38f2c943d59 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDefaultEvictionPolicy.java
@@ -20,7 +20,7 @@ package org.apache.bookkeeper.mledger.impl.cache;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Collections.reverseOrder;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
@@ -52,7 +52,7 @@ public class EntryCacheDefaultEvictionPolicy implements EntryCacheEvictionPolicy
 
         // This algorithm apply the eviction only the group of caches whose combined size reaches the
         // PercentOfSizeToConsiderForEviction
-        List<EntryCache> cachesToEvict = Lists.newArrayList();
+        List<EntryCache> cachesToEvict = new ArrayList();
         long cachesToEvictTotalSize = 0;
         long sizeToConsiderForEviction = (long) (totalSize * PercentOfSizeToConsiderForEviction);
         log.debug("Need to gather at least {} from caches", sizeToConsiderForEviction);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
index a09b8ba27fc..c581a0fed6f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
@@ -19,8 +19,7 @@
 package org.apache.bookkeeper.mledger.impl.cache;
 
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -82,7 +81,7 @@ public class EntryCacheDisabled implements EntryCache {
                                final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
         lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
                 ledgerEntries -> {
-                    List<Entry> entries = Lists.newArrayList();
+                    List<Entry> entries = new ArrayList<>();
                     long totalSize = 0;
                     try {
                         for (LedgerEntry e : ledgerEntries) {
@@ -141,7 +140,7 @@ public class EntryCacheDisabled implements EntryCache {
 
     @Override
     public int compareTo(EntryCache other) {
-        return Longs.compare(getSize(), other.getSize());
+        return Long.compare(getSize(), other.getSize());
     }
 
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index b281ae5ef69..0d29194a883 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -19,10 +19,9 @@
 package org.apache.bookkeeper.mledger.impl.cache;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
 import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import java.util.Collection;
@@ -298,8 +297,8 @@ public class RangeEntryCacheImpl implements EntryCache {
             // Read all the entries from bookkeeper
             lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
                     ledgerEntries -> {
-                        checkNotNull(ml.getName());
-                        checkNotNull(ml.getExecutor());
+                        requireNonNull(ml.getName());
+                        requireNonNull(ml.getExecutor());
 
                         try {
                             // We got the entries, we need to transform them to a List<> type
@@ -350,7 +349,7 @@ public class RangeEntryCacheImpl implements EntryCache {
 
     @Override
     public int compareTo(EntryCache other) {
-        return Longs.compare(getSize(), other.getSize());
+        return Long.compare(getSize(), other.getSize());
     }
 
     @Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index 4c27781b1f0..e8a463c46d1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -20,8 +20,8 @@ package org.apache.bookkeeper.mledger.impl.cache;
 
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import io.netty.buffer.ByteBuf;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,7 +44,7 @@ public class RangeEntryCacheManagerImpl implements EntryCacheManager {
     private volatile long evictionTriggerThreshold;
     private volatile double cacheEvictionWatermark;
     private final AtomicLong currentSize = new AtomicLong(0);
-    private final ConcurrentMap<String, EntryCache> caches = Maps.newConcurrentMap();
+    private final ConcurrentMap<String, EntryCache> caches = new ConcurrentHashMap();
     private final EntryCacheEvictionPolicy evictionPolicy;
 
     private final AtomicBoolean evictionInProgress = new AtomicBoolean(false);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
index 4e019bd1642..767a0c78b6d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java
@@ -18,10 +18,10 @@
  */
 package org.apache.bookkeeper.mledger.offload;
 
-import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -43,7 +43,7 @@ public final class OffloadUtils {
     private OffloadUtils() {}
 
     public static Map<String, String> getOffloadDriverMetadata(LedgerInfo ledgerInfo) {
-        Map<String, String> metadata = Maps.newHashMap();
+        Map<String, String> metadata = new HashMap();
         if (ledgerInfo.hasOffloadContext()) {
             OffloadContext ctx = ledgerInfo.getOffloadContext();
             if (ctx.hasDriverMetadata()) {
@@ -63,7 +63,7 @@ public final class OffloadUtils {
             if (ctx.hasDriverMetadata()) {
                 OffloadDriverMetadata driverMetadata = ctx.getDriverMetadata();
                 if (driverMetadata.getPropertiesCount() > 0) {
-                    Map<String, String> metadata = Maps.newHashMap();
+                    Map<String, String> metadata = new HashMap();
                     driverMetadata.getPropertiesList().forEach(kv -> metadata.put(kv.getKey(), kv.getValue()));
                     return metadata;
                 }
@@ -155,7 +155,7 @@ public final class OffloadUtils {
         });
 
         if (ledgerMetadataFormat.getCustomMetadataCount() > 0) {
-            Map<String, byte[]> customMetadata = Maps.newHashMap();
+            Map<String, byte[]> customMetadata = new HashMap();
             ledgerMetadataFormat.getCustomMetadataList().forEach(
                     entry -> customMetadata.put(entry.getKey(), entry.getValue().toByteArray()));
             builder.withCustomMetadata(customMetadata);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index 3735a0658f0..c9181e02f2c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -19,8 +19,8 @@
 package org.apache.bookkeeper.mledger.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.collect.Lists;
 import io.netty.util.ReferenceCounted;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -111,7 +111,7 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
      * @return a collections of the value found in cache
      */
     public Collection<Value> getRange(Key first, Key last) {
-        List<Value> values = Lists.newArrayList();
+        List<Value> values = new ArrayList();
 
         // Return the values of the entries found in cache
         for (Value value : entries.subMap(first, true, last, true).values()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/SimpleBookKeeperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/SimpleBookKeeperTest.java
index c1b4894c736..d0a92f0ab9e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/SimpleBookKeeperTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/SimpleBookKeeperTest.java
@@ -18,8 +18,8 @@
  */
 package org.apache.bookkeeper.mledger;
 
-import com.google.common.base.Charsets;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Enumeration;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -32,7 +32,7 @@ import org.testng.annotations.Test;
 public class SimpleBookKeeperTest extends MockedBookKeeperTestCase {
 
     private static final String SECRET = "secret";
-    private static final Charset Encoding = Charsets.UTF_8;
+    private static final Charset Encoding = StandardCharsets.UTF_8;
 
     @Test
     public void simpleTest() throws Exception {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
index 4557c74f1fd..c1ee5a1083a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java
@@ -28,8 +28,8 @@ import static org.testng.Assert.assertEquals;
 
 import io.netty.buffer.Unpooled;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Vector;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 
@@ -297,7 +297,7 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
                 long firstEntry = (Long) args[0];
                 long lastEntry = (Long) args[1];
 
-                Vector<LedgerEntry> entries = new Vector<LedgerEntry>();
+                List<LedgerEntry> entries = new ArrayList<>();
                 for (int i = 0; i <= (lastEntry - firstEntry); i++) {
                     entries.add(LedgerEntryImpl.create(0, i, 10, Unpooled.wrappedBuffer(new byte[10])));
                 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
index 1fcaef0ad9b..b590d222f3b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java
@@ -24,6 +24,8 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 
 import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -76,7 +78,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
 
         final ManagedCursor cursor = ledger.openCursor("c1");
 
-        final List<Position> addedEntries = Lists.newArrayList();
+        final List<Position> addedEntries = new ArrayList();
 
         for (int i = 0; i < 1000; i++) {
             Position pos = ledger.addEntry("entry".getBytes());
@@ -136,7 +138,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
         final CompletableFuture<String> closeFuture = new CompletableFuture<>();
         final String CLOSED = "closed";
 
-        final List<Position> addedEntries = Lists.newArrayList();
+        final List<Position> addedEntries = new ArrayList();
 
         for (int i = 0; i < 1000; i++) {
             Position pos = ledger.addEntry("entry".getBytes());
@@ -215,7 +217,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
 
         final ManagedCursor cursor = ledger.openCursor("c1");
 
-        final List<Position> addedEntries = Lists.newArrayList();
+        final List<Position> addedEntries = new ArrayList();
 
         for (int i = 0; i < 1000; i++) {
             Position pos = ledger.addEntry("entry".getBytes());
@@ -314,7 +316,7 @@ public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {
     public void testConcurrentReadOfSameEntry() throws Exception {
         ManagedLedger ledger = factory.open("testConcurrentReadOfSameEntry", new ManagedLedgerConfig());
         final int numCursors = 5;
-        final List<ManagedCursor> cursors = Lists.newArrayList();
+        final List<ManagedCursor> cursors = new ArrayList();
         for (int i = 0; i < numCursors; i++) {
             final ManagedCursor cursor = ledger.openCursor("c" + i);
             cursors.add(cursor);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 05f34df47c1..1d9315ee296 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -29,6 +29,8 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -89,7 +91,7 @@ public class ManagedCursorContainerTest {
 
         @Override
         public List<Entry> readEntries(int numberOfEntriesToRead) throws ManagedLedgerException {
-            return Lists.newArrayList();
+            return new ArrayList();
         }
 
         @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java
index a5b921cad76..b3d3f1117e6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java
@@ -23,10 +23,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -65,9 +65,9 @@ public class ManagedCursorIndividualDeletedMessagesTest {
         recoverMethod.setAccessible(true);
 
         // (1) [(1:5..1:10]]
-        List<MessageRange> messageRangeList = Lists.newArrayList();
+        List<MessageRange> messageRangeList = new ArrayList();
         messageRangeList.add(createMessageRange(1, 5, 1, 10));
-        List<Range<PositionImpl>> expectedRangeList = Lists.newArrayList();
+        List<Range<PositionImpl>> expectedRangeList = new ArrayList();
         expectedRangeList.add(createPositionRange(1, 5, 1, 10));
         recoverMethod.invoke(cursor, messageRangeList);
         assertEquals(deletedMessages.size(), 1);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
index dc876e19eb1..bdc5bdb4a5f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
@@ -20,10 +20,10 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static org.testng.Assert.assertEquals;
 
-import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -34,7 +34,7 @@ import org.testng.annotations.Test;
 
 public class ManagedCursorListAckTest extends MockedBookKeeperTestCase {
 
-    private static final Charset Encoding = Charsets.UTF_8;
+    private static final Charset Encoding = StandardCharsets.UTF_8;
 
     @Test(timeOut = 20000)
     void testMultiPositionDelete() throws Exception {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index b44590bc153..052f6ac2d54 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -31,10 +31,8 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -43,6 +41,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -114,7 +113,7 @@ import org.testng.annotations.Test;
 
 public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
-    private static final Charset Encoding = Charsets.UTF_8;
+    private static final Charset Encoding = StandardCharsets.UTF_8;
 
     @DataProvider(name = "useOpenRangeSet")
     public static Object[][] useOpenRangeSet() {
@@ -758,7 +757,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         final int Messages = 100;
         final int Consumers = 5;
 
-        List<Future<AtomicBoolean>> futures = Lists.newArrayList();
+        List<Future<AtomicBoolean>> futures = new ArrayList();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
         final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1);
@@ -1165,7 +1164,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         final ManagedCursor c1 = ledger.openCursor("c1");
 
         final int N = 100;
-        List<Position> positions = Lists.newArrayList();
+        List<Position> positions = new ArrayList();
         for (int i = 0; i < N; i++) {
             Position p = ledger.addEntry("dummy-entry".getBytes(Encoding));
             positions.add(p);
@@ -1818,7 +1817,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         final int Messages = 100;
         final int Consumers = 10;
 
-        List<Future<Void>> futures = Lists.newArrayList();
+        List<Future<Void>> futures = new ArrayList();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
         final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1);
@@ -2561,7 +2560,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ledger.addEntry("entry4".getBytes(Encoding));
 
         // 1. Replay empty position set should return empty entry set
-        Set<PositionImpl> positions = Sets.newHashSet();
+        Set<PositionImpl> positions = new HashSet();
         assertTrue(c1.replayEntries(positions).isEmpty());
 
         positions.add(p1);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index b79db341ec3..9026c0f6ac4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -23,9 +23,10 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -168,7 +169,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
         final AtomicBoolean done = new AtomicBoolean();
         final CyclicBarrier barrier = new CyclicBarrier(NumProducers + NumConsumers + 1);
 
-        List<Future<?>> futures = Lists.newArrayList();
+        List<Future<?>> futures = new ArrayList();
 
         for (int i = 0; i < NumProducers; i++) {
             futures.add(executor.submit(() -> {
@@ -254,12 +255,12 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
         mlConfig.setMetadataMaxEntriesPerLedger(10);
         ManagedLedger ledger = factory.open("ml-markdelete-ledger", mlConfig);
 
-        final List<Position> addedEntries = Lists.newArrayList();
+        final List<Position> addedEntries = new ArrayList();
 
         int numCursors = 10;
         final CyclicBarrier barrier = new CyclicBarrier(numCursors);
 
-        List<ManagedCursor> cursors = Lists.newArrayList();
+        List<ManagedCursor> cursors = new ArrayList();
         for (int i = 0; i < numCursors; i++) {
             cursors.add(ledger.openCursor(String.format("c%d", i)));
         }
@@ -269,7 +270,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
             addedEntries.add(pos);
         }
 
-        List<Future<?>> futures = Lists.newArrayList();
+        List<Future<?>> futures = new ArrayList();
 
         for (ManagedCursor cursor : cursors) {
             futures.add(executor.submit(() -> {
@@ -303,7 +304,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
         ManagedLedger ledger = factory.open("my_test_ledger" + testName, config);
         ManagedCursor cursor = ledger.openCursor("c1");
 
-        List<Position> positions = Lists.newArrayList();
+        List<Position> positions = new ArrayList();
 
         for (int i = 0; i < 10; i++) {
             Position p = ledger.addEntry("entry".getBytes());
@@ -435,7 +436,7 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
         ledger.close();
 
         ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(
-                DigestType.CRC32, "".getBytes(Charsets.UTF_8), "", false);
+                DigestType.CRC32, "".getBytes(StandardCharsets.UTF_8), "", false);
         PersistentOfflineTopicStats offlineTopicStats = offlineTopicBacklog.getEstimatedUnloadedTopicBacklog(
                 (ManagedLedgerFactoryImpl) factory, "property/cluster/namespace/my-ledger");
         assertNotNull(offlineTopicStats);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java
index 05391c0862a..ba27777d6d5 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryChangeLedgerPathTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import lombok.Cleanup;
 import org.apache.bookkeeper.common.allocator.PoolingPolicy;
@@ -66,7 +67,7 @@ public class ManagedLedgerFactoryChangeLedgerPathTest extends BookKeeperClusterT
 
         for (int i = 0; i < 10; i++) {
             String entry = "entry" + i;
-            ledger.addEntry(entry.getBytes("UTF8"));
+            ledger.addEntry(entry.getBytes(StandardCharsets.UTF_8));
         }
 
         List<Entry> entryList = cursor.readEntries(10);
@@ -74,7 +75,7 @@ public class ManagedLedgerFactoryChangeLedgerPathTest extends BookKeeperClusterT
 
         for (int i = 0; i < 10; i++) {
             Entry entry = entryList.get(i);
-            Assert.assertEquals(("entry" + i).getBytes("UTF8"), entry.getData());
+            Assert.assertEquals(("entry" + i).getBytes(StandardCharsets.UTF_8), entry.getData());
         }
     }
     @Test(timeOut = 60000)
@@ -103,7 +104,7 @@ public class ManagedLedgerFactoryChangeLedgerPathTest extends BookKeeperClusterT
 
         for (int i = 0; i < 10; i++) {
             String entry = "entry" + i;
-            ledger.addEntry(entry.getBytes("UTF8"));
+            ledger.addEntry(entry.getBytes(StandardCharsets.UTF_8));
         }
 
         List<Entry> entryList = cursor.readEntries(10);
@@ -111,7 +112,7 @@ public class ManagedLedgerFactoryChangeLedgerPathTest extends BookKeeperClusterT
 
         for (int i = 0; i < 10; i++) {
             Entry entry = entryList.get(i);
-            Assert.assertEquals(("entry" + i).getBytes("UTF8"), entry.getData());
+            Assert.assertEquals(("entry" + i).getBytes(StandardCharsets.UTF_8), entry.getData());
         }
     }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerSingleBookieTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerSingleBookieTest.java
index 8a7f64c76a8..305ae15109d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerSingleBookieTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerSingleBookieTest.java
@@ -22,8 +22,8 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
-import com.google.common.base.Charsets;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.bookkeeper.mledger.Entry;
@@ -35,7 +35,7 @@ import org.testng.annotations.Test;
 
 public class ManagedLedgerSingleBookieTest extends MockedBookKeeperTestCase {
 
-    private static final Charset Encoding = Charsets.UTF_8;
+    private static final Charset Encoding = StandardCharsets.UTF_8;
 
     public ManagedLedgerSingleBookieTest() {
         // Just one bookie
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 3293c98c5c6..37fa56cd989 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -37,7 +37,6 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.base.Charsets;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -45,11 +44,13 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.nio.ReadOnlyBufferException;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -134,7 +135,7 @@ import org.testng.annotations.Test;
 
 @Slf4j
 public class ManagedLedgerTest extends MockedBookKeeperTestCase {
-    private static final Charset Encoding = Charsets.UTF_8;
+    private static final Charset Encoding = StandardCharsets.UTF_8;
 
     @DataProvider(name = "checkOwnershipFlag")
     public Object[][] checkOwnershipFlagProvider() {
@@ -1487,7 +1488,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         c2.close();
         ledger.deleteCursor("c2");
-        assertEquals(Sets.newHashSet(ledger.getCursors()), Sets.newHashSet());
+        assertEquals(Sets.newHashSet(ledger.getCursors()), new HashSet());
     }
 
     @Test
@@ -1614,7 +1615,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     public void ledgersList() throws Exception {
         MetaStore store = factory.getMetaStore();
 
-        assertEquals(Sets.newHashSet(store.getManagedLedgers()), Sets.newHashSet());
+        assertEquals(Sets.newHashSet(store.getManagedLedgers()), new HashSet());
         ManagedLedger ledger1 = factory.open("ledger1");
         assertEquals(Sets.newHashSet(store.getManagedLedgers()), Sets.newHashSet("ledger1"));
         ManagedLedger ledger2 = factory.open("ledger2");
@@ -1622,7 +1623,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger1.delete();
         assertEquals(Sets.newHashSet(store.getManagedLedgers()), Sets.newHashSet("ledger2"));
         ledger2.delete();
-        assertEquals(Sets.newHashSet(store.getManagedLedgers()), Sets.newHashSet());
+        assertEquals(Sets.newHashSet(store.getManagedLedgers()), new HashSet());
     }
 
     @Test
@@ -2667,7 +2668,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         // Open Cursor also adds cursor into activeCursor-container
         ManagedCursor cursor1 = ledger.openCursor("c1");
         ManagedCursor cursor2 = ledger.openCursor("c2");
-        Set<ManagedCursor> activeCursors = Sets.newHashSet();
+        Set<ManagedCursor> activeCursors = new HashSet();
         activeCursors.add(cursor1);
         activeCursors.add(cursor2);
         EntryCache entryCache = Whitebox.getInternalState(ledger, "entryCache");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 3945347ad65..f73ebf122b7 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -26,11 +26,10 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-import com.google.common.base.Charsets;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -60,7 +59,7 @@ import org.testng.annotations.Test;
 
 public class NonDurableCursorTest extends MockedBookKeeperTestCase {
 
-    private static final Charset Encoding = Charsets.UTF_8;
+    private static final Charset Encoding = StandardCharsets.UTF_8;
 
     @Test(timeOut = 20000)
     void readFromEmptyLedger() throws Exception {
@@ -110,14 +109,14 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         ManagedLedger ledger = factory.open("my_test_ledger");
 
         ManagedCursor c1 = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
-        assertFalse(Iterables.isEmpty(ledger.getCursors()));
+        assertTrue(ledger.getCursors().iterator().hasNext());
 
         c1.close();
         ledger.close();
 
         // Re-open
         ManagedLedger ledger2 = factory.open("my_test_ledger");
-        assertTrue(Iterables.isEmpty(ledger2.getCursors()));
+        assertTrue(!ledger2.getCursors().iterator().hasNext());
     }
 
     @Test(timeOut = 20000)
@@ -707,7 +706,7 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
         assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
 
-        List<Position> positions = Lists.newArrayList();
+        List<Position> positions = new ArrayList();
         for (int i = 0; i < 10; i++) {
             positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index bbaef697935..f49ce9961fd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -29,9 +29,9 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -273,7 +273,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
 
     static class MockOffloadReadHandle implements ReadHandle {
         final long id;
-        final List<ByteBuf> entries = Lists.newArrayList();
+        final List<ByteBuf> entries = new ArrayList();
         final LedgerMetadata metadata;
 
         MockOffloadReadHandle(ReadHandle toCopy) throws Exception {
@@ -302,7 +302,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
 
         @Override
         public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
-            List<LedgerEntry> readEntries = Lists.newArrayList();
+            List<LedgerEntry> readEntries = new ArrayList();
             for (long eid = firstEntry; eid <= lastEntry; eid++) {
                 ByteBuf buf = entries.get((int)eid).retainedSlice();
                 readEntries.add(LedgerEntryImpl.create(id, eid, buf.readableBytes(), buf));
@@ -381,7 +381,7 @@ public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
             metadataFormatVersion = toCopy.getMetadataFormatVersion();
             state = toCopy.getState();
             password = Arrays.copyOf(toCopy.getPassword(), toCopy.getPassword().length);
-            customMetadata = ImmutableMap.copyOf(toCopy.getCustomMetadata());
+            customMetadata = Map.copyOf(toCopy.getCustomMetadata());
         }
 
         @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index a59f53d4575..3f1bfcca940 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -23,7 +23,6 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.collect.ImmutableSet;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
@@ -460,7 +459,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         assertEquals(ledger.getLedgersInfoAsList().size(), 4);
 
         // mark ledgers to fail
-        promise.complete(ImmutableSet.of(ledger.getLedgersInfoAsList().get(failIndex).getLedgerId()));
+        promise.complete(Set.of(ledger.getLedgersInfoAsList().get(failIndex).getLedgerId()));
 
         try {
             ledger.offloadPrefix(ledger.getLastConfirmedEntry());
@@ -727,7 +726,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
                             .filter(e -> e.getOffloadContext().getComplete())
                             .map(e -> e.getLedgerId()).collect(Collectors.toSet()),
                             offloader.offloadedLedgers());
-        assertEquals(offloader.offloadedLedgers(), ImmutableSet.of(firstLedgerId, thirdLedgerId));
+        assertEquals(offloader.offloadedLedgers(), Set.of(firstLedgerId, thirdLedgerId));
     }
 
     private static byte[] buildEntry(int size, String pattern) {
@@ -762,7 +761,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         // offload should eventually be triggered
         assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
         assertEquals(offloader.offloadedLedgers(),
-                            ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId()));
+                Set.of(ledger.getLedgersInfoAsList().get(0).getLedgerId()));
     }
 
     @Test
@@ -811,7 +810,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         // eventually all over threshold will be offloaded
         assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 3);
         assertEquals(offloader.offloadedLedgers(),
-                            ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
+                Set.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
                                             ledger.getLedgersInfoAsList().get(1).getLedgerId(),
                                             ledger.getLedgersInfoAsList().get(2).getLedgerId()));
 
@@ -820,7 +819,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
         assertEquals(offloader.offloadedLedgers().size(), 4);
         assertEquals(offloader.offloadedLedgers(),
-                            ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
+                Set.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
                                             ledger.getLedgersInfoAsList().get(1).getLedgerId(),
                                             ledger.getLedgersInfoAsList().get(2).getLedgerId(),
                                             ledger.getLedgersInfoAsList().get(3).getLedgerId()));
@@ -873,7 +872,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         // auto trigger should eventually offload everything else over threshold
         assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
         assertEquals(offloader.offloadedLedgers(),
-                            ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
+                Set.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
                                             ledger.getLedgersInfoAsList().get(1).getLedgerId()));
     }
 
@@ -918,7 +917,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
         assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 3);
         assertEquals(offloader.offloadedLedgers(),
-                            ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
+                Set.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
                                             ledger.getLedgersInfoAsList().get(1).getLedgerId(),
                                             ledger.getLedgersInfoAsList().get(2).getLedgerId()));
     }
@@ -941,7 +940,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
         assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
         assertEquals(offloader.offloadedLedgers(),
-                            ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId()));
+                Set.of(ledger.getLedgersInfoAsList().get(0).getLedgerId()));
 
         for (int i = 0; i < 10; i++) {
             ledger.addEntry(buildEntry(10, "entry-" + i));
@@ -949,7 +948,7 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
 
         assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
         assertEquals(offloader.offloadedLedgers(),
-                            ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
+                Set.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
                                             ledger.getLedgersInfoAsList().get(1).getLedgerId()));
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java
index 88e12910ab8..aeb8f97bfae 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java
@@ -25,7 +25,6 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.BoundType;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.TreeRangeSet;
 import java.util.ArrayList;
@@ -194,7 +193,7 @@ public class RangeSetWrapperTest {
         // add 10K values for key 0
         int totalInsert = 10_000;
         // add single values
-        List<Range<LongPair>> removedRanges = Lists.newArrayList();
+        List<Range<LongPair>> removedRanges = new ArrayList();
         for (int i = 0; i < totalInsert; i++) {
             if (i % 3 == 0 || i % 7 == 0 || i % 11 == 0) {
                 continue;
@@ -478,7 +477,7 @@ public class RangeSetWrapperTest {
 
 
     private List<Range<LongPair>> getConnectedRange(Set<Range<LongPair>> gRanges) {
-        List<Range<LongPair>> gRangeConnected = Lists.newArrayList();
+        List<Range<LongPair>> gRangeConnected = new ArrayList();
         Range<LongPair> lastRange = null;
         for (Range<LongPair> range : gRanges) {
             if (lastRange == null) {


[pulsar] 03/04: [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 55a0cb4841e1329bd2137611bad391b5f483ae70
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Sep 2 15:09:34 2022 +0200

    [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241)
---
 .../mledger/impl/cache/PendingReadsManager.java    | 447 ++++++++++++++++++++
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 106 +++--
 .../impl/cache/PendingReadsManagerTest.java        | 462 +++++++++++++++++++++
 .../client/api/MessageDispatchThrottlingTest.java  |   8 +
 4 files changed, 986 insertions(+), 37 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
new file mode 100644
index 00000000000..4c374d8ace6
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import io.prometheus.client.Counter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.AllArgsConstructor;
+import lombok.Value;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+
+/**
+ * PendingReadsManager tries to prevent sending duplicate reads to BK.
+ */
+public class PendingReadsManager {
+
+    private static final Counter COUNT_ENTRIES_READ_FROM_BK = Counter // all counters below are bumped in findPendingRead, in units of entries
+            .build()
+            .name("pulsar_ml_cache_pendingreads_entries_read")
+            .help("Total number of entries read from BK")
+            .register(); // incremented by key.size() when a new PendingRead is registered
+
+    private static final Counter COUNT_ENTRIES_NOTREAD_FROM_BK = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_entries_notread")
+            .help("Total number of entries not read from BK")
+            .register(); // incremented by the number of requested entries covered by an existing read
+
+    private static final Counter COUNT_PENDING_READS_MATCHED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched")
+            .help("Pending reads reused with perfect range match")
+            .register(); // exact key found in the per-ledger map
+    private static final Counter COUNT_PENDING_READS_MATCHED_INCLUDED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_included")
+            .help("Pending reads reused by attaching to a read with a larger range")
+            .register(); // an existing key includes() the requested one
+    private static final Counter COUNT_PENDING_READS_MISSED = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_missed")
+            .help("Pending reads that didn't find a match")
+            .register(); // no exact, including or overlapping read was found
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_left")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register(); // overlap found, only the left part of the request is missing
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_right")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register(); // overlap found, only the right part of the request is missing
+
+    private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH = Counter
+            .build()
+            .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_both")
+            .help("Pending reads that didn't find a match but they partially overlap with another read")
+            .register(); // overlap found, both sides missing. NOTE(review): the three miss_* help texts are identical
+
+    private final RangeEntryCacheImpl rangeEntryCache; // supplies the callback executor and performs the additional reads
+    private final ConcurrentHashMap<Long, ConcurrentHashMap<PendingReadKey, PendingRead>> cachedPendingReads =
+            new ConcurrentHashMap<>(); // ledger id (lh.getId()) -> in-flight reads of that ledger, keyed by entry range
+
+    public PendingReadsManager(RangeEntryCacheImpl rangeEntryCache) { // starts with no pending reads tracked
+        this.rangeEntryCache = rangeEntryCache;
+    }
+
+    @Value
+    private static class PendingReadKey { // inclusive range of entry ids [startEntry, endEntry]; key of the per-ledger map
+        private final long startEntry;
+        private final long endEntry;
+        long size() { // number of entries in the range, both ends counted
+            return endEntry - startEntry + 1;
+        }
+
+
+        boolean includes(PendingReadKey other) { // true when other lies entirely inside this range (equal bounds allowed)
+            return startEntry <= other.startEntry && other.endEntry <= endEntry;
+        }
+
+        boolean overlaps(PendingReadKey other) { // true when either endpoint of this range falls inside other
+            return (other.startEntry <= startEntry && startEntry <= other.endEntry) // NOTE(review): asymmetric - false when
+                    || (other.startEntry <= endEntry && endEntry <= other.endEntry); // other is strictly inside this range
+        }
+
+        PendingReadKey reminderOnLeft(PendingReadKey other) { // "reminder" = remainder: part of this range before other starts
+            //   S******-----E         <- this (the starred part is returned)
+            //          S----------E   <- other
+            if (other.startEntry <= endEntry
+                    && other.startEntry > startEntry) {
+                return new PendingReadKey(startEntry, other.startEntry - 1);
+            }
+            return null; // other does not start inside this range after its first entry: nothing left over
+        }
+
+        PendingReadKey reminderOnRight(PendingReadKey other) { // part of this range after other ends
+            //          S-----*******E   <- this (the starred part is returned)
+            //   S-----------E           <- other
+            if (startEntry <= other.endEntry
+                    && other.endEntry < endEntry) {
+                return new PendingReadKey(other.endEntry + 1, endEntry);
+            }
+            return null; // other does not end inside this range before its last entry: nothing left over
+        }
+
+    }
+
+    @AllArgsConstructor
+    private static final class ReadEntriesCallbackWithContext { // one listener registered on a PendingRead
+        final AsyncCallbacks.ReadEntriesCallback callback; // notified with the entries or with the failure
+        final Object ctx; // passed back unchanged to the callback
+        final long startEntry; // first entry id this listener wants (inclusive)
+        final long endEntry; // last entry id this listener wants (inclusive)
+    }
+
+    @AllArgsConstructor
+    private static final class FindPendingReadOutcome { // result of findPendingRead
+        final PendingRead pendingRead; // the read to attach to (existing or newly registered)
+        final PendingReadKey missingOnLeft; // requested entries before pendingRead's range, or null
+        final PendingReadKey missingOnRight; // requested entries after pendingRead's range, or null
+        boolean needsAdditionalReads() { // true when pendingRead covers only part of the request
+            return missingOnLeft != null || missingOnRight != null;
+        }
+    }
+
+    private FindPendingReadOutcome findPendingRead(PendingReadKey key, Map<PendingReadKey,
+            PendingRead> ledgerCache, AtomicBoolean created) {
+        synchronized (ledgerCache) { // lookup and registration happen under one lock on the per-ledger map
+            PendingRead existing = ledgerCache.get(key); // 1) exact range match
+            if (existing != null) {
+                COUNT_PENDING_READS_MATCHED.inc(key.size());
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                return new FindPendingReadOutcome(existing, null, null);
+            }
+            FindPendingReadOutcome foundButMissingSomethingOnLeft = null;
+            FindPendingReadOutcome foundButMissingSomethingOnRight = null;
+            FindPendingReadOutcome foundButMissingSomethingOnBoth = null;
+
+            for (Map.Entry<PendingReadKey, PendingRead> entry : ledgerCache.entrySet()) { // 2) scan all reads of this ledger
+                PendingReadKey entryKey = entry.getKey();
+                if (entryKey.includes(key)) { // an existing read covers the whole request: reuse it right away
+                    COUNT_PENDING_READS_MATCHED_INCLUDED.inc(key.size());
+                    COUNT_ENTRIES_NOTREAD_FROM_BK.inc(key.size());
+                    return new FindPendingReadOutcome(entry.getValue(), null, null);
+                }
+                if (entryKey.overlaps(key)) { // partial candidate; a later one of the same kind overwrites an earlier one
+                    PendingReadKey reminderOnLeft = key.reminderOnLeft(entryKey);
+                    PendingReadKey reminderOnRight = key.reminderOnRight(entryKey);
+                    if (reminderOnLeft != null && reminderOnRight != null) {
+                        foundButMissingSomethingOnBoth = new FindPendingReadOutcome(entry.getValue(),
+                                reminderOnLeft, reminderOnRight);
+                    } else if (reminderOnRight != null && reminderOnLeft == null) {
+                        foundButMissingSomethingOnRight = new FindPendingReadOutcome(entry.getValue(),
+                                null, reminderOnRight);
+                    } else if (reminderOnLeft != null && reminderOnRight == null) {
+                        foundButMissingSomethingOnLeft = new FindPendingReadOutcome(entry.getValue(),
+                                reminderOnLeft, null);
+                    }
+                }
+            }
+
+            if (foundButMissingSomethingOnRight != null) { // 3) partial matches: right-only gap first, then left-only, then both
+                long delta = key.size() // delta = requested entries that the existing read already covers
+                        - foundButMissingSomethingOnRight.missingOnRight.size();
+                COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT.inc(delta);
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+                return foundButMissingSomethingOnRight;
+            } else if (foundButMissingSomethingOnLeft != null) {
+                long delta = key.size()
+                        - foundButMissingSomethingOnLeft.missingOnLeft.size();
+                COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT.inc(delta);
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+                return foundButMissingSomethingOnLeft;
+            } else if (foundButMissingSomethingOnBoth != null) {
+                long delta = key.size()
+                        - foundButMissingSomethingOnBoth.missingOnRight.size()
+                        - foundButMissingSomethingOnBoth.missingOnLeft.size();
+                COUNT_ENTRIES_NOTREAD_FROM_BK.inc(delta);
+                COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_BOTH.inc(delta);
+                return foundButMissingSomethingOnBoth;
+            }
+
+            created.set(true); // 4) nothing reusable: flag that this call registered a new read
+            PendingRead newRead = new PendingRead(key, ledgerCache);
+            ledgerCache.put(key, newRead);
+            long delta = key.size();
+            COUNT_PENDING_READS_MISSED.inc(delta);
+            COUNT_ENTRIES_READ_FROM_BK.inc(delta);
+            return new FindPendingReadOutcome(newRead, null, null);
+        }
+    }
+
+    private class PendingRead { // one in-flight read of a range, shared by every listener attached to it
+        final PendingReadKey key; // range of entries this read fetches
+        final Map<PendingReadKey, PendingRead> ledgerCache; // per-ledger map this read removes itself from when done
+        final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1); // accessed while holding this PendingRead's monitor
+        boolean completed = false; // set once the attached future is done; addListener then refuses new listeners
+
+        public PendingRead(PendingReadKey key,
+                           Map<PendingReadKey, PendingRead> ledgerCache) {
+            this.key = key;
+            this.ledgerCache = ledgerCache;
+        }
+
+        private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
+            List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry)); // capacity hint only; one less than the range size
+            for (EntryImpl entry : list) { // keep entries within [startEntry, endEntry], release the others
+                long entryId = entry.getEntryId();
+                if (startEntry <= entryId && entryId <= endEntry) {
+                    result.add(entry);
+                } else {
+                    entry.release();
+                }
+            }
+            return result;
+        }
+
+        public void attach(CompletableFuture<List<EntryImpl>> handle) { // wires the read result to the registered listeners
+            // when the future is done remove this from the map
+            // new reads will go to a new instance
+            // this is required because we are going to do refcount management
+            // on the results of the callback
+            handle.whenComplete((___, error) -> {
+                synchronized (PendingRead.this) {
+                    completed = true;
+                    synchronized (ledgerCache) {
+                        ledgerCache.remove(key, this); // two-argument remove: only if the key still maps to this instance
+                    }
+                }
+            });
+
+            handle.thenAcceptAsync(entriesToReturn -> { // success path, run on the executor chosen below
+                synchronized (PendingRead.this) {
+                    if (callbacks.size() == 1) { // single listener: hand over the entries themselves, no copies
+                        ReadEntriesCallbackWithContext first = callbacks.get(0);
+                        if (first.startEntry == key.startEntry
+                                && first.endEntry == key.endEntry) {
+                            // perfect match, no copy, this is the most common case
+                            first.callback.readEntriesComplete((List) entriesToReturn,
+                                    first.ctx);
+                        } else {
+                            first.callback.readEntriesComplete( // listener wants a sub-range: the surplus entries are released
+                                    (List) keepEntries(entriesToReturn, first.startEntry, first.endEntry),
+                                    first.ctx);
+                        }
+                    } else {
+                        for (ReadEntriesCallbackWithContext callback : callbacks) { // each listener gets its own copies for its range
+                            long callbackStartEntry = callback.startEntry;
+                            long callbackEndEntry = callback.endEntry;
+                            List<EntryImpl> copy = new ArrayList<>((int) (callbackEndEntry - callbackStartEntry + 1));
+                            for (EntryImpl entry : entriesToReturn) {
+                                long entryId = entry.getEntryId();
+                                if (callbackStartEntry <= entryId && entryId <= callbackEndEntry) {
+                                    EntryImpl entryCopy = EntryImpl.create(entry);
+                                    copy.add(entryCopy);
+                                }
+                            }
+                            callback.callback.readEntriesComplete((List) copy, callback.ctx);
+                        }
+                        for (EntryImpl entry : entriesToReturn) { // the originals are released once every listener has its copies
+                            entry.release();
+                        }
+                    }
+                }
+            }, rangeEntryCache.getManagedLedger().getExecutor()
+                    .chooseThread(rangeEntryCache.getManagedLedger().getName())).exceptionally(exception -> {
+                synchronized (PendingRead.this) { // failure path: every registered listener gets readEntriesFailed
+                    for (ReadEntriesCallbackWithContext callback : callbacks) {
+                        ManagedLedgerException mlException = createManagedLedgerException(exception);
+                        callback.callback.readEntriesFailed(mlException, callback.ctx);
+                    }
+                }
+                return null; // NOTE(review): chained after thenAcceptAsync, so presumably also runs if a callback above throws - confirm
+            });
+        }
+
+        synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,
+                                         Object ctx, long startEntry, long endEntry) {
+            if (completed) { // the attached future is already done: refuse; presumably readEntries retries (while (!listenerAdded))
+                return false;
+            }
+            callbacks.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
+            return true;
+        }
+    }
+
+
+    void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
+                     final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
+
+
+        final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry);
+
+        Map<PendingReadKey, PendingRead> pendingReadsForLedger =
+                cachedPendingReads.computeIfAbsent(lh.getId(), (l) -> new ConcurrentHashMap<>());
+
+        boolean listenerAdded = false;
+        while (!listenerAdded) {
+            AtomicBoolean createdByThisThread = new AtomicBoolean();
+            FindPendingReadOutcome findBestCandidateOutcome = findPendingRead(key,
+                    pendingReadsForLedger, createdByThisThread);
+            PendingRead pendingRead = findBestCandidateOutcome.pendingRead;
+            if (findBestCandidateOutcome.needsAdditionalReads()) {
+                AsyncCallbacks.ReadEntriesCallback wrappedCallback = new AsyncCallbacks.ReadEntriesCallback() {
+                    @Override
+                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                        PendingReadKey missingOnLeft = findBestCandidateOutcome.missingOnLeft;
+                        PendingReadKey missingOnRight = findBestCandidateOutcome.missingOnRight;
+                        if (missingOnRight != null && missingOnLeft != null) {
+                            AsyncCallbacks.ReadEntriesCallback readFromLeftCallback =
+                                    new AsyncCallbacks.ReadEntriesCallback() {
+                                @Override
+                                public void readEntriesComplete(List<Entry> entriesFromLeft, Object dummyCtx1) {
+                                    AsyncCallbacks.ReadEntriesCallback readFromRightCallback =
+                                            new AsyncCallbacks.ReadEntriesCallback() {
+                                        @Override
+                                        public void readEntriesComplete(List<Entry> entriesFromRight,
+                                                                        Object dummyCtx2) {
+                                            List<Entry> finalResult =
+                                                    new ArrayList<>(entriesFromLeft.size()
+                                                            + entries.size() + entriesFromRight.size());
+                                            finalResult.addAll(entriesFromLeft);
+                                            finalResult.addAll(entries);
+                                            finalResult.addAll(entriesFromRight);
+                                            callback.readEntriesComplete(finalResult, ctx);
+                                        }
+
+                                        @Override
+                                        public void readEntriesFailed(ManagedLedgerException exception,
+                                                                      Object dummyCtx3) {
+                                            callback.readEntriesFailed(exception, ctx);
+                                        }
+                                    };
+                                    rangeEntryCache.asyncReadEntry0(lh,
+                                            missingOnRight.startEntry, missingOnRight.endEntry,
+                                            shouldCacheEntry, readFromRightCallback, null);
+                                }
+
+                                @Override
+                                public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx4) {
+                                    callback.readEntriesFailed(exception, ctx);
+                                }
+                            };
+                            rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry,
+                                    shouldCacheEntry, readFromLeftCallback, null);
+                        } else if (missingOnLeft != null) {
+                            AsyncCallbacks.ReadEntriesCallback readFromLeftCallback =
+                                    new AsyncCallbacks.ReadEntriesCallback() {
+
+                                        @Override
+                                        public void readEntriesComplete(List<Entry> entriesFromLeft,
+                                                                        Object dummyCtx5) {
+                                            List<Entry> finalResult =
+                                                    new ArrayList<>(entriesFromLeft.size() + entries.size());
+                                            finalResult.addAll(entriesFromLeft);
+                                            finalResult.addAll(entries);
+                                            callback.readEntriesComplete(finalResult, ctx);
+                                        }
+
+                                        @Override
+                                        public void readEntriesFailed(ManagedLedgerException exception,
+                                                                      Object dummyCtx6) {
+                                            callback.readEntriesFailed(exception, ctx);
+                                        }
+                                    };
+                            rangeEntryCache.asyncReadEntry0(lh, missingOnLeft.startEntry, missingOnLeft.endEntry,
+                                    shouldCacheEntry, readFromLeftCallback, null);
+                        } else if (missingOnRight != null) {
+                            AsyncCallbacks.ReadEntriesCallback readFromRightCallback =
+                                    new AsyncCallbacks.ReadEntriesCallback() {
+
+                                        @Override
+                                        public void readEntriesComplete(List<Entry> entriesFromRight,
+                                                                        Object dummyCtx7) {
+                                            List<Entry> finalResult =
+                                                    new ArrayList<>(entriesFromRight.size() + entries.size());
+                                            finalResult.addAll(entries);
+                                            finalResult.addAll(entriesFromRight);
+                                            callback.readEntriesComplete(finalResult, ctx);
+                                        }
+
+                                        @Override
+                                        public void readEntriesFailed(ManagedLedgerException exception,
+                                                                      Object dummyCtx8) {
+                                            callback.readEntriesFailed(exception, ctx);
+                                        }
+                                    };
+                            rangeEntryCache.asyncReadEntry0(lh, missingOnRight.startEntry, missingOnRight.endEntry,
+                                    shouldCacheEntry, readFromRightCallback, null);
+                        }
+                    }
+
+                    @Override
+                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                        callback.readEntriesFailed(exception, ctx);
+                    }
+                };
+                listenerAdded = pendingRead.addListener(wrappedCallback, ctx, key.startEntry, key.endEntry);
+            } else {
+                listenerAdded = pendingRead.addListener(callback, ctx, key.startEntry, key.endEntry);
+            }
+
+
+            if (createdByThisThread.get()) {
+                CompletableFuture<List<EntryImpl>> readResult = rangeEntryCache.readFromStorage(lh, firstEntry,
+                        lastEntry, shouldCacheEntry);
+                pendingRead.attach(readResult);
+            }
+        }
+    }
+
+    void clear() {
+        cachedPendingReads.clear();
+    }
+
+    void invalidateLedger(long id) {
+        cachedPendingReads.remove(id);
+    }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index f8f5c328cd9..163728e0f04 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -21,12 +21,14 @@ package org.apache.bookkeeper.mledger.impl.cache;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -48,15 +50,18 @@ import org.slf4j.LoggerFactory;
 public class RangeEntryCacheImpl implements EntryCache {
 
     private final RangeEntryCacheManagerImpl manager;
-    private final ManagedLedgerImpl ml;
+    final ManagedLedgerImpl ml;
     private ManagedLedgerInterceptor interceptor;
     private final RangeCache<PositionImpl, EntryImpl> entries;
     private final boolean copyEntries;
+    private final PendingReadsManager pendingReadsManager;
+
 
     private static final double MB = 1024 * 1024;
 
     public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
         this.manager = manager;
+        this.pendingReadsManager = new PendingReadsManager(this);
         this.ml = ml;
         this.interceptor = ml.getManagedLedgerInterceptor();
         this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp);
@@ -67,6 +72,11 @@ public class RangeEntryCacheImpl implements EntryCache {
         }
     }
 
+    @VisibleForTesting
+    ManagedLedgerImpl getManagedLedger() {
+        return ml;
+    }
+
     @Override
     public String getName() {
         return ml.getName();
@@ -185,6 +195,7 @@ public class RangeEntryCacheImpl implements EntryCache {
         }
 
         manager.entriesRemoved(sizeRemoved, entriesRemoved);
+        pendingReadsManager.invalidateLedger(ledgerId);
     }
 
     @Override
@@ -235,6 +246,7 @@ public class RangeEntryCacheImpl implements EntryCache {
                         }
                     }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
                         ml.invalidateLedgerHandle(lh);
+                        pendingReadsManager.invalidateLedger(lh.getId());
                         callback.readEntryFailed(createManagedLedgerException(exception), ctx);
                         return null;
             });
@@ -257,7 +269,7 @@ public class RangeEntryCacheImpl implements EntryCache {
     }
 
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
+    void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
             final ReadEntriesCallback callback, Object ctx) {
         final long ledgerId = lh.getId();
         final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
@@ -295,51 +307,71 @@ public class RangeEntryCacheImpl implements EntryCache {
             }
 
             // Read all the entries from bookkeeper
-            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
-                    ledgerEntries -> {
-                        requireNonNull(ml.getName());
-                        requireNonNull(ml.getExecutor());
+            pendingReadsManager.readEntries(lh, firstEntry, lastEntry,
+                    shouldCacheEntry, callback, ctx);
 
-                        try {
-                            // We got the entries, we need to transform them to a List<> type
-                            long totalSize = 0;
-                            final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
-                            for (LedgerEntry e : ledgerEntries) {
-                                EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
-                                entriesToReturn.add(entry);
-                                totalSize += entry.getLength();
-                                if (shouldCacheEntry) {
-                                    EntryImpl cacheEntry = EntryImpl.create(entry);
-                                    insert(cacheEntry);
-                                    cacheEntry.release();
+        }
+    }
+
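+    /**
+     * Reads the entries in the range [firstEntry, lastEntry] from the given read handle.
+     * @param lh the read handle of the ledger to read from
+     * @param firstEntry the id of the first entry to read (inclusive)
+     * @param lastEntry the id of the last entry to read (inclusive)
+     * @param shouldCacheEntry whether each entry read should also be inserted into the cache
+     * @return a future that completes with the entries read, or exceptionally if the read fails
+     */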
+    CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
+                                                       long firstEntry, long lastEntry, boolean shouldCacheEntry) {
+        final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
+        CompletableFuture<List<EntryImpl>> readResult = lh.readAsync(firstEntry, lastEntry)
+                .thenApply(
+                        ledgerEntries -> {
+                            requireNonNull(ml.getName());
+                            requireNonNull(ml.getExecutor());
+
+                            try {
+                                // We got the entries, we need to transform them to a List<> type
+                                long totalSize = 0;
+                                final List<EntryImpl> entriesToReturn =
+                                        Lists.newArrayListWithExpectedSize(entriesToRead);
+                                for (LedgerEntry e : ledgerEntries) {
+                                    EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);
+                                    entriesToReturn.add(entry);
+                                    totalSize += entry.getLength();
+                                    if (shouldCacheEntry) {
+                                        EntryImpl cacheEntry = EntryImpl.create(entry);
+                                        insert(cacheEntry);
+                                        cacheEntry.release();
+                                    }
                                 }
-                            }
 
-                            manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-                            ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
+                                manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
+                                ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
 
-                            callback.readEntriesComplete((List) entriesToReturn, ctx);
-                        } finally {
-                            ledgerEntries.close();
-                        }
-                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
-                        if (exception instanceof BKException
-                                && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
-                            callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
-                        } else {
-                            ml.invalidateLedgerHandle(lh);
-                            ManagedLedgerException mlException = createManagedLedgerException(exception);
-                            callback.readEntriesFailed(mlException, ctx);
-                        }
-                        return null;
-            });
-        }
+                                return entriesToReturn;
+                            } finally {
+                                ledgerEntries.close();
+                            }
+                        });
+        // on failure, invalidate the ledger handle and its pending reads (except for TooManyRequestsException)
+        readResult.exceptionally(exception -> {
+            if (exception instanceof BKException
+                    && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
+            } else {
+                ml.invalidateLedgerHandle(lh);
+                pendingReadsManager.invalidateLedger(lh.getId());
+            }
+            return null;
+        });
+        return readResult;
     }
 
     @Override
     public void clear() {
         Pair<Integer, Long> removedPair = entries.clear();
         manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
+        pendingReadsManager.clear();
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
new file mode 100644
index 00000000000..f6d3ac88156
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertNotSame;
+import static org.testng.AssertJUnit.assertSame;
+
+@Slf4j
+public class PendingReadsManagerTest  {
+
+    static final Object CTX = "foo";
+    static final Object CTX2 = "far";
+    static final long ledgerId = 123414L;
+    OrderedExecutor orderedExecutor;
+
+    PendingReadsManagerTest() {
+    }
+
+    @BeforeClass(alwaysRun = true)
+    void before() {
+        orderedExecutor = OrderedExecutor.newBuilder().build();
+    }
+
+    @AfterClass(alwaysRun = true)
+    void after() {
+        if (orderedExecutor != null) {
+            orderedExecutor.shutdown();
+            orderedExecutor = null;
+        }
+    }
+
+
+    RangeEntryCacheImpl rangeEntryCache;
+    PendingReadsManager pendingReadsManager;
+    ReadHandle lh;
+    ManagedLedgerImpl ml;
+
+    @BeforeMethod(alwaysRun = true)
+    void setupMocks() {
+        rangeEntryCache = mock(RangeEntryCacheImpl.class);
+        pendingReadsManager = new PendingReadsManager(rangeEntryCache);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                log.info("rangeEntryCache asyncReadEntry0 {}", invocationOnMock);
+                ReadHandle rh = invocationOnMock.getArgument(0);
+                long startEntry = invocationOnMock.getArgument(1);
+                long endEntry = invocationOnMock.getArgument(2);
+                boolean shouldCacheEntry = invocationOnMock.getArgument(3);
+                AsyncCallbacks.ReadEntriesCallback callback = invocationOnMock.getArgument(4);
+                Object ctx = invocationOnMock.getArgument(5);
+                pendingReadsManager.readEntries(lh, startEntry, endEntry, shouldCacheEntry, callback, ctx);
+                return null;
+            }
+        }).when(rangeEntryCache).asyncReadEntry0(any(), anyLong(), anyLong(),
+                anyBoolean(), any(), any());
+
+        lh = mock(ReadHandle.class);
+        ml = mock(ManagedLedgerImpl.class);
+        when(ml.getExecutor()).thenReturn(orderedExecutor);
+        when(rangeEntryCache.getManagedLedger()).thenReturn(ml);
+    }
+
+
+    @Data
+    private static class CapturingReadEntriesCallback extends CompletableFuture<Void>
+            implements AsyncCallbacks.ReadEntriesCallback  {
+        List<Position> entries;
+        Object ctx;
+        Throwable error;
+
+        @Override
+        public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
+            this.entries = entries.stream().map(Entry::getPosition).collect(Collectors.toList());
+            this.ctx = ctx;
+            this.error = null;
+            this.complete(null);
+        }
+
+        @Override
+        public synchronized void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+            this.entries = null;
+            this.ctx = ctx;
+            this.error = exception;
+            this.completeExceptionally(exception);
+        }
+
+    }
+
+    private static List<EntryImpl> buildList(long start, long end) {
+        List<EntryImpl> result = new ArrayList<>();
+        for (long i = start; i <= end; i++) {
+            long entryId = i;
+            EntryImpl entry = EntryImpl.create(ledgerId, entryId, "data".getBytes(StandardCharsets.UTF_8));
+            result.add(entry);
+        }
+        return result;
+    }
+
+
+    private void verifyRange(List<Position> entries, long firstEntry, long endEntry) {
+        int pos = 0;
+        log.info("verifyRange numEntries {}", entries.size());
+        for (long entry = firstEntry; entry <= endEntry; entry++) {
+            assertEquals(entries.get(pos++).getEntryId(), entry);
+        }
+    }
+
+    private static class PreparedReadFromStorage extends CompletableFuture<List<EntryImpl>> {
+        final long firstEntry;
+        final long endEntry;
+        final boolean shouldCacheEntry;
+
+        public PreparedReadFromStorage(long firstEntry, long endEntry, boolean shouldCacheEntry) {
+            this.firstEntry = firstEntry;
+            this.endEntry = endEntry;
+            this.shouldCacheEntry = shouldCacheEntry;
+        }
+
+        @Override
+        public String toString() {
+            return "PreparedReadFromStorage("+firstEntry+","+endEntry+","+shouldCacheEntry+")";
+        }
+
+        public void storageReadCompleted() {
+            this.complete(buildList(firstEntry, endEntry));
+        }
+    }
+
+    private PreparedReadFromStorage prepareReadFromStorage(ReadHandle lh, RangeEntryCacheImpl rangeEntryCache,
+                                                                      long firstEntry, long endEntry, boolean shouldCacheEntry) {
+        PreparedReadFromStorage read = new PreparedReadFromStorage(firstEntry, endEntry, shouldCacheEntry);
+        log.info("prepareReadFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry);
+        when(rangeEntryCache.readFromStorage(eq(lh), eq(firstEntry), eq(endEntry), eq(shouldCacheEntry))).thenAnswer(
+                (invocationOnMock -> {
+                    log.info("readFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry);
+                    return read;
+                })
+        );
+        return read;
+    }
+
+    @Test
+    public void simpleRead() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1
+                = prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        // complete the read
+        read1.storageReadCompleted();
+
+        // wait for the callback to complete
+        callback.get();
+        assertSame(callback.getCtx(), CTX);
+
+        // verify
+        verifyRange(callback.entries, firstEntry, endEntry);
+    }
+
+
+    @Test
+    public void simpleConcurrentReadPerfectMatch() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 = prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache);
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback2, CTX2);
+
+        // complete the read from BK:
+        // the single storage read must complete both callbacks
+        read1.storageReadCompleted();
+
+        callback.get();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntry, endEntry);
+
+        int pos = 0;
+        for (long entry = firstEntry; entry <= endEntry; entry++) {;
+            assertNotSame(callback.entries.get(pos), callback2.entries.get(pos));
+            assertEquals(callback.entries.get(pos).getEntryId(), callback2.entries.get(pos).getEntryId());
+            pos++;
+        }
+
+    }
+
+    @Test
+    public void simpleConcurrentReadIncluding() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+
+        long firstEntrySecondRead = firstEntry + 10;
+        long endEntrySecondRead = endEntry - 10;
+
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 = prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache);
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2);
+
+        // complete the read from BK:
+        // the single storage read serves both callbacks, as the second range is contained in the first
+        read1.storageReadCompleted();
+
+        callback.get();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+        int pos = 0;
+        for (long entry = firstEntry; entry <= endEntry; entry++) {;
+            if (entry >= firstEntrySecondRead && entry <= endEntrySecondRead) {
+                int posInSecondList = (int) (pos - (firstEntrySecondRead - firstEntry));
+                assertNotSame(callback.entries.get(pos), callback2.entries.get(posInSecondList));
+                assertEquals(callback.entries.get(pos).getEntryId(), callback2.entries.get(posInSecondList).getEntryId());
+            }
+            pos++;
+        }
+
+    }
+
+    @Test
+    public void simpleConcurrentReadMissingLeft() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+
+        long firstEntrySecondRead = firstEntry - 10;
+        long endEntrySecondRead = endEntry;
+
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        PreparedReadFromStorage readForLeft =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, shouldCacheEntry);
+
+        PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache);
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2);
+
+        // complete the read from BK
+        read1.storageReadCompleted();
+        // the first read can move forward
+        callback.get();
+
+        readForLeft.storageReadCompleted();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+    }
+
+    @Test
+    public void simpleConcurrentReadMissingRight() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+
+        long firstEntrySecondRead = firstEntry;
+        long endEntrySecondRead = endEntry + 10;
+
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        PreparedReadFromStorage readForRight =
+                prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, shouldCacheEntry);
+
+        PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache);
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2);
+
+        // complete the read from BK
+        read1.storageReadCompleted();
+        // the first read can move forward
+        callback.get();
+
+        readForRight.storageReadCompleted();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+    }
+
+    @Test
+    public void simpleConcurrentReadMissingBoth() throws Exception {
+
+        long firstEntry = 100;
+        long endEntry = 199;
+
+        long firstEntrySecondRead = firstEntry - 10;
+        long endEntrySecondRead = endEntry + 10;
+
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        PreparedReadFromStorage readForLeft =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, firstEntry - 1, shouldCacheEntry);
+
+        PreparedReadFromStorage readForRight =
+                prepareReadFromStorage(lh, rangeEntryCache, endEntry + 1, endEntrySecondRead, shouldCacheEntry);
+
+        PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache);
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2);
+
+        // complete the read from BK
+        read1.storageReadCompleted();
+        // the first read can move forward
+        callback.get();
+
+        readForLeft.storageReadCompleted();
+        readForRight.storageReadCompleted();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+    }
+
+
+    @Test
+    public void simpleConcurrentReadNoMatch() throws Exception {
+        long firstEntry = 100;
+        long endEntry = 199;
+
+        long firstEntrySecondRead = 1000;
+        long endEntrySecondRead = 1099;
+
+        boolean shouldCacheEntry = false;
+
+        PreparedReadFromStorage read1 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, shouldCacheEntry);
+
+        PreparedReadFromStorage read2 =
+                prepareReadFromStorage(lh, rangeEntryCache, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry);
+
+        PendingReadsManager pendingReadsManager = new PendingReadsManager(rangeEntryCache);
+        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntry, endEntry, shouldCacheEntry, callback, CTX);
+
+        CapturingReadEntriesCallback callback2 = new CapturingReadEntriesCallback();
+        pendingReadsManager.readEntries(lh, firstEntrySecondRead, endEntrySecondRead, shouldCacheEntry, callback2, CTX2);
+
+        read1.storageReadCompleted();
+        callback.get();
+
+        read2.storageReadCompleted();
+        callback2.get();
+
+        assertSame(callback.getCtx(), CTX);
+        assertSame(callback2.getCtx(), CTX2);
+
+        verifyRange(callback.entries, firstEntry, endEntry);
+        verifyRange(callback2.entries, firstEntrySecondRead, endEntrySecondRead);
+
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 9ef28842dfe..1d0b1be3659 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager;
 import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -1236,6 +1237,13 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         RangeEntryCacheImpl entryCache = spy((RangeEntryCacheImpl) cacheField.get(ledger));
         cacheField.set(ledger, entryCache);
 
+        Field pendingReadsManagerField = RangeEntryCacheImpl.class.getDeclaredField("pendingReadsManager");
+        pendingReadsManagerField.setAccessible(true);
+        PendingReadsManager pendingReadsManager = (PendingReadsManager) pendingReadsManagerField.get(entryCache);
+        Field cacheFieldInManager = PendingReadsManager.class.getDeclaredField("rangeEntryCache");
+        cacheFieldInManager.setAccessible(true);
+        cacheFieldInManager.set(pendingReadsManager, entryCache);
+
         // 2. Produce messages
         for (int i = 0; i < totalMessages; i++) {
             String message = "my-message-" + i;


[pulsar] 04/04: [bug][broker] fix memory leak in case of error conditions in PendingReadsManager (#17995)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1fc4ccf8240447d0b164e059400701365f2477e3
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Wed Oct 12 10:58:05 2022 +0200

    [bug][broker] fix memory leak in case of error conditions in PendingReadsManager (#17995)
    
    Co-authored-by: Enrico Olivelli <il>
---
 .../apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java    | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
index 4c374d8ace6..30216871e76 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java
@@ -356,6 +356,8 @@ public class PendingReadsManager {
                                         @Override
                                         public void readEntriesFailed(ManagedLedgerException exception,
                                                                       Object dummyCtx3) {
+                                            entries.forEach(Entry::release);
+                                            entriesFromLeft.forEach(Entry::release);
                                             callback.readEntriesFailed(exception, ctx);
                                         }
                                     };
@@ -366,6 +368,7 @@ public class PendingReadsManager {
 
                                 @Override
                                 public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx4) {
+                                    entries.forEach(Entry::release);
                                     callback.readEntriesFailed(exception, ctx);
                                 }
                             };
@@ -388,6 +391,7 @@ public class PendingReadsManager {
                                         @Override
                                         public void readEntriesFailed(ManagedLedgerException exception,
                                                                       Object dummyCtx6) {
+                                            entries.forEach(Entry::release);
                                             callback.readEntriesFailed(exception, ctx);
                                         }
                                     };
@@ -410,6 +414,7 @@ public class PendingReadsManager {
                                         @Override
                                         public void readEntriesFailed(ManagedLedgerException exception,
                                                                       Object dummyCtx8) {
+                                            entries.forEach(Entry::release);
                                             callback.readEntriesFailed(exception, ctx);
                                         }
                                     };


[pulsar] 02/04: [enh][broker] Add metrics for entry cache insertion, eviction (#17248)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit df33f59cb4f95bbbe1d93813db6be7d859684d2f
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Wed Aug 24 14:00:45 2022 -0700

    [enh][broker] Add metrics for entry cache insertion, eviction (#17248)
    
    Fixes https://github.com/apache/pulsar/issues/16584
    
    ### Motivation
    
    With the `RangeCache`, it is hard to reason about its behavior beyond cache hits/misses or the cache's size hitting the limit and triggering a size-based eviction. This PR adds 3 new metrics to provide additional insight into the cache's behavior: `pulsar_ml_cache_inserted_entries_total`, `pulsar_ml_cache_evicted_entries_total`, and `pulsar_ml_cache_entries`.
    
    ### Modifications
    
    * Add new metrics for cache insertion, eviction, and current number of entries.
    * Add new methods to the `ManagedLedgerFactoryMXBean` interface.
    * Update several method return values in the `RangeCache`.
    * Update tests.
    
    ### Verifying this change
    
    This change is covered by existing tests, which were updated to assert the new metrics.
    
    ### Does this pull request potentially affect one of the following parts:
    
    There is a breaking change to the `RangeCache` class for the `clear` and the `evictLEntriesBeforeTimestamp` methods. Previously they returned a `long` (the size removed); now they return a `Pair<Integer, Long>` (the number of entries removed and the size removed). The new result matches the same style as `evictLeastAccessedEntries`. Given that this class is only meant for use within the broker, I think it is reasonable to break these methods. I will send a note to the mailing list.
    
    ### Documentation
    
    - [x] `doc`
---
 .../mledger/ManagedLedgerFactoryMXBean.java        | 15 ++++++++++++
 .../impl/ManagedLedgerFactoryMBeanImpl.java        | 27 ++++++++++++++++++++++
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 14 +++++------
 .../impl/cache/RangeEntryCacheManagerImpl.java     |  4 +++-
 .../apache/bookkeeper/mledger/util/RangeCache.java | 12 ++++++----
 .../mledger/impl/EntryCacheManagerTest.java        | 18 +++++++++++++++
 .../bookkeeper/mledger/util/RangeCacheTest.java    |  5 ++--
 .../stats/metrics/ManagedLedgerCacheMetrics.java   |  3 +++
 site2/docs/reference-metrics.md                    |  3 +++
 9 files changed, 87 insertions(+), 14 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
index ea5b2074ffa..f71583ab886 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
@@ -66,4 +66,19 @@ public interface ManagedLedgerFactoryMXBean {
      * Get the number of cache evictions during the last minute.
      */
     long getNumberOfCacheEvictions();
+
+    /**
+     * Cumulative number of entries inserted into the cache.
+     */
+    long getCacheInsertedEntriesCount();
+
+    /**
+     * Cumulative number of entries evicted from the cache.
+     */
+    long getCacheEvictedEntriesCount();
+
+    /**
+     * Current number of entries in the cache.
+     */
+    long getCacheEntriesCount();
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
index d514d8381d9..a5f0c67e68a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
 import org.apache.pulsar.common.stats.Rate;
 
@@ -31,6 +32,10 @@ public class ManagedLedgerFactoryMBeanImpl implements ManagedLedgerFactoryMXBean
     final Rate cacheMisses = new Rate();
     final Rate cacheEvictions = new Rate();
 
+    private final LongAdder insertedEntryCount = new LongAdder();
+    private final LongAdder evictedEntryCount = new LongAdder();
+    private final LongAdder cacheEntryCount = new LongAdder();
+
     public ManagedLedgerFactoryMBeanImpl(ManagedLedgerFactoryImpl factory) throws Exception {
         this.factory = factory;
     }
@@ -64,6 +69,16 @@ public class ManagedLedgerFactoryMBeanImpl implements ManagedLedgerFactoryMXBean
         cacheEvictions.recordEvent();
     }
 
+    public void recordCacheInsertion() {
+        insertedEntryCount.increment();
+        cacheEntryCount.increment();
+    }
+
+    public void recordNumberOfCacheEntriesEvicted(int count) {
+        evictedEntryCount.add(count);
+        cacheEntryCount.add(-count);
+    }
+
     @Override
     public int getNumberOfManagedLedgers() {
         return factory.ledgers.size();
@@ -104,4 +119,16 @@ public class ManagedLedgerFactoryMBeanImpl implements ManagedLedgerFactoryMXBean
         return cacheEvictions.getCount();
     }
 
+    public long getCacheInsertedEntriesCount() {
+        return insertedEntryCount.sum();
+    }
+
+    public long getCacheEvictedEntriesCount() {
+        return evictedEntryCount.sum();
+    }
+
+    public long getCacheEntriesCount() {
+        return cacheEntryCount.sum();
+    }
+
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 0d29194a883..f8f5c328cd9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -168,7 +168,7 @@ public class RangeEntryCacheImpl implements EntryCache {
                     lastPosition, entriesRemoved, sizeRemoved);
         }
 
-        manager.entriesRemoved(sizeRemoved);
+        manager.entriesRemoved(sizeRemoved, entriesRemoved);
     }
 
     @Override
@@ -184,7 +184,7 @@ public class RangeEntryCacheImpl implements EntryCache {
                     ml.getName(), ledgerId, entriesRemoved, sizeRemoved);
         }
 
-        manager.entriesRemoved(sizeRemoved);
+        manager.entriesRemoved(sizeRemoved, entriesRemoved);
     }
 
     @Override
@@ -338,8 +338,8 @@ public class RangeEntryCacheImpl implements EntryCache {
 
     @Override
     public void clear() {
-        long removedSize = entries.clear();
-        manager.entriesRemoved(removedSize);
+        Pair<Integer, Long> removedPair = entries.clear();
+        manager.entriesRemoved(removedPair.getRight(), removedPair.getLeft());
     }
 
     @Override
@@ -364,14 +364,14 @@ public class RangeEntryCacheImpl implements EntryCache {
                             + " -- Current Size: {} Mb",
                     ml.getName(), sizeToFree / MB, evictedEntries, evictedSize / MB, entries.getSize() / MB);
         }
-        manager.entriesRemoved(evictedSize);
+        manager.entriesRemoved(evictedSize, evictedEntries);
         return evicted;
     }
 
     @Override
     public void invalidateEntriesBeforeTimestamp(long timestamp) {
-        long evictedSize = entries.evictLEntriesBeforeTimestamp(timestamp);
-        manager.entriesRemoved(evictedSize);
+        Pair<Integer, Long> evictedPair = entries.evictLEntriesBeforeTimestamp(timestamp);
+        manager.entriesRemoved(evictedPair.getRight(), evictedPair.getLeft());
     }
 
     private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index e8a463c46d1..657740c39ec 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -141,10 +141,12 @@ public class RangeEntryCacheManagerImpl implements EntryCacheManager {
     }
 
     void entryAdded(long size) {
+        mlFactoryMBean.recordCacheInsertion();
         currentSize.addAndGet(size);
     }
 
-    void entriesRemoved(long size) {
+    void entriesRemoved(long size, int count) {
+        mlFactoryMBean.recordNumberOfCacheEntriesEvicted(count);
         currentSize.addAndGet(-size);
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index c9181e02f2c..1b82aa1318e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -187,8 +187,9 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
     * @param maxTimestamp the max timestamp of the entries to be evicted
     * @return the tota
     */
-   public long evictLEntriesBeforeTimestamp(long maxTimestamp) {
+   public Pair<Integer, Long> evictLEntriesBeforeTimestamp(long maxTimestamp) {
        long removedSize = 0;
+       int removedCount = 0;
 
        while (true) {
            Map.Entry<Key, Value> entry = entries.firstEntry();
@@ -203,11 +204,12 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
 
            Value value = entry.getValue();
            removedSize += weighter.getSize(value);
+           removedCount++;
            value.release();
        }
 
        size.addAndGet(-removedSize);
-       return removedSize;
+       return Pair.of(removedCount, removedSize);
    }
 
     /**
@@ -226,8 +228,9 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
      *
      * @return size of removed entries
      */
-    public synchronized long clear() {
+    public synchronized Pair<Integer, Long> clear() {
         long removedSize = 0;
+        int removedCount = 0;
 
         while (true) {
             Map.Entry<Key, Value> entry = entries.pollFirstEntry();
@@ -236,12 +239,13 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
             }
             Value value = entry.getValue();
             removedSize += weighter.getSize(value);
+            removedCount++;
             value.release();
         }
 
         entries.clear();
         size.getAndAdd(-removedSize);
-        return removedSize;
+        return Pair.of(removedCount, removedSize);
     }
 
     /**
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
index a1dcb4ea02f..5b34dc3eb59 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java
@@ -94,6 +94,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
         assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 2);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 2);
 
         cache2.insert(EntryImpl.create(2, 0, new byte[1]));
         cache2.insert(EntryImpl.create(2, 1, new byte[1]));
@@ -129,6 +132,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
         assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 1);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 5);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 2);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 3);
     }
 
     @Test
@@ -153,6 +159,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
 
         assertEquals(cache1.getSize(), 7);
         assertEquals(cacheManager.getSize(), 7);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 2);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 2);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
     }
 
     @Test
@@ -185,6 +194,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
 
         cacheManager.removeEntryCache(ml1.getName());
         assertTrue(cacheManager.getSize() > 0);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 20);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 20);
     }
 
 
@@ -217,6 +229,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
         assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
 
         cache2.insert(EntryImpl.create(2, 0, new byte[1]));
         cache2.insert(EntryImpl.create(2, 1, new byte[1]));
@@ -253,6 +268,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
         assertEquals(factory2.getMbean().getCacheMissesRate(), 0.0);
         assertEquals(factory2.getMbean().getCacheHitsThroughput(), 0.0);
         assertEquals(factory2.getMbean().getNumberOfCacheEvictions(), 0);
+        assertEquals(factory2.getMbean().getCacheInsertedEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEntriesCount(), 0);
+        assertEquals(factory2.getMbean().getCacheEvictedEntriesCount(), 0);
     }
 
     @Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
index f31aa4a74f9..341a4928cc7 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
@@ -139,8 +139,9 @@ public class RangeCacheTest {
         assertEquals(cache.getSize(), 10);
         assertEquals(cache.getNumberOfEntries(), 4);
 
-        long evictedSize = cache.evictLEntriesBeforeTimestamp(3);
-        assertEquals(evictedSize, 6);
+        Pair<Integer, Long> evictedSize = cache.evictLEntriesBeforeTimestamp(3);
+        assertEquals(evictedSize.getRight().longValue(), 6);
+        assertEquals(evictedSize.getLeft().longValue(), 3);
 
         assertEquals(cache.getSize(), 4);
         assertEquals(cache.getNumberOfEntries(), 1);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
index 8e46b4cf254..1f4181f887f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
@@ -48,6 +48,9 @@ public class ManagedLedgerCacheMetrics extends AbstractMetrics {
 
         m.put("brk_ml_count", mlCacheStats.getNumberOfManagedLedgers());
         m.put("brk_ml_cache_used_size", mlCacheStats.getCacheUsedSize());
+        m.put("brk_ml_cache_inserted_entries_total", mlCacheStats.getCacheInsertedEntriesCount());
+        m.put("brk_ml_cache_evicted_entries_total", mlCacheStats.getCacheEvictedEntriesCount());
+        m.put("brk_ml_cache_entries", mlCacheStats.getCacheEntriesCount());
         m.put("brk_ml_cache_evictions", mlCacheStats.getNumberOfCacheEvictions());
         m.put("brk_ml_cache_hits_rate", mlCacheStats.getCacheHitsRate());
         m.put("brk_ml_cache_misses_rate", mlCacheStats.getCacheMissesRate());
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 91df1b88f3a..20567826244 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -159,6 +159,9 @@ All the broker metrics are labelled with the following labels:
 | Name | Type   | Description                                          |
 |---|---|---|
 | pulsar_ml_cache_evictions | Gauge  | The number of cache evictions during the last minute. |
+| pulsar_ml_cache_inserted_entries_total | Counter | The number of entries inserted into the entry cache. |
+| pulsar_ml_cache_evicted_entries_total | Counter | The number of entries evicted from the entry cache. |
+| pulsar_ml_cache_entries | Gauge | The number of entries in the entry cache. |
 | pulsar_ml_cache_hits_rate | Gauge  | The number of cache hits per second on the broker side. |
 | pulsar_ml_cache_hits_throughput | Gauge  | The amount of data (byte per second) retrieved from the cache on the broker side. |
 | pulsar_ml_cache_misses_rate | Gauge  | The number of cache missed per second on the broker side. |