You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2024/01/17 09:41:17 UTC

(pulsar) branch master updated: [feat][broker] Implementation of PIP-323: Complete Backlog Quota Telemetry (#21816)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 59782b3c1f6 [feat][broker] Implementation of PIP-323: Complete Backlog Quota Telemetry (#21816)
59782b3c1f6 is described below

commit 59782b3c1f63baa68b2f5f33a33f1b00212159d7
Author: Asaf Mesika <as...@gmail.com>
AuthorDate: Wed Jan 17 11:41:08 2024 +0200

    [feat][broker] Implementation of PIP-323: Complete Backlog Quota Telemetry (#21816)
---
 .../bookkeeper/mledger/ManagedLedgerMXBean.java    |   5 +
 .../mledger/impl/ManagedCursorContainer.java       | 114 +++++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   3 +-
 .../mledger/impl/ManagedLedgerMBeanImpl.java       |  12 +-
 .../mledger/impl/ManagedCursorContainerTest.java   | 123 ++++--
 pom.xml                                            |   8 +
 pulsar-broker/pom.xml                              |   8 +-
 .../pulsar/broker/service/BacklogQuotaManager.java |  65 ++--
 .../pulsar/broker/service/BrokerService.java       |  22 +-
 .../org/apache/pulsar/broker/service/Topic.java    |  13 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   7 +-
 .../broker/service/persistent/PersistentTopic.java | 235 ++++++++++--
 .../service/persistent/PersistentTopicMetrics.java |  50 +++
 .../stats/prometheus/AggregatedBrokerStats.java    |   8 +
 .../stats/prometheus/AggregatedNamespaceStats.java |  11 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  61 ++-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  32 ++
 .../stats/prometheus/metrics/PrometheusLabels.java |  32 ++
 .../broker/service/BacklogQuotaManagerTest.java    | 420 ++++++++++++++++++---
 .../pulsar/broker/service/ServerCnxTest.java       |   5 +-
 .../broker/service/SubscriptionSeekTest.java       |  24 ++
 .../persistent/BucketDelayedDeliveryTest.java      |  29 +-
 .../service/persistent/PersistentTopicTest.java    |   9 +-
 .../broker/service/schema/SchemaServiceTest.java   |  23 +-
 .../pulsar/broker/stats/ConsumerStatsTest.java     |   8 +-
 .../broker/stats/MetadataStoreStatsTest.java       |  30 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  61 +--
 .../pulsar/broker/stats/SubscriptionStatsTest.java |  14 +-
 .../broker/stats/TransactionMetricsTest.java       |  25 +-
 .../prometheus/NamespaceStatsAggregatorTest.java   |   3 +
 .../stats/prometheus/PrometheusMetricsClient.java  | 154 ++++++++
 .../buffer/TransactionBufferClientTest.java        |  38 +-
 .../pendingack/PendingAckPersistentTest.java       |  21 +-
 .../apache/pulsar/broker/web/WebServiceTest.java   |  25 +-
 pulsar-broker/src/test/resources/log4j2.xml        |  39 ++
 .../pulsar/common/policies/data/TopicStats.java    |  25 ++
 .../org/apache/pulsar/client/api/Consumer.java     |   3 +
 .../common/policies/data/stats/TopicStatsImpl.java |  35 ++
 38 files changed, 1455 insertions(+), 345 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
index 50a3ffb1579..cb6d3700afe 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
@@ -90,6 +90,11 @@ public interface ManagedLedgerMXBean {
      */
     long getAddEntryErrors();
 
+    /**
+     * @return the number of entries read from the managed ledger (from cache or BK)
+     */
+    long getEntriesReadTotalCount();
+
     /**
      * @return the number of readEntries requests that succeeded
      */
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index 58c83961d61..92f3d892b53 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -25,25 +25,46 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.locks.StampedLock;
+import lombok.Value;
+import lombok.experimental.UtilityClass;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * Contains cursors for a ManagedLedger.
- *
- * <p/>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
- *
- * <p/>This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
+ * <p>
+ * The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
+ * <p>
+ * This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
  * an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented
  * in a single array. More details about heap implementations:
- * https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation
- *
- * <p/>The heap is updated and kept sorted when a cursor is updated.
+ * <a href="https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation">here</a>
+ * <p>
+ * The heap is updated and kept sorted when a cursor is updated.
  *
  */
 public class ManagedCursorContainer implements Iterable<ManagedCursor> {
 
+    /**
+     * This field is incremented everytime the cursor information is updated.
+     */
+    private long version;
+
+    @Value
+    public static class CursorInfo {
+        ManagedCursor cursor;
+        PositionImpl position;
+
+        /**
+         * Cursor info's version.
+         * <p>
+         * Use {@link  DataVersion#compareVersions(long, long)} to compare between two versions,
+         * since it rolls over to 0 once reaching Long.MAX_VALUE
+         */
+        long version;
+    }
+
     private static class Item {
         final ManagedCursor cursor;
         PositionImpl position;
@@ -56,10 +77,66 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         }
     }
 
-    public ManagedCursorContainer() {
+    /**
+     * Utility class to manage a data version, which rolls over to 0 when reaching Long.MAX_VALUE.
+     */
+    @UtilityClass
+    public class DataVersion {
+
+        /**
+         * Compares two data versions, which either rolls overs to 0 when reaching Long.MAX_VALUE.
+         * <p>
+         * Use {@link DataVersion#getNextVersion(long)} to increment the versions. The assumptions
+         * are that metric versions are compared with close time proximity one to another, hence,
+         * they are expected not close to each other in terms of distance, hence we don't
+         * expect the distance ever to exceed Long.MAX_VALUE / 2, otherwise we wouldn't be able
+         * to know which one is a later version in case the furthest rolls over to beyond 0. We
+         * assume the shortest distance between them dictates that.
+         * <p>
+         * @param v1 First version to compare
+         * @param v2 Second version to compare
+         * @return the value {@code 0} if {@code v1 == v2};
+         *         a value less than {@code 0} if {@code v1 < v2}; and
+         *         a value greater than {@code 0} if {@code v1 > v2}
+         */
+        public static int compareVersions(long v1, long v2) {
+            if (v1 == v2) {
+                return 0;
+            }
+
+            // 0-------v1--------v2--------MAX_LONG
+            if (v2 > v1) {
+                long distance = v2 - v1;
+                long wrapAroundDistance = (Long.MAX_VALUE - v2) + v1;
+                if (distance < wrapAroundDistance) {
+                    return -1;
+                } else {
+                    return 1;
+                }
+
+            // 0-------v2--------v1--------MAX_LONG
+            } else {
+                long distance = v1 - v2;
+                long wrapAroundDistance = (Long.MAX_VALUE - v1) + v2;
+                if (distance < wrapAroundDistance) {
+                    return 1; // v1 is bigger
+                } else {
+                    return -1; // v2 is bigger
+                }
+            }
+        }
 
+        public static long getNextVersion(long existingVersion) {
+            if (existingVersion == Long.MAX_VALUE) {
+                return 0;
+            } else {
+                return existingVersion + 1;
+            }
+        }
     }
 
+    public ManagedCursorContainer() {}
+
     // Used to keep track of slowest cursor.
     private final ArrayList<Item> heap = new ArrayList<>();
 
@@ -94,6 +171,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
             if (cursor.isDurable()) {
                 durableCursorCount++;
             }
+            version = DataVersion.getNextVersion(version);
         } finally {
             rwLock.unlockWrite(stamp);
         }
@@ -129,6 +207,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
                 if (item.cursor.isDurable()) {
                     durableCursorCount--;
                 }
+                version = DataVersion.getNextVersion(version);
                 return true;
             } else {
                 return false;
@@ -162,6 +241,7 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
 
             PositionImpl previousSlowestConsumer = heap.get(0).position;
             item.position = (PositionImpl) newPosition;
+            version = DataVersion.getNextVersion(version);
 
             if (heap.size() == 1) {
                 return Pair.of(previousSlowestConsumer, item.position);
@@ -204,6 +284,24 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
         }
     }
 
+    /**
+     * @return Returns the CursorInfo for the cursor with the oldest position,
+     *         or null if there aren't any tracked cursors
+     */
+    public CursorInfo getCursorWithOldestPosition() {
+        long stamp = rwLock.readLock();
+        try {
+            if (heap.isEmpty()) {
+                return null;
+            } else {
+                Item item = heap.get(0);
+                return new CursorInfo(item.cursor, item.position, version);
+            }
+        } finally {
+            rwLock.unlockRead(stamp);
+        }
+    }
+
     /**
      *  Check whether there are any cursors.
      * @return true is there are no cursors and false if there are
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 948eacce72d..569776edccf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -323,7 +323,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     /**
      * This variable is used for testing the tests.
-     * {@link ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()}
+     * ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()
      */
     @VisibleForTesting
     Map<String, byte[]> createdLedgerCustomMetadata;
@@ -2129,6 +2129,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
+        mbean.addEntriesRead(1);
         if (config.getReadEntryTimeoutSeconds() > 0) {
             // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
             long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index e057dee9953..3935828ff3d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -41,6 +41,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
     private final Rate readEntriesOpsFailed = new Rate();
     private final Rate readEntriesOpsCacheMisses = new Rate();
     private final Rate markDeleteOps = new Rate();
+    private final Rate entriesRead = new Rate();
 
     private final LongAdder dataLedgerOpenOp = new LongAdder();
     private final LongAdder dataLedgerCloseOp = new LongAdder();
@@ -80,6 +81,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
         ledgerAddEntryLatencyStatsUsec.refresh();
         ledgerSwitchLatencyStatsUsec.refresh();
         entryStats.refresh();
+        entriesRead.calculateRate(seconds);
     }
 
     public void addAddEntrySample(long size) {
@@ -120,6 +122,10 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
         readEntriesOps.recordMultipleEvents(count, totalSize);
     }
 
+    public void addEntriesRead(int count) {
+        entriesRead.recordEvent(count);
+    }
+
     public void startDataLedgerOpenOp() {
         dataLedgerOpenOp.increment();
     }
@@ -189,6 +195,11 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
         return managedLedger.getName();
     }
 
+    @Override
+    public long getEntriesReadTotalCount() {
+        return entriesRead.getTotalCount();
+    }
+
     @Override
     public double getAddEntryMessagesRate() {
         return addEntryOps.getRate();
@@ -333,5 +344,4 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
         result.cursorLedgerDeleteOp = cursorLedgerDeleteOp.longValue();
         return result;
     }
-
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 04d99d3bdf4..f0b3efe39d6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
@@ -46,7 +47,6 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
 import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.testng.annotations.Test;
 
@@ -105,7 +105,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public List<Entry> readEntries(int numberOfEntriesToRead) throws ManagedLedgerException {
+        public List<Entry> readEntries(int numberOfEntriesToRead) {
             return new ArrayList();
         }
 
@@ -137,14 +137,14 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void markDelete(Position position) throws ManagedLedgerException {
+        public void markDelete(Position position) {
             markDelete(position, Collections.emptyMap());
         }
 
         @Override
-        public void markDelete(Position position, Map<String, Long> properties) throws ManagedLedgerException {
+        public void markDelete(Position position, Map<String, Long> properties) {
             this.position = position;
-            container.cursorUpdated(this, (PositionImpl) position);
+            container.cursorUpdated(this, position);
         }
 
         @Override
@@ -209,7 +209,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void delete(Position position) throws InterruptedException, ManagedLedgerException {
+        public void delete(Position position) {
         }
 
         @Override
@@ -217,7 +217,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void delete(Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
+        public void delete(Iterable<Position> positions) {
         }
 
         @Override
@@ -225,7 +225,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void clearBacklog() throws InterruptedException, ManagedLedgerException {
+        public void clearBacklog() {
         }
 
         @Override
@@ -233,8 +233,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries)
-                throws InterruptedException, ManagedLedgerException {
+        public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries) {
         }
 
         @Override
@@ -243,13 +242,12 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public Position findNewestMatching(Predicate<Entry> condition)
-                throws InterruptedException, ManagedLedgerException {
+        public Position findNewestMatching(Predicate<Entry> condition) {
             return null;
         }
 
         @Override
-        public Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException {
+        public Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition) {
             return null;
         }
 
@@ -270,7 +268,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void resetCursor(final Position position) throws ManagedLedgerException, InterruptedException {
+        public void resetCursor(final Position position) {
 
         }
 
@@ -284,8 +282,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public List<Entry> replayEntries(Set<? extends Position> positions)
-                throws InterruptedException, ManagedLedgerException {
+        public List<Entry> replayEntries(Set<? extends Position> positions) {
             return null;
         }
 
@@ -300,8 +297,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public List<Entry> readEntriesOrWait(int numberOfEntriesToRead)
-                throws InterruptedException, ManagedLedgerException {
+        public List<Entry> readEntriesOrWait(int numberOfEntriesToRead) {
             return null;
         }
 
@@ -322,8 +318,7 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public Entry getNthEntry(int N, IndividualDeletedEntries deletedEntries)
-                throws InterruptedException, ManagedLedgerException {
+        public Entry getNthEntry(int N, IndividualDeletedEntries deletedEntries) {
             return null;
         }
 
@@ -399,13 +394,8 @@ public class ManagedCursorContainerTest {
             return null;
         }
 
-        public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
-                Object ctx) {
-        }
-
         @Override
-        public List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
-                throws InterruptedException, ManagedLedgerException {
+        public List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes) {
             return null;
         }
 
@@ -421,7 +411,7 @@ public class ManagedCursorContainerTest {
     }
 
     @Test
-    public void testSlowestReadPositionForActiveCursors() throws Exception {
+    public void testSlowestReadPositionForActiveCursors() {
         ManagedCursorContainer container = new ManagedCursorContainer();
         assertNull(container.getSlowestReaderPosition());
 
@@ -466,14 +456,20 @@ public class ManagedCursorContainerTest {
         ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
         container.add(cursor1, cursor1.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
+        assertEqualsCursorAndPosition(container.getCursorWithOldestPosition(),
+                cursor1, new PositionImpl(5, 5));
 
         ManagedCursor cursor2 = new MockManagedCursor(container, "test2", new PositionImpl(2, 2));
         container.add(cursor2, cursor2.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
+        assertEqualsCursorAndPosition(container.getCursorWithOldestPosition(),
+                cursor2, new PositionImpl(2, 2));
 
         ManagedCursor cursor3 = new MockManagedCursor(container, "test3", new PositionImpl(2, 0));
         container.add(cursor3, cursor3.getMarkDeletedPosition());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 0));
+        assertEqualsCursorAndPosition(container.getCursorWithOldestPosition(),
+                cursor3, new PositionImpl(2, 0));
 
         assertEquals(container.toString(), "[test1=5:5, test2=2:2, test3=2:0]");
 
@@ -487,6 +483,8 @@ public class ManagedCursorContainerTest {
 
         cursor3.markDelete(new PositionImpl(3, 0));
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(2, 2));
+        assertEqualsCursorAndPosition(container.getCursorWithOldestPosition(),
+                cursor2, new PositionImpl(2, 2));
 
         cursor2.markDelete(new PositionImpl(10, 5));
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(3, 0));
@@ -498,6 +496,8 @@ public class ManagedCursorContainerTest {
         container.removeCursor(cursor5.getName());
         container.removeCursor(cursor1.getName());
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(4, 0));
+        assertEqualsCursorAndPosition(container.getCursorWithOldestPosition(),
+                cursor4, new PositionImpl(4, 0));
 
         assertTrue(container.hasDurableCursors());
 
@@ -514,7 +514,7 @@ public class ManagedCursorContainerTest {
     }
 
     @Test
-    public void updatingCursorOutsideContainer() throws Exception {
+    public void updatingCursorOutsideContainer() {
         ManagedCursorContainer container = new ManagedCursorContainer();
 
         ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
@@ -533,10 +533,19 @@ public class ManagedCursorContainerTest {
         container.cursorUpdated(cursor2, cursor2.position);
 
         assertEquals(container.getSlowestReaderPosition(), new PositionImpl(5, 5));
+        assertEqualsCursorAndPosition(container.getCursorWithOldestPosition(),
+                cursor1, new PositionImpl(5, 5));
+    }
+
+    private void assertEqualsCursorAndPosition(ManagedCursorContainer.CursorInfo cursorInfo,
+                                               ManagedCursor expectedCursor,
+                                               PositionImpl expectedPosition) {
+        assertThat(cursorInfo.getCursor().getName()).isEqualTo(expectedCursor.getName());
+        assertThat(cursorInfo.getPosition()).isEqualTo(expectedPosition);
     }
 
     @Test
-    public void removingCursor() throws Exception {
+    public void removingCursor() {
         ManagedCursorContainer container = new ManagedCursorContainer();
 
         ManagedCursor cursor1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
@@ -607,7 +616,7 @@ public class ManagedCursorContainerTest {
     }
 
     @Test
-    public void orderingWithUpdates() throws Exception {
+    public void orderingWithUpdates() {
         ManagedCursorContainer container = new ManagedCursorContainer();
 
         MockManagedCursor c1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
@@ -672,7 +681,7 @@ public class ManagedCursorContainerTest {
     }
 
     @Test
-    public void orderingWithUpdatesAndReset() throws Exception {
+    public void orderingWithUpdatesAndReset() {
         ManagedCursorContainer container = new ManagedCursorContainer();
 
         MockManagedCursor c1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
@@ -735,4 +744,56 @@ public class ManagedCursorContainerTest {
 
         assertFalse(container.hasDurableCursors());
     }
+
+    @Test
+    public void testDataVersion() {
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(1L, 3L)).isNegative();
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(3L, 1L)).isPositive();
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(3L, 3L)).isZero();
+
+        long v1 = Long.MAX_VALUE - 1;
+        long v2 = ManagedCursorContainer.DataVersion.getNextVersion(v1);
+
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(v1, v2)).isNegative();
+
+        v2 = ManagedCursorContainer.DataVersion.getNextVersion(v2);
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(v1, v2)).isNegative();
+
+        v1 = ManagedCursorContainer.DataVersion.getNextVersion(v1);
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(v1, v2)).isNegative();
+
+        v1 = ManagedCursorContainer.DataVersion.getNextVersion(v1);
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(v1, v2)).isZero();
+
+        v1 = ManagedCursorContainer.DataVersion.getNextVersion(v1);
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(v1, v2)).isPositive();
+    }
+
+    @Test
+    public void testVersions() {
+        ManagedCursorContainer container = new ManagedCursorContainer();
+
+        MockManagedCursor c1 = new MockManagedCursor(container, "test1", new PositionImpl(5, 5));
+        MockManagedCursor c2 = new MockManagedCursor(container, "test2", new PositionImpl(5, 1));
+
+        container.add(c1, c1.getMarkDeletedPosition());
+        long version = container.getCursorWithOldestPosition().getVersion();
+
+        container.add(c2, c2.getMarkDeletedPosition());
+        long newVersion = container.getCursorWithOldestPosition().getVersion();
+        // newVersion > version
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(newVersion, version)).isPositive();
+        version = newVersion;
+
+        container.cursorUpdated(c2, new PositionImpl(5, 8));
+        newVersion = container.getCursorWithOldestPosition().getVersion();
+        // newVersion > version
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(newVersion, version)).isPositive();
+        version = newVersion;
+
+        container.removeCursor("test2");
+        newVersion = container.getCursorWithOldestPosition().getVersion();
+        // newVersion > version
+        assertThat(ManagedCursorContainer.DataVersion.compareVersions(newVersion, version)).isPositive();
+    }
 }
diff --git a/pom.xml b/pom.xml
index aac52933bc0..c62893014ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -252,6 +252,7 @@ flexible messaging model and an intuitive client API.</description>
     <!-- test dependencies -->
     <testcontainers.version>1.18.3</testcontainers.version>
     <hamcrest.version>2.2</hamcrest.version>
+    <restassured.version>5.4.0</restassured.version>
 
     <!-- Set docker-java.version to the version of docker-java used in Testcontainers -->
     <docker-java.version>3.3.0</docker-java.version>
@@ -1425,6 +1426,13 @@ flexible messaging model and an intuitive client API.</description>
         <artifactId>checker-qual</artifactId>
         <version>${checkerframework.version}</version>
       </dependency>
+
+      <dependency>
+        <groupId>io.rest-assured</groupId>
+        <artifactId>rest-assured</artifactId>
+        <version>${restassured.version}</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 65fe86f5ba8..054fcd87335 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -26,7 +26,7 @@
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
     <version>3.3.0-SNAPSHOT</version>
-    <relativePath>..</relativePath>
+    <relativePath>../pom.xml</relativePath>
   </parent>
 
   <artifactId>pulsar-broker</artifactId>
@@ -430,6 +430,12 @@
       <artifactId>javax.activation</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.rest-assured</groupId>
+      <artifactId>rest-assured</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <!-- transaction related dependencies (begin) -->
 
     <dependency>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 6ad1697adfc..c889062088e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
@@ -32,6 +34,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics.BacklogQuotaMetrics;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -41,6 +44,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 
 @Slf4j
 public class BacklogQuotaManager {
+    @Getter
     private final BacklogQuotaImpl defaultQuota;
     private final NamespaceResources namespaceResources;
 
@@ -55,10 +59,6 @@ public class BacklogQuotaManager {
         this.namespaceResources = pulsar.getPulsarResources().getNamespaceResources();
     }
 
-    public BacklogQuotaImpl getDefaultQuota() {
-        return this.defaultQuota;
-    }
-
     public BacklogQuotaImpl getBacklogQuota(NamespaceName namespace, BacklogQuotaType backlogQuotaType) {
         try {
             if (namespaceResources == null) {
@@ -86,30 +86,34 @@ public class BacklogQuotaManager {
     public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType,
                                            boolean preciseTimeBasedBacklogQuotaCheck) {
         BacklogQuota quota = persistentTopic.getBacklogQuota(backlogQuotaType);
+        BacklogQuotaMetrics topicBacklogQuotaMetrics =
+                persistentTopic.getPersistentTopicMetrics().getBacklogQuotaMetrics();
         log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] policy", backlogQuotaType,
                 persistentTopic.getName(), quota.getPolicy());
         switch (quota.getPolicy()) {
-        case consumer_backlog_eviction:
-            switch (backlogQuotaType) {
-                case destination_storage:
+            case consumer_backlog_eviction:
+                switch (backlogQuotaType) {
+                    case destination_storage:
                         dropBacklogForSizeLimit(persistentTopic, quota);
+                        topicBacklogQuotaMetrics.recordSizeBasedBacklogEviction();
                         break;
-                case message_age:
+                    case message_age:
                         dropBacklogForTimeLimit(persistentTopic, quota, preciseTimeBasedBacklogQuotaCheck);
+                        topicBacklogQuotaMetrics.recordTimeBasedBacklogEviction();
                         break;
-                default:
-                    break;
-            }
-            break;
-        case producer_exception:
-        case producer_request_hold:
-            if (!advanceSlowestSystemCursor(persistentTopic)) {
-                // The slowest is not a system cursor. Disconnecting producers to put backpressure.
-                disconnectProducers(persistentTopic);
-            }
-            break;
-        default:
-            break;
+                    default:
+                        break;
+                }
+                break;
+            case producer_exception:
+            case producer_request_hold:
+                if (!advanceSlowestSystemCursor(persistentTopic)) {
+                    // The slowest is not a system cursor. Disconnecting producers to put backpressure.
+                    disconnectProducers(persistentTopic);
+                }
+                break;
+            default:
+                break;
         }
     }
 
@@ -210,7 +214,7 @@ public class BacklogQuotaManager {
             );
         } else {
             // If disabled precise time based backlog quota check, will try to remove whole ledger from cursor's backlog
-            Long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis();
+            long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis();
             ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
             try {
                 for (; ; ) {
@@ -229,7 +233,7 @@ public class BacklogQuotaManager {
                     }
                     // Timestamp only > 0 if ledger has been closed
                     if (ledgerInfo.getTimestamp() > 0
-                            && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) {
+                            && currentMillis - ledgerInfo.getTimestamp() > SECONDS.toMillis(quota.getLimitTime())) {
                         // skip whole ledger for the slowest cursor
                         PositionImpl nextPosition =
                                 PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
@@ -263,19 +267,20 @@ public class BacklogQuotaManager {
             futures.add(producer.disconnect());
         });
 
-        FutureUtil.waitForAll(futures).thenRun(() -> {
-            log.info("All producers on topic [{}] are disconnected", persistentTopic.getName());
-        }).exceptionally(exception -> {
-            log.error("Error in disconnecting producers on topic [{}] [{}]", persistentTopic.getName(), exception);
-            return null;
-
+        FutureUtil.waitForAll(futures)
+                .thenRun(() ->
+                        log.info("All producers on topic [{}] are disconnected", persistentTopic.getName()))
+                .exceptionally(exception -> {
+                    log.error("Error in disconnecting producers on topic [{}] [{}]", persistentTopic.getName(),
+                            exception);
+                    return null;
         });
     }
 
     /**
      * Advances the slowest cursor if that is a system cursor.
      *
-     * @param persistentTopic
+     * @param persistentTopic Persistent topic
      * @return true if the slowest cursor is a system cursor
      */
     private boolean advanceSlowestSystemCursor(PersistentTopic persistentTopic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ae7744fef7c..1f5fc7c2f4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
 import static org.apache.commons.collections4.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -37,6 +38,7 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslContext;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.prometheus.client.Histogram;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -199,6 +201,12 @@ public class BrokerService implements Closeable {
     private static final double GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT = 0.25d;
     private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
 
+    private static final Histogram backlogQuotaCheckDuration = Histogram.build()
+            .name("pulsar_storage_backlog_quota_check_duration_seconds")
+            .help("The duration of the backlog quota check process.")
+            .buckets(5, 10, 30, 60, 300)
+            .register();
+
     private final PulsarService pulsar;
     private final ManagedLedgerFactory managedLedgerFactory;
 
@@ -838,7 +846,7 @@ public class BrokerService implements Closeable {
         long timeout = (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs);
         return NettyFutureUtil.toCompletableFutureVoid(
                 eventLoopGroup.shutdownGracefully(quietPeriod,
-                        timeout, TimeUnit.MILLISECONDS));
+                        timeout, MILLISECONDS));
     }
 
     private CompletableFuture<Void> closeChannel(Channel channel) {
@@ -892,8 +900,8 @@ public class BrokerService implements Closeable {
                                 rateLimiter.acquire(1);
                             }
                             long timeout = pulsar.getConfiguration().getNamespaceBundleUnloadingTimeoutMs();
-                            pulsar.getNamespaceService().unloadNamespaceBundle(su, timeout, TimeUnit.MILLISECONDS,
-                                    closeWithoutWaitingClientDisconnect).get(timeout, TimeUnit.MILLISECONDS);
+                            pulsar.getNamespaceService().unloadNamespaceBundle(su, timeout, MILLISECONDS,
+                                    closeWithoutWaitingClientDisconnect).get(timeout, MILLISECONDS);
                         } catch (Exception e) {
                             log.warn("Failed to unload namespace bundle {}", su, e);
                         }
@@ -2066,6 +2074,7 @@ public class BrokerService implements Closeable {
     }
 
     public void monitorBacklogQuota() {
+        long startTimeMillis = System.currentTimeMillis();
         forEachPersistentTopic(topic -> {
             if (topic.isSizeBacklogExceeded()) {
                 getBacklogQuotaManager().handleExceededBacklogQuota(topic,
@@ -2085,6 +2094,9 @@ public class BrokerService implements Closeable {
                     log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota",
                             topic.getName(), throwable);
                     return null;
+                }).whenComplete((unused, throwable) -> {
+                    backlogQuotaCheckDuration.observe(
+                            MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeMillis));
                 });
             }
         });
@@ -2580,7 +2592,7 @@ public class BrokerService implements Closeable {
         //  add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillis dynamic config
         registerConfigurationListener(
                 "managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdMills) -> {
-            managedLedgerFactory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+            managedLedgerFactory.updateCacheEvictionTimeThreshold(MILLISECONDS
                     .toNanos((long) cacheEvictionTimeThresholdMills));
         });
 
@@ -3015,7 +3027,7 @@ public class BrokerService implements Closeable {
             pendingTopic.getTopicFuture()
                     .completeExceptionally((e instanceof RuntimeException && e.getCause() != null) ? e.getCause() : e);
             // schedule to process next pending topic
-            inactivityMonitor.schedule(this::createPendingLoadTopic, 100, TimeUnit.MILLISECONDS);
+            inactivityMonitor.schedule(this::createPendingLoadTopic, 100, MILLISECONDS);
             return null;
         });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 343aef09c1c..a296052a411 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -68,7 +68,7 @@ public interface Topic {
 
         /**
          * Return the producer name for the original producer.
-         *
+         * <p>
          * For messages published locally, this will return the same local producer name, though in case of replicated
          * messages, the original producer name will differ
          */
@@ -136,7 +136,7 @@ public interface Topic {
     /**
      * Tries to add a producer to the topic. Several validations will be performed.
      *
-     * @param producer
+     * @param producer Producer to add
      * @param producerQueuedFuture
      *            a future that will be triggered if the producer is being queued up prior of getting established
      * @return the "topic epoch" if there is one or empty
@@ -148,7 +148,7 @@ public interface Topic {
     /**
      * Wait TransactionBuffer Recovers completely.
      * Take snapshot after TB Recovers completely.
-     * @param isTxnEnabled
+     * @param isTxnEnabled isTxnEnabled
      * @return a future which has completely if isTxn = false. Or a future return by takeSnapshot.
      */
     CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled);
@@ -243,6 +243,13 @@ public interface Topic {
 
     BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType);
 
+    /**
+     * Uses the best-effort (not necessarily up-to-date) information available to return the age.
+     * @return The oldest unacknowledged message age in seconds, or -1 if not available
+     */
+    long getBestEffortOldestUnacknowledgedMessageAgeSeconds();
+
+
     void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats,
             StatsOutputStream topicStatsStream, ClusterReplicationMetrics clusterReplicationMetrics,
             String namespaceName, boolean hydratePublishers);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index a59190ea6fc..7dc0f949904 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -168,7 +168,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                 .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
                 .thenCompose(optPolicies -> {
                     final Policies policies;
-                    if (!optPolicies.isPresent()) {
+                    if (optPolicies.isEmpty()) {
                         log.warn("[{}] Policies not present and isEncryptionRequired will be set to false", topic);
                         isEncryptionRequired = false;
                         policies = new Policies();
@@ -1261,4 +1261,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
     public boolean isPersistent() {
         return false;
     }
+
+    @Override
+    public long getBestEffortOldestUnacknowledgedMessageAgeSeconds() {
+        return -1;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8ae0546f051..a5240372d45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
 import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic;
@@ -48,10 +49,12 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import lombok.Getter;
+import lombok.Value;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -73,6 +76,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundExce
 import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer.CursorInfo;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -264,10 +268,34 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     protected final TransactionBuffer transactionBuffer;
 
     // Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
+    @Getter
     private volatile long lastDataMessagePublishedTimestamp = 0;
     @Getter
     private final ExecutorService orderedExecutor;
 
+    @Getter
+    private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();
+
+    private volatile TimeBasedBacklogQuotaCheckResult timeBasedBacklogQuotaCheckResult;
+    private static final AtomicReferenceFieldUpdater<PersistentTopic, TimeBasedBacklogQuotaCheckResult>
+            TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+            PersistentTopic.class,
+            TimeBasedBacklogQuotaCheckResult.class,
+            "timeBasedBacklogQuotaCheckResult");
+    @Value
+    private static class TimeBasedBacklogQuotaCheckResult {
+        PositionImpl oldestCursorMarkDeletePosition;
+        String cursorName;
+        long positionPublishTimestampInMillis;
+        long dataVersion;
+    }
+
+    @Value
+    private static class EstimateTimeBasedBacklogQuotaCheckResult {
+        boolean truncateBacklogToMatchQuota;
+        Long estimatedOldestUnacknowledgedMessageTimestamp;
+    }
+
     private static class TopicStatsHelper {
         public double averageMsgSize;
         public double aggMsgRateIn;
@@ -480,7 +508,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             if (!lock.writeLock().tryLock()) {
                 return CompletableFuture.failedFuture(new SubscriptionConflictUnloadException(String.format("Conflict"
                         + " topic-close, topic-delete, another-subscribe-unload, cannot unload subscription %s now",
-                        topic, subName)));
+                        subName)));
             }
             try {
                 if (isFenced) {
@@ -1131,7 +1159,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private void resetSubscriptionCursor(Subscription subscription, CompletableFuture<Subscription> subscriptionFuture,
                                          long startMessageRollbackDurationSec) {
         long timestamp = System.currentTimeMillis()
-                - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
+                - SECONDS.toMillis(startMessageRollbackDurationSec);
         final Subscription finalSubscription = subscription;
         subscription.resetCursor(timestamp).handle((s, ex) -> {
             if (ex != null) {
@@ -1635,7 +1663,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             if (!(th.getCause() instanceof TopicFencedException)) {
                 // retriable exception
                 brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure,
-                        POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, TimeUnit.SECONDS);
+                        POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, SECONDS);
             }
             result.completeExceptionally(th);
             return null;
@@ -2416,6 +2444,19 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
         Optional<CompactorMXBean> mxBean = getCompactorMXBean();
 
+        stats.backlogQuotaLimitSize = getBacklogQuota(BacklogQuotaType.destination_storage).getLimitSize();
+        stats.backlogQuotaLimitTime = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
+
+        TimeBasedBacklogQuotaCheckResult backlogQuotaCheckResult = timeBasedBacklogQuotaCheckResult;
+        stats.oldestBacklogMessageAgeSeconds = (backlogQuotaCheckResult == null)
+            ? (long) -1
+                : TimeUnit.MILLISECONDS.toSeconds(
+                Clock.systemUTC().millis() - backlogQuotaCheckResult.getPositionPublishTimestampInMillis());
+
+        stats.oldestBacklogMessageSubscriptionName = (backlogQuotaCheckResult == null)
+            ? null
+            : backlogQuotaCheckResult.getCursorName();
+
         stats.compaction.reset();
         mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> {
             stats.compaction.lastCompactionRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
@@ -2874,7 +2915,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         int maxInactiveDurationInSec = topicPolicies.getInactiveTopicPolicies().get().getMaxInactiveDurationSeconds();
         if (isActive(deleteMode)) {
             lastActive = System.nanoTime();
-        } else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
+        } else if (System.nanoTime() - lastActive < SECONDS.toNanos(maxInactiveDurationInSec)) {
             // Gc interval did not expire yet
             return;
         } else if (shouldTopicBeRetained()) {
@@ -3206,36 +3247,128 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return (storageSize >= backlogQuotaLimitInBytes);
     }
 
+    @Override
+    public long getBestEffortOldestUnacknowledgedMessageAgeSeconds() {
+        TimeBasedBacklogQuotaCheckResult result = timeBasedBacklogQuotaCheckResult;
+        if (result == null) {
+            return -1;
+        } else {
+            return TimeUnit.MILLISECONDS.toSeconds(
+                    Clock.systemUTC().millis() - result.getPositionPublishTimestampInMillis());
+        }
+    }
+
+    private void updateResultIfNewer(TimeBasedBacklogQuotaCheckResult updatedResult) {
+        TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER.updateAndGet(this,
+                existingResult -> {
+                    if (existingResult == null
+                            || ManagedCursorContainer.DataVersion.compareVersions(
+                                    updatedResult.getDataVersion(), existingResult.getDataVersion()) > 0) {
+                        return updatedResult;
+                    } else {
+                        return existingResult;
+                    }
+                });
+
+    }
+
     /**
      * @return determine if backlog quota enforcement needs to be done for topic based on time limit
      */
     public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
         TopicName topicName = TopicName.get(getName());
         int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Time backlog quota = [{}]. Checking if exceeded.", topicName, backlogQuotaLimitInSecond);
+        }
 
-        // If backlog quota by time is not set and we have no durable cursor.
-        if (backlogQuotaLimitInSecond <= 0
-                || ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition() == null) {
+        // If backlog quota by time is not set
+        if (backlogQuotaLimitInSecond <= 0) {
             return CompletableFuture.completedFuture(false);
         }
 
+        ManagedCursorContainer managedCursorContainer = (ManagedCursorContainer) ledger.getCursors();
+        CursorInfo oldestMarkDeleteCursorInfo = managedCursorContainer.getCursorWithOldestPosition();
+
+        // If we have no durable cursor since `ledger.getCursors()` only managed durable cursors
+        if (oldestMarkDeleteCursorInfo == null
+                || oldestMarkDeleteCursorInfo.getPosition() == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] No durable cursor found. Skipping time based backlog quota check."
+                        + " Oldest mark-delete cursor info: {}", topicName, oldestMarkDeleteCursorInfo);
+            }
+            return CompletableFuture.completedFuture(false);
+        }
+
+        PositionImpl oldestMarkDeletePosition = oldestMarkDeleteCursorInfo.getPosition();
+
+        TimeBasedBacklogQuotaCheckResult lastCheckResult = timeBasedBacklogQuotaCheckResult;
+        if (lastCheckResult != null
+            && oldestMarkDeletePosition.compareTo(lastCheckResult.getOldestCursorMarkDeletePosition()) == 0) {
+
+            // Same position, but the cursor causing it has changed?
+            if (!lastCheckResult.getCursorName().equals(oldestMarkDeleteCursorInfo.getCursor().getName())) {
+                final TimeBasedBacklogQuotaCheckResult updatedResult = new TimeBasedBacklogQuotaCheckResult(
+                        lastCheckResult.getOldestCursorMarkDeletePosition(),
+                        oldestMarkDeleteCursorInfo.getCursor().getName(),
+                        lastCheckResult.getPositionPublishTimestampInMillis(),
+                        oldestMarkDeleteCursorInfo.getVersion());
+
+                updateResultIfNewer(updatedResult);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Time-based backlog quota check. Updating cached result for position {}, "
+                        + "since cursor causing it has changed from {} to {}",
+                            topicName,
+                            oldestMarkDeletePosition,
+                            lastCheckResult.getCursorName(),
+                            oldestMarkDeleteCursorInfo.getCursor().getName());
+                }
+            }
+
+            long entryTimestamp = lastCheckResult.getPositionPublishTimestampInMillis();
+            boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Time based backlog quota check. Using cache result for position {}. "
+                        + "Entry timestamp: {}, expired: {}",
+                        topicName, oldestMarkDeletePosition, entryTimestamp, expired);
+            }
+            return CompletableFuture.completedFuture(expired);
+        }
+
         if (brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) {
             CompletableFuture<Boolean> future = new CompletableFuture<>();
             // Check if first unconsumed message(first message after mark delete position)
             // for slowest cursor's has expired.
-            PositionImpl position = ((ManagedLedgerImpl) ledger).getNextValidPosition(((ManagedCursorContainer)
-                    ledger.getCursors()).getSlowestReaderPosition());
+            PositionImpl position = ((ManagedLedgerImpl) ledger).getNextValidPosition(oldestMarkDeletePosition);
             ((ManagedLedgerImpl) ledger).asyncReadEntry(position,
                     new AsyncCallbacks.ReadEntryCallback() {
                         @Override
                         public void readEntryComplete(Entry entry, Object ctx) {
                             try {
                                 long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
+
+                                updateResultIfNewer(
+                                        new TimeBasedBacklogQuotaCheckResult(
+                                            oldestMarkDeleteCursorInfo.getPosition(),
+                                            oldestMarkDeleteCursorInfo.getCursor().getName(),
+                                            entryTimestamp,
+                                            oldestMarkDeleteCursorInfo.getVersion()));
+
                                 boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
-                                if (expired && log.isDebugEnabled()) {
-                                    log.debug("Time based backlog quota exceeded, oldest entry in cursor {}'s backlog"
-                                    + "exceeded quota {}", ((ManagedLedgerImpl) ledger).getSlowestConsumer().getName(),
-                                            backlogQuotaLimitInSecond);
+                                if (log.isDebugEnabled()) {
+                                    log.debug("[{}] Time based backlog quota check. Oldest unacked entry read from BK. "
+                                                    + "Oldest entry in cursor {}'s backlog: {}. "
+                                                    + "Oldest mark-delete position: {}. "
+                                                    + "Quota {}. Last check result position [{}]. "
+                                                    + "Expired: {}, entryTimestamp: {}",
+                                            topicName,
+                                            oldestMarkDeleteCursorInfo.getCursor().getName(),
+                                            position,
+                                            oldestMarkDeletePosition,
+                                            backlogQuotaLimitInSecond,
+                                            lastCheckResult.getOldestCursorMarkDeletePosition(),
+                                            expired,
+                                            entryTimestamp);
                                 }
                                 future.complete(expired);
                             } catch (Exception e) {
@@ -3255,9 +3388,19 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     }, null);
             return future;
         } else {
-            PositionImpl slowestPosition = ((ManagedCursorContainer) ledger.getCursors()).getSlowestReaderPosition();
             try {
-                return slowestReaderTimeBasedBacklogQuotaCheck(slowestPosition);
+                EstimateTimeBasedBacklogQuotaCheckResult checkResult =
+                        estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition);
+                if (checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp() != null) {
+                    updateResultIfNewer(
+                            new TimeBasedBacklogQuotaCheckResult(
+                                oldestMarkDeleteCursorInfo.getPosition(),
+                                oldestMarkDeleteCursorInfo.getCursor().getName(),
+                                checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp(),
+                                oldestMarkDeleteCursorInfo.getVersion()));
+                }
+
+                return CompletableFuture.completedFuture(checkResult.isTruncateBacklogToMatchQuota());
             } catch (Exception e) {
                 log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
                 return CompletableFuture.completedFuture(false);
@@ -3265,33 +3408,47 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
-    private CompletableFuture<Boolean> slowestReaderTimeBasedBacklogQuotaCheck(PositionImpl slowestPosition)
+    private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck(
+            PositionImpl markDeletePosition)
             throws ExecutionException, InterruptedException {
         int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
-        Long ledgerId = slowestPosition.getLedgerId();
-        if (((ManagedLedgerImpl) ledger).getLedgersInfo().lastKey().equals(ledgerId)) {
-            return CompletableFuture.completedFuture(false);
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) ledger;
+
+        // The ledger timestamp is only known when ledger is closed, hence when the mark-delete
+        // is at active ledger (open) we can't estimate it.
+        if (managedLedger.getLedgersInfo().lastKey().equals(markDeletePosition.getLedgerId())) {
+            return new EstimateTimeBasedBacklogQuotaCheckResult(false, null);
         }
-        int result;
+
         org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
-                ledgerInfo = ledger.getLedgerInfo(ledgerId).get();
-        if (ledgerInfo != null && ledgerInfo.hasTimestamp() && ledgerInfo.getTimestamp() > 0
-                && ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp()
-                > backlogQuotaLimitInSecond * 1000 && (result = slowestPosition.compareTo(
-                new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1))) <= 0) {
-            if (result < 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Time based backlog quota exceeded, quota {}, age of ledger "
-                                    + "slowest cursor currently on {}", backlogQuotaLimitInSecond * 1000,
-                            ((ManagedLedgerImpl) ledger).getClock().millis() - ledgerInfo.getTimestamp());
-                }
-                return CompletableFuture.completedFuture(true);
-            } else {
-                return slowestReaderTimeBasedBacklogQuotaCheck(
-                        ((ManagedLedgerImpl) ledger).getNextValidPosition(slowestPosition));
+                markDeletePositionLedgerInfo = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get();
+
+        org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo positionToCheckLedgerInfo =
+                markDeletePositionLedgerInfo;
+
+        // if the mark-delete position is the last entry it means all entries for
+        // that ledger are acknowledged
+        if (markDeletePosition.getEntryId() == markDeletePositionLedgerInfo.getEntries() - 1) {
+            PositionImpl positionToCheck = managedLedger.getNextValidPosition(markDeletePosition);
+            positionToCheckLedgerInfo = ledger.getLedgerInfo(positionToCheck.getLedgerId()).get();
+        }
+
+        if (positionToCheckLedgerInfo != null
+                && positionToCheckLedgerInfo.hasTimestamp()
+                && positionToCheckLedgerInfo.getTimestamp() > 0) {
+            long estimateMsgAgeMs = managedLedger.getClock().millis() - positionToCheckLedgerInfo.getTimestamp();
+            boolean shouldTruncateBacklog = estimateMsgAgeMs > SECONDS.toMillis(backlogQuotaLimitInSecond);
+            if (log.isDebugEnabled()) {
+                log.debug("Time based backlog quota exceeded, quota {}[ms], age of ledger "
+                                + "slowest cursor currently on {}[ms]", backlogQuotaLimitInSecond * 1000,
+                        estimateMsgAgeMs);
             }
+
+            return new EstimateTimeBasedBacklogQuotaCheckResult(
+                    shouldTruncateBacklog,
+                    positionToCheckLedgerInfo.getTimestamp());
         } else {
-            return CompletableFuture.completedFuture(false);
+            return new EstimateTimeBasedBacklogQuotaCheckResult(false, null);
         }
     }
 
@@ -3666,7 +3823,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds();
             if (timeout > 0) {
                 this.fencedTopicMonitoringTask = brokerService.executor().schedule(this::closeFencedTopicForcefully,
-                        timeout, TimeUnit.SECONDS);
+                        timeout, SECONDS);
             }
         }
     }
@@ -3933,10 +4090,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return transactionBuffer.clearSnapshot().thenCompose(__ -> transactionBuffer.closeAsync());
     }
 
-    public long getLastDataMessagePublishedTimestamp() {
-        return lastDataMessagePublishedTimestamp;
-    }
-
     public Optional<TopicName> getShadowSourceTopic() {
         return Optional.ofNullable(shadowSourceTopic);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java
new file mode 100644
index 00000000000..f79d053a979
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import java.util.concurrent.atomic.LongAdder;
+import lombok.Getter;
+
+@SuppressWarnings("LombokGetterMayBeUsed")
+public class PersistentTopicMetrics {
+
+    @Getter
+    private final BacklogQuotaMetrics backlogQuotaMetrics = new BacklogQuotaMetrics();
+
+    public static class BacklogQuotaMetrics {
+        private final LongAdder timeBasedBacklogQuotaExceededEvictionCount = new LongAdder();
+        private final LongAdder sizeBasedBacklogQuotaExceededEvictionCount = new LongAdder();
+
+        public void recordTimeBasedBacklogEviction() {
+            timeBasedBacklogQuotaExceededEvictionCount.increment();
+        }
+
+        public void recordSizeBasedBacklogEviction() {
+            sizeBasedBacklogQuotaExceededEvictionCount.increment();
+        }
+
+        public long getSizeBasedBacklogQuotaExceededEvictionCount() {
+            return sizeBasedBacklogQuotaExceededEvictionCount.longValue();
+        }
+
+        public long getTimeBasedBacklogQuotaExceededEvictionCount() {
+            return timeBasedBacklogQuotaExceededEvictionCount.longValue();
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
index 715231d3c6e..037fb29a999 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedBrokerStats.java
@@ -33,7 +33,10 @@ public class AggregatedBrokerStats {
     public double storageReadRate;
     public double storageReadCacheMissesRate;
     public long msgBacklog;
+    public long sizeBasedBacklogQuotaExceededEvictionCount;
+    public long timeBasedBacklogQuotaExceededEvictionCount;
 
+    @SuppressWarnings("DuplicatedCode")
     void updateStats(TopicStats stats) {
         topicsCount++;
         subscriptionsCount += stats.subscriptionsCount;
@@ -49,8 +52,11 @@ public class AggregatedBrokerStats {
         storageReadRate += stats.managedLedgerStats.storageReadRate;
         storageReadCacheMissesRate += stats.managedLedgerStats.storageReadCacheMissesRate;
         msgBacklog += stats.msgBacklog;
+        timeBasedBacklogQuotaExceededEvictionCount += stats.timeBasedBacklogQuotaExceededEvictionCount;
+        sizeBasedBacklogQuotaExceededEvictionCount += stats.sizeBasedBacklogQuotaExceededEvictionCount;
     }
 
+    @SuppressWarnings("DuplicatedCode")
     public void reset() {
         topicsCount = 0;
         subscriptionsCount = 0;
@@ -66,5 +72,7 @@ public class AggregatedBrokerStats {
         storageReadRate = 0;
         storageReadCacheMissesRate = 0;
         msgBacklog = 0;
+        sizeBasedBacklogQuotaExceededEvictionCount = 0;
+        timeBasedBacklogQuotaExceededEvictionCount = 0;
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index d0dc4fe2a7e..3975cd89cfa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -51,6 +51,9 @@ public class AggregatedNamespaceStats {
     long backlogQuotaLimit;
     long backlogQuotaLimitTime;
 
+    public long sizeBasedBacklogQuotaExceededEvictionCount;
+    public long timeBasedBacklogQuotaExceededEvictionCount;
+
     public Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
 
     public Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
@@ -68,6 +71,7 @@ public class AggregatedNamespaceStats {
 
     Map<String, TopicMetricBean> bucketDelayedIndexStats = new HashMap<>();
 
+    @SuppressWarnings("DuplicatedCode")
     void updateStats(TopicStats stats) {
         topicsCount++;
 
@@ -105,6 +109,9 @@ public class AggregatedNamespaceStats {
         backlogQuotaLimit = Math.max(backlogQuotaLimit, stats.backlogQuotaLimit);
         backlogQuotaLimitTime = Math.max(backlogQuotaLimitTime, stats.backlogQuotaLimitTime);
 
+        sizeBasedBacklogQuotaExceededEvictionCount += stats.sizeBasedBacklogQuotaExceededEvictionCount;
+        timeBasedBacklogQuotaExceededEvictionCount += stats.timeBasedBacklogQuotaExceededEvictionCount;
+
         managedLedgerStats.storageWriteRate += stats.managedLedgerStats.storageWriteRate;
         managedLedgerStats.storageReadRate += stats.managedLedgerStats.storageReadRate;
         managedLedgerStats.storageReadCacheMissesRate += stats.managedLedgerStats.storageReadCacheMissesRate;
@@ -172,6 +179,7 @@ public class AggregatedNamespaceStats {
         compactionLatencyBuckets.addAll(stats.compactionLatencyBuckets);
     }
 
+    @SuppressWarnings("DuplicatedCode")
     public void reset() {
         managedLedgerStats.reset();
         topicsCount = 0;
@@ -201,6 +209,9 @@ public class AggregatedNamespaceStats {
         replicationStats.clear();
         subscriptionStats.clear();
 
+        sizeBasedBacklogQuotaExceededEvictionCount = 0;
+        timeBasedBacklogQuotaExceededEvictionCount = 0;
+
         compactionRemovedEventCount = 0;
         compactionSucceedCount = 0;
         compactionFailedCount = 0;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 4e72fa0d72b..3728c3edd1e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -32,7 +32,10 @@ import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics;
+import org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics.BacklogQuotaMetrics;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusLabels;
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
@@ -159,14 +162,15 @@ public class NamespaceStatsAggregator {
         subsStats.bucketDelayedIndexStats = subscriptionStats.bucketDelayedIndexStats;
     }
 
+    @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
     private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
                                       boolean includeProducerMetrics, boolean getPreciseBacklog,
                                       boolean subscriptionBacklogSize, Optional<CompactorMXBean> compactorMXBean) {
         stats.reset();
 
-        if (topic instanceof PersistentTopic) {
+        if (topic instanceof PersistentTopic persistentTopic) {
             // Managed Ledger stats
-            ManagedLedger ml = ((PersistentTopic) topic).getManagedLedger();
+            ManagedLedger ml = persistentTopic.getManagedLedger();
             ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ml.getStats();
 
             stats.managedLedgerStats.storageSize = mlStats.getStoredMessagesSize();
@@ -174,9 +178,10 @@ public class NamespaceStatsAggregator {
             stats.managedLedgerStats.backlogSize = ml.getEstimatedBacklogSize();
             stats.managedLedgerStats.offloadedStorageUsed = ml.getOffloadedSize();
             stats.backlogQuotaLimit = topic
-                    .getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
+                    .getBacklogQuota(BacklogQuotaType.destination_storage).getLimitSize();
             stats.backlogQuotaLimitTime = topic
-                    .getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
+                    .getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
+            stats.backlogAgeSeconds = topic.getBestEffortOldestUnacknowledgedMessageAgeSeconds();
 
             stats.managedLedgerStats.storageWriteLatencyBuckets
                     .addAll(mlStats.getInternalAddEntryLatencyBuckets());
@@ -191,7 +196,17 @@ public class NamespaceStatsAggregator {
             stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
             stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
             stats.managedLedgerStats.storageReadCacheMissesRate = mlStats.getReadEntriesOpsCacheMissesRate();
+
+            // Topic Stats
+            PersistentTopicMetrics persistentTopicMetrics = persistentTopic.getPersistentTopicMetrics();
+
+            BacklogQuotaMetrics backlogQuotaMetrics = persistentTopicMetrics.getBacklogQuotaMetrics();
+            stats.sizeBasedBacklogQuotaExceededEvictionCount =
+                    backlogQuotaMetrics.getSizeBasedBacklogQuotaExceededEvictionCount();
+            stats.timeBasedBacklogQuotaExceededEvictionCount =
+                    backlogQuotaMetrics.getTimeBasedBacklogQuotaExceededEvictionCount();
         }
+
         TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, false);
         stats.msgInCounter = tStatus.msgInCounter;
         stats.bytesInCounter = tStatus.bytesInCounter;
@@ -334,6 +349,14 @@ public class NamespaceStatsAggregator {
         writeMetric(stream, "pulsar_broker_storage_read_rate", brokerStats.storageReadRate, cluster);
         writeMetric(stream, "pulsar_broker_storage_read_cache_misses_rate",
                 brokerStats.storageReadCacheMissesRate, cluster);
+
+        writePulsarBacklogQuotaMetricBrokerLevel(stream,
+                "pulsar_broker_storage_backlog_quota_exceeded_evictions_total",
+                brokerStats.sizeBasedBacklogQuotaExceededEvictionCount, cluster, BacklogQuotaType.destination_storage);
+        writePulsarBacklogQuotaMetricBrokerLevel(stream,
+                "pulsar_broker_storage_backlog_quota_exceeded_evictions_total",
+                brokerStats.timeBasedBacklogQuotaExceededEvictionCount, cluster, BacklogQuotaType.message_age);
+
         writeMetric(stream, "pulsar_broker_msg_backlog", brokerStats.msgBacklog, cluster);
     }
 
@@ -372,6 +395,7 @@ public class NamespaceStatsAggregator {
                 stats.managedLedgerStats.storageLogicalSize, cluster, namespace);
         writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize, cluster,
                 namespace);
+
         writeMetric(stream, "pulsar_storage_offloaded_size",
                 stats.managedLedgerStats.offloadedStorageUsed, cluster, namespace);
 
@@ -392,6 +416,14 @@ public class NamespaceStatsAggregator {
         });
 
         writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
+        writePulsarBacklogQuotaMetricNamespaceLevel(stream,
+                "pulsar_storage_backlog_quota_exceeded_evictions_total",
+                stats.sizeBasedBacklogQuotaExceededEvictionCount, cluster, namespace,
+                BacklogQuotaType.destination_storage);
+        writePulsarBacklogQuotaMetricNamespaceLevel(stream,
+                "pulsar_storage_backlog_quota_exceeded_evictions_total",
+                stats.timeBasedBacklogQuotaExceededEvictionCount, cluster, namespace,
+                BacklogQuotaType.message_age);
 
         stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
         long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
@@ -471,6 +503,25 @@ public class NamespaceStatsAggregator {
                 replStats -> replStats.replicationDelayInSeconds, cluster, namespace);
     }
 
+    @SuppressWarnings("SameParameterValue")
+    private static void writePulsarBacklogQuotaMetricBrokerLevel(PrometheusMetricStreams stream, String metricName,
+                                                                 Number value, String cluster,
+                                                                 BacklogQuotaType backlogQuotaType) {
+        String quotaTypeLabelValue = PrometheusLabels.backlogQuotaTypeLabel(backlogQuotaType);
+        stream.writeSample(metricName, value, "cluster", cluster,
+                "quota_type", quotaTypeLabelValue);
+    }
+
+    @SuppressWarnings("SameParameterValue")
+    private static void writePulsarBacklogQuotaMetricNamespaceLevel(PrometheusMetricStreams stream, String metricName,
+                                                                    Number value, String cluster, String namespace,
+                                                                    BacklogQuotaType backlogQuotaType) {
+        String quotaTypeLabelValue = PrometheusLabels.backlogQuotaTypeLabel(backlogQuotaType);
+        stream.writeSample(metricName, value, "cluster", cluster,
+                "namespace", namespace,
+                "quota_type", quotaTypeLabelValue);
+    }
+
     private static void writePulsarMsgBacklog(PrometheusMetricStreams stream, Number value,
                                               String cluster, String namespace) {
         stream.writeSample("pulsar_msg_backlog", value, "cluster", cluster, "namespace", namespace,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index dda03e3e59d..4be006423f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -25,6 +25,8 @@ import java.util.Optional;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusLabels;
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.compaction.CompactionRecord;
 import org.apache.pulsar.compaction.CompactorMXBean;
@@ -52,6 +54,7 @@ class TopicStats {
 
     long backlogQuotaLimit;
     long backlogQuotaLimitTime;
+    long backlogAgeSeconds;
 
     ManagedLedgerStats managedLedgerStats = new ManagedLedgerStats();
 
@@ -73,6 +76,11 @@ class TopicStats {
 
     Map<String, TopicMetricBean> bucketDelayedIndexStats = new HashMap<>();
 
+    public long sizeBasedBacklogQuotaExceededEvictionCount;
+    public long timeBasedBacklogQuotaExceededEvictionCount;
+
+
+    @SuppressWarnings("DuplicatedCode")
     public void reset() {
         subscriptionsCount = 0;
         producersCount = 0;
@@ -111,8 +119,13 @@ class TopicStats {
         compactionLatencyBuckets.reset();
         delayedMessageIndexSizeInBytes = 0;
         bucketDelayedIndexStats.clear();
+
+        timeBasedBacklogQuotaExceededEvictionCount = 0;
+        sizeBasedBacklogQuotaExceededEvictionCount = 0;
+        backlogAgeSeconds = -1;
     }
 
+    @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
     public static void printTopicStats(PrometheusMetricStreams stream, TopicStats stats,
                                        Optional<CompactorMXBean> compactorMXBean, String cluster, String namespace,
                                        String topic, boolean splitTopicAndPartitionIndexLabel) {
@@ -165,6 +178,14 @@ class TopicStats {
                 cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
         writeMetric(stream, "pulsar_storage_backlog_quota_limit_time", stats.backlogQuotaLimitTime,
                 cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_backlog_age_seconds", stats.backlogAgeSeconds,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeBacklogQuotaMetric(stream, "pulsar_storage_backlog_quota_exceeded_evictions_total",
+                stats.sizeBasedBacklogQuotaExceededEvictionCount, cluster, namespace, topic,
+                splitTopicAndPartitionIndexLabel, BacklogQuotaType.destination_storage);
+        writeBacklogQuotaMetric(stream, "pulsar_storage_backlog_quota_exceeded_evictions_total",
+                stats.timeBasedBacklogQuotaExceededEvictionCount, cluster, namespace, topic,
+                splitTopicAndPartitionIndexLabel, BacklogQuotaType.message_age);
 
         writeMetric(stream, "pulsar_delayed_message_index_size_bytes", stats.delayedMessageIndexSizeInBytes,
                 cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
@@ -442,6 +463,17 @@ class TopicStats {
         writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
     }
 
+    @SuppressWarnings("SameParameterValue")
+    private static void writeBacklogQuotaMetric(PrometheusMetricStreams stream, String metricName, Number value,
+                                                String cluster, String namespace, String topic,
+                                                boolean splitTopicAndPartitionIndexLabel,
+                                                BacklogQuotaType backlogQuotaType) {
+
+        String quotaTypeLabelValue = PrometheusLabels.backlogQuotaTypeLabel(backlogQuotaType);
+        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
+                "quota_type", quotaTypeLabelValue);
+    }
+
     private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
                                     String namespace, String topic, String remoteCluster,
                                     boolean splitTopicAndPartitionIndexLabel) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusLabels.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusLabels.java
new file mode 100644
index 00000000000..9a2c5207314
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusLabels.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
+
+public class PrometheusLabels {
+
+    public static String backlogQuotaTypeLabel(BacklogQuotaType backlogQuotaType) {
+        if (backlogQuotaType == BacklogQuotaType.message_age) {
+            return "time";
+        } else /* destination_storage */ {
+            return "size";
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 3c829b02cb8..e24fb493b95 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -18,6 +18,13 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static java.util.Map.entry;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType.destination_storage;
+import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType.message_age;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.within;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -33,15 +40,18 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metrics;
 import org.apache.pulsar.client.admin.GetStatsOptions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -73,6 +83,9 @@ import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class BacklogQuotaManagerTest {
+    private static final Logger log = LoggerFactory.getLogger(BacklogQuotaManagerTest.class);
+
+    public static final String CLUSTER_NAME = "usc";
     PulsarService pulsar;
     ServiceConfiguration config;
 
@@ -80,6 +93,7 @@ public class BacklogQuotaManagerTest {
     PulsarAdmin admin;
 
     LocalBookkeeperEnsemble bkEnsemble;
+    PrometheusMetricsClient prometheusMetricsClient;
 
     private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 2;
     private static final int MAX_ENTRIES_PER_LEDGER = 5;
@@ -117,7 +131,7 @@ public class BacklogQuotaManagerTest {
             config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
-            config.setClusterName("usc");
+            config.setClusterName(CLUSTER_NAME);
             config.setBrokerShutdownTimeoutMs(0L);
             config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
             config.setBrokerServicePort(Optional.of(0));
@@ -136,6 +150,7 @@ public class BacklogQuotaManagerTest {
 
             adminUrl = new URL("http://127.0.0.1" + ":" + pulsar.getListenPortHTTP().get());
             admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.toString()).build();
+            prometheusMetricsClient = new PrometheusMetricsClient("127.0.0.1", pulsar.getListenPortHTTP().get());
 
             admin.clusters().createCluster("usc", ClusterData.builder().serviceUrl(adminUrl.toString()).build());
             admin.tenants().createTenant("prop",
@@ -190,7 +205,7 @@ public class BacklogQuotaManagerTest {
     }
 
     /**
-     * Readers should not effect backlog quota
+     * Readers should not affect backlog quota
      */
     @Test
     public void testBacklogQuotaWithReader() throws Exception {
@@ -202,18 +217,18 @@ public class BacklogQuotaManagerTest {
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                         .build());
-        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) {
+        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS).build()) {
             final String topic1 = "persistent://prop/ns-quota/topic1" + UUID.randomUUID();
             final int numMsgs = 20;
 
             Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
 
-            org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+            Producer<byte[]> producer = createProducer(client, topic1);
 
             byte[] content = new byte[1024];
             for (int i = 0; i < numMsgs; i++) {
                 content[0] = (byte) (content[0] + 1);
-                MessageId msgId = producer.send(content);
+                producer.send(content);
             }
 
             Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
@@ -262,7 +277,7 @@ public class BacklogQuotaManagerTest {
             // check reader can still read with out error
 
             while (true) {
-                Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+                Message<byte[]> msg = reader.readNext(5, SECONDS);
                 if (msg == null) {
                     break;
                 }
@@ -287,10 +302,11 @@ public class BacklogQuotaManagerTest {
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                         .build());
-        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) {
+        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS).build();) {
             final String topic1 = "persistent://prop/ns-quota/topic1" + UUID.randomUUID();
             final int numMsgs = 20;
             Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
+
             Producer<byte[]> producer = createProducer(client, topic1);
             byte[] content = new byte[1024];
             for (int i = 0; i < numMsgs; i++) {
@@ -327,13 +343,13 @@ public class BacklogQuotaManagerTest {
                 // check there is only one ledger left
                 assertEquals(internalStats.ledgers.size(), 1);
 
-                // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER
+                // check if it's the expected ledger id given MAX_ENTRIES_PER_LEDGER
                 assertEquals(internalStats.ledgers.get(0).ledgerId, finalMessageId.getLedgerId());
             });
-            // check reader can still read with out error
+            // check reader can still read without error
 
             while (true) {
-                Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+                Message<byte[]> msg = reader.readNext(5, SECONDS);
                 if (msg == null) {
                     break;
                 }
@@ -344,6 +360,280 @@ public class BacklogQuotaManagerTest {
         }
     }
 
+    @Test
+    public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientException, InterruptedException {
+        config.setPreciseTimeBasedBacklogQuotaCheck(true);
+        final String namespace = "prop/ns-quota";
+        assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new HashMap<>());
+        final int sizeLimitBytes = 15 * 1024 * 1024;
+        final int timeLimitSeconds = 123;
+        admin.namespaces().setBacklogQuota(
+                namespace,
+                BacklogQuota.builder()
+                        .limitSize(sizeLimitBytes)
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                        .build(),
+                destination_storage);
+        admin.namespaces().setBacklogQuota(
+                namespace,
+                BacklogQuota.builder()
+                        .limitTime(timeLimitSeconds)
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                        .build(),
+                message_age);
+
+        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
+                .statsInterval(0, SECONDS).build()) {
+            final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();
+
+            final String subName1 = "c1";
+            final String subName2 = "c2";
+            final int numMsgs = 4;
+
+            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1)
+                    .acknowledgmentGroupTime(0, SECONDS)
+                    .subscribe();
+            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2)
+                    .acknowledgmentGroupTime(0, SECONDS)
+                    .subscribe();
+            Producer<byte[]> producer = createProducer(client, topic1);
+
+            byte[] content = new byte[1024];
+            for (int i = 0; i < numMsgs; i++) {
+                Thread.sleep(3000); // Guarantees if we use wrong message in age, to show up in failed test
+                producer.send(content);
+            }
+
+            String c1MarkDeletePositionBefore =
+                    admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;
+
+            // Move subscription 1, one message, such that subscription 2 is the oldest
+            //  S2  S1
+            //  0   1
+            Message<byte[]> oldestMessage = consumer1.receive();
+            consumer1.acknowledge(oldestMessage);
+
+            log.info("Subscription 1 moved 1 message. Now subscription 2 is the oldest. Oldest message:"+
+                    oldestMessage.getMessageId());
+
+            c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
+            waitForQuotaCheckToRunTwice();
+
+            Metrics metrics = prometheusMetricsClient.getMetrics();
+            TopicStats topicStats = getTopicStats(topic1);
+
+            assertThat(topicStats.getBacklogQuotaLimitSize()).isEqualTo(sizeLimitBytes);
+            assertThat(topicStats.getBacklogQuotaLimitTime()).isEqualTo(timeLimitSeconds);
+            long expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - oldestMessage.getPublishTime());
+            assertThat(topicStats.getOldestBacklogMessageAgeSeconds())
+                    .isCloseTo(expectedMessageAgeSeconds, within(1L));
+            assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName2);
+
+            Metric backlogAgeMetric =
+                    metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
+                            Pair.of("topic", topic1));
+            assertThat(backlogAgeMetric.tags).containsExactly(
+                    entry("cluster", CLUSTER_NAME),
+                    entry("namespace", namespace),
+                    entry("topic", topic1));
+            assertThat((long) backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(2L));
+
+            // Move subscription 2 away from being the oldest mark delete
+            //     S2/S1
+            //  0   1
+            Message<byte[]> firstOldestMessage = consumer2.receive();
+            consumer2.acknowledge(firstOldestMessage);
+            // We only read and not ack, since we just need its publish-timestamp for later assert
+            Message<byte[]> secondOldestMessage = consumer2.receive();
+
+            // Switch subscription 1 to be where subscription 2 was in terms of oldest mark delete
+            //  S1  S2
+            //  0   1
+            consumer1.seek(MessageId.earliest);
+
+            log.info("Subscription 1 moved to be the oldest");
+
+            c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
+            waitForQuotaCheckToRunTwice();
+
+            metrics = prometheusMetricsClient.getMetrics();
+            long actualAge = (long) metrics.findByNameAndLabels(
+                    "pulsar_storage_backlog_age_seconds", "topic", topic1)
+                    .get(0).value;
+
+            expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - oldestMessage.getPublishTime());
+            assertThat(actualAge).isCloseTo(expectedMessageAgeSeconds, within(2L));
+
+            topicStats = getTopicStats(topic1);
+            assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName1);
+
+            long entriesReadBefore = getReadEntries(topic1);
+
+            // Move subscription 1 passed subscription 2
+            for (int i = 0; i < 3; i++) {
+                Message<byte[]> message = consumer1.receive();
+                log.info("Subscription 1 about to ack message ID {}", message.getMessageId());
+                consumer1.acknowledge(message);
+            }
+
+            log.info("Subscription 1 moved 3 messages. Now subscription 2 is the oldest");
+            waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
+            waitForQuotaCheckToRunTwice();
+
+            // Cache shouldn't be used, since position has changed
+            long readEntries = getReadEntries(topic1);
+            assertThat(readEntries).isGreaterThan(entriesReadBefore);
+
+            topicStats = getTopicStats(topic1);
+            expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - secondOldestMessage.getPublishTime());
+            assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isCloseTo(expectedMessageAgeSeconds, within(2L));
+            assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName2);
+
+            waitForQuotaCheckToRunTwice();
+
+            // Cache should be used, since position hasn't changed
+            assertThat(getReadEntries(topic1)).isEqualTo(readEntries);
+        }
+    }
+
+    private long getReadEntries(String topic1) {
+        return ((PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get())
+                .getManagedLedger().getStats().getEntriesReadTotalCount();
+    }
+
+    @Test
+    public void backlogsStatsNotPrecise() throws PulsarAdminException, PulsarClientException, InterruptedException {
+        config.setPreciseTimeBasedBacklogQuotaCheck(false);
+        final String namespace = "prop/ns-quota";
+        assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new HashMap<>());
+        final int sizeLimitBytes = 15 * 1024 * 1024;
+        final int timeLimitSeconds = 123;
+        admin.namespaces().setBacklogQuota(
+                namespace,
+                BacklogQuota.builder()
+                        .limitSize(sizeLimitBytes)
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                        .build(),
+                destination_storage);
+        admin.namespaces().setBacklogQuota(
+                namespace,
+                BacklogQuota.builder()
+                        .limitTime(timeLimitSeconds)
+                        .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+                        .build(),
+                message_age);
+
+        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
+                .statsInterval(0, SECONDS).build()) {
+            final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();
+
+            final String subName1 = "brandNewC1";
+            final String subName2 = "brandNewC2";
+            final int numMsgs = 5;
+
+            Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1)
+                    .acknowledgmentGroupTime(0, SECONDS)
+                    .isAckReceiptEnabled(true)
+                    .subscribe();
+            Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2)
+                    .acknowledgmentGroupTime(0, SECONDS)
+                    .isAckReceiptEnabled(true)
+                    .subscribe();
+            Producer<byte[]> producer = createProducer(client, topic1);
+
+            byte[] content = new byte[1024];
+            for (int i = 0; i < numMsgs; i++) {
+                Thread.sleep(500);
+                producer.send(content);
+            }
+
+            String c1MarkDeletePositionBefore =
+                    admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;
+
+            consumer1.acknowledge(consumer1.receive());
+            log.info("Moved subscription 1, by 1 message");
+            c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
+            waitForQuotaCheckToRunTwice();
+
+            TopicStats topicStats = getTopicStats(topic1);
+
+            // We have only one ledger, and it is not closed yet, so we can't tell the age until it is closed
+            assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull();
+
+            Metrics metrics = prometheusMetricsClient.getMetrics();
+            Metric backlogAgeMetric =
+                    metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
+                            Pair.of("topic", topic1));
+            assertThat(backlogAgeMetric.value).isEqualTo(-1);
+
+            unloadAndLoadTopic(topic1, producer);
+            long unloadTime = System.currentTimeMillis();
+
+            waitForQuotaCheckToRunTwice();
+
+            topicStats = getTopicStats(topic1);
+            assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName2);
+            // age is measured against the ledger closing time
+            long expectedAge = MILLISECONDS.toSeconds(System.currentTimeMillis() - unloadTime);
+            assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isCloseTo(expectedAge, within(1L));
+
+            String c2MarkDeletePositionBefore =
+                    admin.topics().getInternalStats(topic1).cursors.get(subName2).markDeletePosition;
+            Message<byte[]> message;
+            for (int i = 0; i < numMsgs-1; i++) {
+                consumer1.acknowledge(consumer1.receive());
+                message = consumer2.receive();
+                consumer2.acknowledge(message);
+            }
+            // At this point subscription 2 is the oldest
+
+            waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
+            waitForMarkDeletePositionToChange(topic1, subName2, c2MarkDeletePositionBefore);
+            waitForQuotaCheckToRunTwice();
+
+            topicStats = getTopicStats(topic1);
+            assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName2);
+            expectedAge = MILLISECONDS.toSeconds(System.currentTimeMillis() - unloadTime);
+            assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isCloseTo(expectedAge, within(1L));
+        }
+    }
+
+    private void unloadAndLoadTopic(String topic, Producer producer) throws PulsarAdminException,
+            PulsarClientException {
+        admin.topics().unload(topic);
+        // This will load the topic
+        producer.send("Bla".getBytes());
+        Awaitility.await().pollInterval(100, MILLISECONDS).atMost(5, SECONDS)
+                .until(() -> admin.topics().getInternalStats(topic).numberOfEntries > 0);
+    }
+
+    private void waitForQuotaCheckToRunTwice() {
+        final long initialQuotaCheckCount = getQuotaCheckCount();
+        Awaitility.await()
+                .pollInterval(1, SECONDS)
+                .atMost(TIME_TO_CHECK_BACKLOG_QUOTA*3, SECONDS)
+                .until(() -> getQuotaCheckCount() > initialQuotaCheckCount + 1);
+    }
+
+    /**
+     * @return The new mark delete position
+     */
+    private String waitForMarkDeletePositionToChange(String topic,
+                                                     String subscriptionName,
+                                                     String previousMarkDeletePosition) {
+        return Awaitility.await().pollInterval(1, SECONDS).atMost(5, SECONDS).until(
+            () -> admin.topics().getInternalStats(topic).cursors.get(subscriptionName).markDeletePosition,
+            markDeletePosition -> markDeletePosition != null && !markDeletePosition.equals(previousMarkDeletePosition));
+    }
+
+    private long getQuotaCheckCount() {
+        Metrics metrics = prometheusMetricsClient.getMetrics();
+        return (long) metrics.findByNameAndLabels(
+                        "pulsar_storage_backlog_quota_check_duration_seconds_count",
+                        "cluster", CLUSTER_NAME)
+                .get(0).value;
+    }
+
     /**
      * Time based backlog quota won't affect reader since broker doesn't keep track of consuming position for reader
      * and can't do message age check against the quota.
@@ -359,7 +649,7 @@ public class BacklogQuotaManagerTest {
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                         .build());
-        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) {
+        try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS).build();) {
             final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();
             final int numMsgs = 9;
             Reader<byte[]> reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
@@ -405,7 +695,7 @@ public class BacklogQuotaManagerTest {
 
             // check reader can still read without error
             while (true) {
-                Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS);
+                Message<byte[]> msg = reader.readNext(5, SECONDS);
                 if (msg == null) {
                     break;
                 }
@@ -426,7 +716,7 @@ public class BacklogQuotaManagerTest {
                         .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
                         .build());
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS)
                 .build();
 
         final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();
@@ -436,7 +726,7 @@ public class BacklogQuotaManagerTest {
 
         Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
-        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        Producer<byte[]> producer = createProducer(client, topic1);
         byte[] content = new byte[1024];
         for (int i = 0; i < numMsgs; i++) {
             producer.send(content);
@@ -449,6 +739,8 @@ public class BacklogQuotaManagerTest {
 
         TopicStats stats = getTopicStats(topic1);
         assertTrue(stats.getBacklogSize() < 10 * 1024, "Storage size is [" + stats.getStorageSize() + "]");
+        assertThat(evictionCountMetric("prop/ns-quota", topic1, "size")).isEqualTo(1);
+        assertThat(evictionCountMetric("size")).isEqualTo(1);
     }
 
     @Test
@@ -459,10 +751,10 @@ public class BacklogQuotaManagerTest {
                 BacklogQuota.builder()
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
-                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+                        .build(), message_age);
         config.setPreciseTimeBasedBacklogQuotaCheck(true);
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS)
                 .build();
 
         final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
@@ -472,7 +764,7 @@ public class BacklogQuotaManagerTest {
 
         Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
-        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        Producer<byte[]> producer = createProducer(client, topic1);
         byte[] content = new byte[1024];
         for (int i = 0; i < numMsgs; i++) {
             producer.send(content);
@@ -491,8 +783,32 @@ public class BacklogQuotaManagerTest {
         // All messages for both subscription should be cleaned up from backlog by backlog monitor task.
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 0);
         assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 0);
+        assertThat(evictionCountMetric("prop/ns-quota", topic1, "time")).isEqualTo(1);
+        assertThat(evictionCountMetric("time")).isEqualTo(1);
     }
 
+    @SuppressWarnings("SameParameterValue")
+    private long evictionCountMetric(String namespace, String topic, String quotaType) {
+        Metrics metrics = prometheusMetricsClient.getMetrics();
+        Metric topicEvictionsTotal = metrics.findSingleMetricByNameAndLabels(
+                "pulsar_storage_backlog_quota_exceeded_evictions_total",
+                Pair.of("topic", topic),
+                Pair.of("quota_type",  quotaType),
+                Pair.of("namespace", namespace),
+                Pair.of("cluster", CLUSTER_NAME));
+        return (long) topicEvictionsTotal.value;
+    }
+
+    private long evictionCountMetric(String quotaType) {
+        Metrics metrics = prometheusMetricsClient.getMetrics();
+        Metric topicEvictionsTotal = metrics.findSingleMetricByNameAndLabels(
+                "pulsar_broker_storage_backlog_quota_exceeded_evictions_total",
+                Pair.of("quota_type",  quotaType),
+                Pair.of("cluster", CLUSTER_NAME));
+        return (long) topicEvictionsTotal.value;
+    }
+
+
     @Test(timeOut = 60000)
     public void testConsumerBacklogEvictionTimeQuota() throws Exception {
         assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
@@ -501,9 +817,9 @@ public class BacklogQuotaManagerTest {
                 BacklogQuota.builder()
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
-                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+                        .build(), message_age);
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS)
                 .build();
 
         final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
@@ -513,7 +829,7 @@ public class BacklogQuotaManagerTest {
 
         Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
-        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        Producer<byte[]> producer = createProducer(client, topic1);
         byte[] content = new byte[1024];
         for (int i = 0; i < numMsgs; i++) {
             producer.send(content);
@@ -551,9 +867,9 @@ public class BacklogQuotaManagerTest {
                 BacklogQuota.builder()
                         .limitTime(5) // set limit time as 5 seconds
                         .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
-                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+                        .build(), message_age);
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS)
                 .build();
 
         final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID();
@@ -563,7 +879,7 @@ public class BacklogQuotaManagerTest {
 
         Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
-        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        Producer<byte[]> producer = createProducer(client, topic1);
         byte[] content = new byte[1024];
         for (int i = 0; i < numMsgs; i++) {
             producer.send(content);
@@ -605,17 +921,17 @@ public class BacklogQuotaManagerTest {
                 BacklogQuota.builder()
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
-                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+                        .build(), message_age);
         config.setPreciseTimeBasedBacklogQuotaCheck(true);
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS)
                 .build();
 
         final String topic = "persistent://prop/ns-quota/topic4" + UUID.randomUUID();
         final String subName = "c1";
 
         Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subName).subscribe();
-        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic);
+        Producer<byte[]> producer = createProducer(client, topic);
         producer.send(new byte[1024]);
         consumer.receive();
 
@@ -663,7 +979,7 @@ public class BacklogQuotaManagerTest {
 
         Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
-        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        Producer<byte[]> producer = createProducer(client, topic1);
         byte[] content = new byte[1024];
         for (int i = 0; i < numMsgs; i++) {
             producer.send(content);
@@ -687,7 +1003,7 @@ public class BacklogQuotaManagerTest {
                 BacklogQuota.builder()
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
-                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+                        .build(), message_age);
         config.setPreciseTimeBasedBacklogQuotaCheck(true);
         @Cleanup
         PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build();
@@ -699,7 +1015,7 @@ public class BacklogQuotaManagerTest {
 
         Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
-        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        Producer<byte[]> producer = createProducer(client, topic1);
         byte[] content = new byte[1024];
 
         for (int i = 0; i < numMsgs; i++) {
@@ -737,7 +1053,7 @@ public class BacklogQuotaManagerTest {
             throws PulsarClientException {
         return client.newProducer()
                 .enableBatching(false)
-                .sendTimeout(2, TimeUnit.SECONDS)
+                .sendTimeout(2, SECONDS)
                 .topic(topic)
                 .create();
     }
@@ -756,7 +1072,7 @@ public class BacklogQuotaManagerTest {
 
         Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
-        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        Producer<byte[]> producer = createProducer(client, topic1);
         byte[] content = new byte[1024];
 
         List<Message<byte[]>> messagesToAcknowledge = new ArrayList<>();
@@ -797,7 +1113,7 @@ public class BacklogQuotaManagerTest {
                 BacklogQuota.builder()
                         .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA)
                         .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
-                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+                        .build(), message_age);
 
         Awaitility.await()
                 .pollInterval(Duration.ofSeconds(1))
@@ -831,10 +1147,10 @@ public class BacklogQuotaManagerTest {
         final CountDownLatch counter = new CountDownLatch(2);
         final AtomicBoolean gotException = new AtomicBoolean(false);
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+        PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS)
                 .build();
         @Cleanup
-        PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
+        PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, SECONDS)
                 .build();
         Consumer<byte[]> consumer1 = client2.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         Consumer<byte[]> consumer2 = client2.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
@@ -874,7 +1190,7 @@ public class BacklogQuotaManagerTest {
         consumerThread.start();
 
         // test hangs without timeout since there is nothing to consume due to eviction
-        counter.await(20, TimeUnit.SECONDS);
+        counter.await(20, SECONDS);
         assertFalse(gotException.get());
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         rolloverStats();
@@ -903,13 +1219,13 @@ public class BacklogQuotaManagerTest {
         final AtomicBoolean gotException = new AtomicBoolean(false);
         @Cleanup
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
 
         final Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         final Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
         @Cleanup
         final PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
 
         Thread producerThread = new Thread(() -> {
             try {
@@ -967,16 +1283,16 @@ public class BacklogQuotaManagerTest {
         final AtomicBoolean gotException = new AtomicBoolean(false);
         @Cleanup
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
 
         final Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         final Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
         @Cleanup
         final PulsarClient client3 = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
         @Cleanup
         final PulsarClient client2 = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
 
         Thread producerThread1 = new Thread(() -> {
             try {
@@ -1040,7 +1356,7 @@ public class BacklogQuotaManagerTest {
         producerThread2.start();
         consumerThread1.start();
         consumerThread2.start();
-        counter.await(20, TimeUnit.SECONDS);
+        counter.await(20, SECONDS);
         assertFalse(gotException.get());
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         rolloverStats();
@@ -1060,7 +1376,7 @@ public class BacklogQuotaManagerTest {
                         .build());
         @Cleanup
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
         final String topic1 = "persistent://prop/quotahold/hold";
         final String subName1 = "c1hold";
         final int numMsgs = 10;
@@ -1102,7 +1418,7 @@ public class BacklogQuotaManagerTest {
                         .build());
         @Cleanup
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
         final String topic1 = "persistent://prop/quotahold/holdtimeout";
         final String subName1 = "c1holdtimeout";
         boolean gotException = false;
@@ -1140,7 +1456,7 @@ public class BacklogQuotaManagerTest {
                         .build());
         @Cleanup
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
         final String topic1 = "persistent://prop/quotahold/except";
         final String subName1 = "c1except";
         boolean gotException = false;
@@ -1185,7 +1501,7 @@ public class BacklogQuotaManagerTest {
                         .build());
         @Cleanup
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
         final String topic1 = "persistent://prop/quotahold/exceptandunblock";
         final String subName1 = "c1except";
         boolean gotException = false;
@@ -1269,11 +1585,11 @@ public class BacklogQuotaManagerTest {
                 BacklogQuota.builder()
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
-                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+                        .build(), message_age);
         config.setPreciseTimeBasedBacklogQuotaCheck(true);
         @Cleanup
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
         final String topic1 = "persistent://prop/quotahold/exceptandunblock2";
         final String subName1 = "c1except";
         boolean gotException = false;
@@ -1335,10 +1651,10 @@ public class BacklogQuotaManagerTest {
                 BacklogQuota.builder()
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
-                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+                        .build(), message_age);
         @Cleanup
         final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString())
-                .statsInterval(0, TimeUnit.SECONDS).build();
+                .statsInterval(0, SECONDS).build();
         final String topic1 = "persistent://prop/quotahold/exceptandunblock2";
         final String subName1 = "c1except";
         boolean gotException = false;
@@ -1412,7 +1728,7 @@ public class BacklogQuotaManagerTest {
         admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.toString()).build();
 
         @Cleanup
-        PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).statsInterval(0, TimeUnit.SECONDS)
+        PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).statsInterval(0, SECONDS)
                 .build();
 
         final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();
@@ -1422,7 +1738,7 @@ public class BacklogQuotaManagerTest {
 
         Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
         Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
-        org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1);
+        Producer<byte[]> producer = createProducer(client, topic1);
         byte[] content = new byte[1024];
         for (int i = 0; i < numMsgs; i++) {
             producer.send(content);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b6dd42d7028..e195f220f87 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -1844,14 +1845,14 @@ public class ServerCnxTest {
         ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
                 producerName, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand1);
-        assertTrue(getResponse() instanceof CommandProducerSuccess);
+        assertThat(getResponse()).isInstanceOf(CommandProducerSuccess.class);
 
         // Call disconnect method on producer to trigger activity similar to unloading
         Producer producer = serverCnx.getProducers().get(1).get();
         assertNotNull(producer);
         producer.disconnect();
         channel.runPendingTasks();
-        assertTrue(getResponse() instanceof CommandCloseProducer);
+        assertThat(getResponse()).isInstanceOf(CommandCloseProducer.class);
 
         // Send message and expect no response
         sendMessage();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index b11946069c9..fd08f284bbf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -128,6 +129,29 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
     }
 
+    @Test
+    public void testSeekIsByReceive() throws PulsarClientException {
+        final String topicName = "persistent://prop/use/ns-abc/testSeek";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+
+        String subscriptionName = "my-subscription";
+        org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscribe();
+
+        List<MessageId> messageIds = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            MessageId msgId = producer.send(message.getBytes());
+            messageIds.add(msgId);
+        }
+
+        consumer.seek(messageIds.get(5));
+        Message<byte[]> message = consumer.receive();
+        assertThat(message.getMessageId()).isEqualTo(messageIds.get(6));
+    }
+
     @Test
     public void testSeekForBatch() throws Exception {
         final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 54fec3934dd..8be0aa4bc7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -41,7 +43,6 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
@@ -219,9 +220,9 @@ public class BucketDelayedDeliveryTest extends DelayedDeliveryTest {
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, true, true, output);
         String metricsStr = output.toString(StandardCharsets.UTF_8);
-        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+        Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
 
-        List<PrometheusMetricsTest.Metric> bucketsMetrics =
+        List<Metric> bucketsMetrics =
                 metricsMap.get("pulsar_delayed_message_index_bucket_total").stream()
                         .filter(metric -> metric.tags.get("topic").equals(topic)).toList();
         MutableInt bucketsSum = new MutableInt();
@@ -230,12 +231,12 @@ public class BucketDelayedDeliveryTest extends DelayedDeliveryTest {
             bucketsSum.add(metric.value);
         });
         assertEquals(6, bucketsSum.intValue());
-        Optional<PrometheusMetricsTest.Metric> bucketsTopicMetric =
+        Optional<Metric> bucketsTopicMetric =
                 bucketsMetrics.stream().filter(metric -> !metric.tags.containsKey("subscription")).findFirst();
         assertTrue(bucketsTopicMetric.isPresent());
         assertEquals(bucketsSum.intValue(), bucketsTopicMetric.get().value);
 
-        List<PrometheusMetricsTest.Metric> loadedIndexMetrics =
+        List<Metric> loadedIndexMetrics =
                 metricsMap.get("pulsar_delayed_message_index_loaded").stream()
                         .filter(metric -> metric.tags.get("topic").equals(topic)).toList();
         MutableInt loadedIndexSum = new MutableInt();
@@ -244,12 +245,12 @@ public class BucketDelayedDeliveryTest extends DelayedDeliveryTest {
             loadedIndexSum.add(metric.value);
         }).count();
         assertEquals(2, count);
-        Optional<PrometheusMetricsTest.Metric> loadedIndexTopicMetrics =
+        Optional<Metric> loadedIndexTopicMetrics =
                 bucketsMetrics.stream().filter(metric -> !metric.tags.containsKey("subscription")).findFirst();
         assertTrue(loadedIndexTopicMetrics.isPresent());
         assertEquals(loadedIndexSum.intValue(), loadedIndexTopicMetrics.get().value);
 
-        List<PrometheusMetricsTest.Metric> snapshotSizeBytesMetrics =
+        List<Metric> snapshotSizeBytesMetrics =
                 metricsMap.get("pulsar_delayed_message_index_bucket_snapshot_size_bytes").stream()
                         .filter(metric -> metric.tags.get("topic").equals(topic)).toList();
         MutableInt snapshotSizeBytesSum = new MutableInt();
@@ -259,12 +260,12 @@ public class BucketDelayedDeliveryTest extends DelayedDeliveryTest {
                     snapshotSizeBytesSum.add(metric.value);
                 }).count();
         assertEquals(2, count);
-        Optional<PrometheusMetricsTest.Metric> snapshotSizeBytesTopicMetrics =
+        Optional<Metric> snapshotSizeBytesTopicMetrics =
                 snapshotSizeBytesMetrics.stream().filter(metric -> !metric.tags.containsKey("subscription")).findFirst();
         assertTrue(snapshotSizeBytesTopicMetrics.isPresent());
         assertEquals(snapshotSizeBytesSum.intValue(), snapshotSizeBytesTopicMetrics.get().value);
 
-        List<PrometheusMetricsTest.Metric> opCountMetrics =
+        List<Metric> opCountMetrics =
                 metricsMap.get("pulsar_delayed_message_index_bucket_op_count").stream()
                         .filter(metric -> metric.tags.get("topic").equals(topic)).toList();
         MutableInt opCountMetricsSum = new MutableInt();
@@ -276,14 +277,14 @@ public class BucketDelayedDeliveryTest extends DelayedDeliveryTest {
                     opCountMetricsSum.add(metric.value);
                 }).count();
         assertEquals(2, count);
-        Optional<PrometheusMetricsTest.Metric> opCountTopicMetrics =
+        Optional<Metric> opCountTopicMetrics =
                 opCountMetrics.stream()
                         .filter(metric -> metric.tags.get("state").equals("succeed") && metric.tags.get("type")
                                 .equals("create") && !metric.tags.containsKey("subscription")).findFirst();
         assertTrue(opCountTopicMetrics.isPresent());
         assertEquals(opCountMetricsSum.intValue(), opCountTopicMetrics.get().value);
 
-        List<PrometheusMetricsTest.Metric> opLatencyMetrics =
+        List<Metric> opLatencyMetrics =
                 metricsMap.get("pulsar_delayed_message_index_bucket_op_latency_ms").stream()
                         .filter(metric -> metric.tags.get("topic").equals(topic)).toList();
         MutableInt opLatencyMetricsSum = new MutableInt();
@@ -295,7 +296,7 @@ public class BucketDelayedDeliveryTest extends DelayedDeliveryTest {
                     opLatencyMetricsSum.add(metric.value);
                 }).count();
         assertTrue(count >= 2);
-        Optional<PrometheusMetricsTest.Metric> opLatencyTopicMetrics =
+        Optional<Metric> opLatencyTopicMetrics =
                 opCountMetrics.stream()
                         .filter(metric -> metric.tags.get("type").equals("create")
                                 && !metric.tags.containsKey("subscription")).findFirst();
@@ -304,9 +305,9 @@ public class BucketDelayedDeliveryTest extends DelayedDeliveryTest {
 
         ByteArrayOutputStream namespaceOutput = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, false, true, true, namespaceOutput);
-        Multimap<String, PrometheusMetricsTest.Metric> namespaceMetricsMap = PrometheusMetricsTest.parseMetrics(namespaceOutput.toString(StandardCharsets.UTF_8));
+        Multimap<String, Metric> namespaceMetricsMap = parseMetrics(namespaceOutput.toString(StandardCharsets.UTF_8));
 
-        Optional<PrometheusMetricsTest.Metric> namespaceMetric =
+        Optional<Metric> namespaceMetric =
                 namespaceMetricsMap.get("pulsar_delayed_message_index_bucket_total").stream().findFirst();
         assertTrue(namespaceMetric.isPresent());
         assertEquals(6, namespaceMetric.get().value);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 6f60a13fd48..4eb2aa15fa2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
@@ -62,7 +64,6 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
-import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -356,14 +357,14 @@ public class PersistentTopicTest extends BrokerTestBase {
         PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
         String metricsStr = output.toString(StandardCharsets.UTF_8);
 
-        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
-        Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_delayed_message_index_size_bytes");
+        Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
+        Collection<Metric> metrics = metricsMap.get("pulsar_delayed_message_index_size_bytes");
         Assert.assertTrue(metrics.size() > 0);
 
         int topicLevelNum = 0;
         int namespaceLevelNum = 0;
         int subscriptionLevelNum = 0;
-        for (PrometheusMetricsTest.Metric metric : metrics) {
+        for (Metric metric : metrics) {
             if (exposeTopicLevelMetrics && metric.tags.get("topic").equals(topic)) {
                 Assert.assertTrue(metric.value > 0);
                 topicLevelNum++;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index fbf734f331f..a520b8c241b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertEquals;
@@ -45,7 +47,6 @@ import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
-import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
@@ -123,29 +124,29 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
         PrometheusMetricsGenerator.generate(pulsar, false, false, false, output);
         output.flush();
         String metricsStr = output.toString(StandardCharsets.UTF_8);
-        Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
-        Collection<PrometheusMetricsTest.Metric> delMetrics = metrics.get("pulsar_schema_del_ops_failed_total");
+        Collection<Metric> delMetrics = metrics.get("pulsar_schema_del_ops_failed_total");
         Assert.assertEquals(delMetrics.size(), 0);
-        Collection<PrometheusMetricsTest.Metric> getMetrics = metrics.get("pulsar_schema_get_ops_failed_total");
+        Collection<Metric> getMetrics = metrics.get("pulsar_schema_get_ops_failed_total");
         Assert.assertEquals(getMetrics.size(), 0);
-        Collection<PrometheusMetricsTest.Metric> putMetrics = metrics.get("pulsar_schema_put_ops_failed_total");
+        Collection<Metric> putMetrics = metrics.get("pulsar_schema_put_ops_failed_total");
         Assert.assertEquals(putMetrics.size(), 0);
 
-        Collection<PrometheusMetricsTest.Metric> deleteLatency = metrics.get("pulsar_schema_del_ops_latency_count");
-        for (PrometheusMetricsTest.Metric metric : deleteLatency) {
+        Collection<Metric> deleteLatency = metrics.get("pulsar_schema_del_ops_latency_count");
+        for (Metric metric : deleteLatency) {
             Assert.assertEquals(metric.tags.get("namespace"), namespace);
             Assert.assertTrue(metric.value > 0);
         }
 
-        Collection<PrometheusMetricsTest.Metric> getLatency = metrics.get("pulsar_schema_get_ops_latency_count");
-        for (PrometheusMetricsTest.Metric metric : getLatency) {
+        Collection<Metric> getLatency = metrics.get("pulsar_schema_get_ops_latency_count");
+        for (Metric metric : getLatency) {
             Assert.assertEquals(metric.tags.get("namespace"), namespace);
             Assert.assertTrue(metric.value > 0);
         }
 
-        Collection<PrometheusMetricsTest.Metric> putLatency = metrics.get("pulsar_schema_put_ops_latency_count");
-        for (PrometheusMetricsTest.Metric metric : putLatency) {
+        Collection<Metric> putLatency = metrics.get("pulsar_schema_put_ops_latency_count");
+        for (Metric metric : putLatency) {
             Assert.assertEquals(metric.tags.get("namespace"), namespace);
             Assert.assertTrue(metric.value > 0);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index f29c643a8f5..eb4500c1366 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.stats;
 
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.AssertJUnit.assertEquals;
@@ -336,11 +338,11 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
         PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
         String metricStr = output.toString(StandardCharsets.UTF_8);
 
-        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
-        Collection<PrometheusMetricsTest.Metric> ackRateMetric = metricsMap.get("pulsar_consumer_msg_ack_rate");
+        Multimap<String, Metric> metricsMap = parseMetrics(metricStr);
+        Collection<Metric> ackRateMetric = metricsMap.get("pulsar_consumer_msg_ack_rate");
 
         String rateOutMetricName = exposeTopicLevelMetrics ? "pulsar_consumer_msg_rate_out" : "pulsar_rate_out";
-        Collection<PrometheusMetricsTest.Metric> rateOutMetric = metricsMap.get(rateOutMetricName);
+        Collection<Metric> rateOutMetric = metricsMap.get(rateOutMetricName);
         Assert.assertTrue(ackRateMetric.size() > 0);
         Assert.assertTrue(rateOutMetric.size() > 0);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
index 8ae0242c623..15f41365da8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import com.google.common.collect.Multimap;
 import java.io.ByteArrayOutputStream;
 import java.util.Collection;
@@ -101,12 +103,12 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
         String metricsStr = output.toString();
-        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+        Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
 
         String metricsDebugMessage = "Assertion failed with metrics:\n" + metricsStr + "\n";
 
-        Collection<PrometheusMetricsTest.Metric> opsLatency = metricsMap.get("pulsar_metadata_store_ops_latency_ms" + "_sum");
-        Collection<PrometheusMetricsTest.Metric> putBytes = metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");
+        Collection<Metric> opsLatency = metricsMap.get("pulsar_metadata_store_ops_latency_ms" + "_sum");
+        Collection<Metric> putBytes = metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");
 
         Assert.assertTrue(opsLatency.size() > 1, metricsDebugMessage);
         Assert.assertTrue(putBytes.size() > 1, metricsDebugMessage);
@@ -116,7 +118,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         expectedMetadataStoreName.add(MetadataStoreConfig.CONFIGURATION_METADATA_STORE);
 
         AtomicInteger matchCount = new AtomicInteger(0);
-        for (PrometheusMetricsTest.Metric m : opsLatency) {
+        for (Metric m : opsLatency) {
             Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
             if (!isExpectedLabel(metadataStoreName, expectedMetadataStoreName, matchCount)) {
@@ -150,7 +152,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         Assert.assertEquals(matchCount.get(), expectedMetadataStoreName.size() * 6);
 
         matchCount = new AtomicInteger(0);
-        for (PrometheusMetricsTest.Metric m : putBytes) {
+        for (Metric m : putBytes) {
             Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
             if (!isExpectedLabel(metadataStoreName, expectedMetadataStoreName, matchCount)) {
@@ -191,12 +193,12 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
         String metricsStr = output.toString();
-        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+        Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
 
-        Collection<PrometheusMetricsTest.Metric> executorQueueSize = metricsMap.get("pulsar_batch_metadata_store_executor_queue_size");
-        Collection<PrometheusMetricsTest.Metric> opsWaiting = metricsMap.get("pulsar_batch_metadata_store_queue_wait_time_ms" + "_sum");
-        Collection<PrometheusMetricsTest.Metric> batchExecuteTime = metricsMap.get("pulsar_batch_metadata_store_batch_execute_time_ms" + "_sum");
-        Collection<PrometheusMetricsTest.Metric> opsPerBatch = metricsMap.get("pulsar_batch_metadata_store_batch_size" + "_sum");
+        Collection<Metric> executorQueueSize = metricsMap.get("pulsar_batch_metadata_store_executor_queue_size");
+        Collection<Metric> opsWaiting = metricsMap.get("pulsar_batch_metadata_store_queue_wait_time_ms" + "_sum");
+        Collection<Metric> batchExecuteTime = metricsMap.get("pulsar_batch_metadata_store_batch_execute_time_ms" + "_sum");
+        Collection<Metric> opsPerBatch = metricsMap.get("pulsar_batch_metadata_store_batch_size" + "_sum");
 
         String metricsDebugMessage = "Assertion failed with metrics:\n" + metricsStr + "\n";
 
@@ -210,7 +212,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         expectedMetadataStoreName.add(MetadataStoreConfig.CONFIGURATION_METADATA_STORE);
 
         AtomicInteger matchCount = new AtomicInteger(0);
-        for (PrometheusMetricsTest.Metric m : executorQueueSize) {
+        for (Metric m : executorQueueSize) {
             Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
             if (isExpectedLabel(metadataStoreName, expectedMetadataStoreName, matchCount)) {
@@ -221,7 +223,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         Assert.assertEquals(matchCount.get(), expectedMetadataStoreName.size());
 
         matchCount = new AtomicInteger(0);
-        for (PrometheusMetricsTest.Metric m : opsWaiting) {
+        for (Metric m : opsWaiting) {
             Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
             if (isExpectedLabel(metadataStoreName, expectedMetadataStoreName, matchCount)) {
@@ -232,7 +234,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         Assert.assertEquals(matchCount.get(), expectedMetadataStoreName.size());
 
         matchCount = new AtomicInteger(0);
-        for (PrometheusMetricsTest.Metric m : batchExecuteTime) {
+        for (Metric m : batchExecuteTime) {
             Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
             if (isExpectedLabel(metadataStoreName, expectedMetadataStoreName, matchCount)) {
@@ -243,7 +245,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         Assert.assertEquals(matchCount.get(), expectedMetadataStoreName.size());
 
         matchCount = new AtomicInteger(0);
-        for (PrometheusMetricsTest.Metric m : opsPerBatch) {
+        for (Metric m : opsPerBatch) {
             Assert.assertEquals(m.tags.get("cluster"), "test", metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
             if (isExpectedLabel(metadataStoreName, expectedMetadataStoreName, matchCount)) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 476cf3f9b4a..c2cacac56ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -19,13 +19,13 @@
 package org.apache.pulsar.broker.stats;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Splitter;
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import io.jsonwebtoken.SignatureAlgorithm;
 import io.prometheus.client.Collector;
@@ -49,7 +49,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -1908,62 +1907,6 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         p2.close();
     }
 
-    /**
-     * Hacky parsing of Prometheus text format. Should be good enough for unit tests
-     */
-    public static Multimap<String, Metric> parseMetrics(String metrics) {
-        Multimap<String, Metric> parsed = ArrayListMultimap.create();
-
-        // Example of lines are
-        // jvm_threads_current{cluster="standalone",} 203.0
-        // or
-        // pulsar_subscriptions_count{cluster="standalone", namespace="public/default",
-        // topic="persistent://public/default/test-2"} 0.0
-        Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)$");
-        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
-
-        Splitter.on("\n").split(metrics).forEach(line -> {
-            if (line.isEmpty() || line.startsWith("#")) {
-                return;
-            }
-
-            Matcher matcher = pattern.matcher(line);
-            assertTrue(matcher.matches(), "line " + line + " does not match pattern " + pattern);
-            String name = matcher.group(1);
-
-            Metric m = new Metric();
-            String numericValue = matcher.group(3);
-            if (numericValue.equalsIgnoreCase("-Inf")) {
-                m.value = Double.NEGATIVE_INFINITY;
-            } else if (numericValue.equalsIgnoreCase("+Inf")) {
-                m.value = Double.POSITIVE_INFINITY;
-            } else {
-                m.value = Double.parseDouble(numericValue);
-            }
-            String tags = matcher.group(2);
-            Matcher tagsMatcher = tagsPattern.matcher(tags);
-            while (tagsMatcher.find()) {
-                String tag = tagsMatcher.group(1);
-                String value = tagsMatcher.group(2);
-                m.tags.put(tag, value);
-            }
-
-            parsed.put(name, m);
-        });
-
-        return parsed;
-    }
-
-    public static class Metric {
-        public Map<String, String> tags = new TreeMap<>();
-        public double value;
-
-        @Override
-        public String toString() {
-            return MoreObjects.toStringHelper(this).add("tags", tags).add("value", value).toString();
-        }
-    }
-
     @Test
     public void testEscapeLabelValue() throws Exception {
         String ns1 = "prop/ns-abc1";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index d5e0066a86f..e39860274d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.stats;
 
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.mockito.Mockito.mock;
 import com.google.common.collect.Multimap;
 import java.io.ByteArrayOutputStream;
@@ -84,7 +86,7 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
     @Test
     public void testConsumersAfterMarkDelete() throws PulsarClientException, PulsarAdminException {
         final String topicName = "persistent://my-property/my-ns/testConsumersAfterMarkDelete-"
-                + UUID.randomUUID().toString();
+                + UUID.randomUUID();
         final String subName = "my-sub";
 
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
@@ -233,15 +235,15 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, enableTopicStats, false, false, output);
         String metricsStr = output.toString();
-        Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
-        Collection<PrometheusMetricsTest.Metric> throughFilterMetrics =
+        Collection<Metric> throughFilterMetrics =
                 metrics.get("pulsar_subscription_filter_processed_msg_count");
-        Collection<PrometheusMetricsTest.Metric> acceptedMetrics =
+        Collection<Metric> acceptedMetrics =
                 metrics.get("pulsar_subscription_filter_accepted_msg_count");
-        Collection<PrometheusMetricsTest.Metric> rejectedMetrics =
+        Collection<Metric> rejectedMetrics =
                 metrics.get("pulsar_subscription_filter_rejected_msg_count");
-        Collection<PrometheusMetricsTest.Metric> rescheduledMetrics =
+        Collection<Metric> rescheduledMetrics =
                 metrics.get("pulsar_subscription_filter_rescheduled_msg_count");
 
         if (enableTopicStats) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 4d38f5fad51..723a493eca1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -19,7 +19,8 @@
 package org.apache.pulsar.broker.stats;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.pulsar.broker.stats.PrometheusMetricsTest.parseMetrics;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -119,8 +120,8 @@ public class TransactionMetricsTest extends BrokerTestBase {
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
         String metricsStr = statsOut.toString();
-        Multimap<String, PrometheusMetricsTest.Metric> metrics = parseMetrics(metricsStr);
-        Collection<PrometheusMetricsTest.Metric> metric = metrics.get("pulsar_txn_active_count");
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        Collection<Metric> metric = metrics.get("pulsar_txn_active_count");
         assertEquals(metric.size(), 2);
         metric.forEach(item -> {
             if ("0".equals(item.tags.get("coordinator_id"))) {
@@ -187,9 +188,9 @@ public class TransactionMetricsTest extends BrokerTestBase {
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
         String metricsStr = statsOut.toString();
-        Multimap<String, PrometheusMetricsTest.Metric> metrics = parseMetrics(metricsStr);
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
-        Collection<PrometheusMetricsTest.Metric> metric = metrics.get("pulsar_txn_created_total");
+        Collection<Metric> metric = metrics.get("pulsar_txn_created_total");
         assertEquals(metric.size(), 1);
         metric.forEach(item -> assertEquals(item.value, txnCount));
 
@@ -274,9 +275,9 @@ public class TransactionMetricsTest extends BrokerTestBase {
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
         String metricsStr = statsOut.toString();
 
-        Multimap<String, PrometheusMetricsTest.Metric> metrics = parseMetrics(metricsStr);
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
-        Collection<PrometheusMetricsTest.Metric> metric = metrics.get("pulsar_storage_size");
+        Collection<Metric> metric = metrics.get("pulsar_storage_size");
         checkManagedLedgerMetrics(subName, 32, metric);
         checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 252, metric);
 
@@ -336,12 +337,12 @@ public class TransactionMetricsTest extends BrokerTestBase {
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
         String metricsStr = statsOut.toString();
 
-        Multimap<String, PrometheusMetricsTest.Metric> metrics = parseMetrics(metricsStr);
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
-        Collection<PrometheusMetricsTest.Metric> metric = metrics.get("pulsar_storage_size");
+        Collection<Metric> metric = metrics.get("pulsar_storage_size");
         checkManagedLedgerMetrics(subName, 32, metric);
         //No statistics of the pendingAck are generated when the pendingAck is not initialized.
-        for (PrometheusMetricsTest.Metric metric1 : metric) {
+        for (Metric metric1 : metric) {
             if (metric1.tags.containsValue(subName2)) {
                 Assert.fail();
             }
@@ -431,9 +432,9 @@ public class TransactionMetricsTest extends BrokerTestBase {
 
     }
 
-    private void checkManagedLedgerMetrics(String tag, double value, Collection<PrometheusMetricsTest.Metric> metrics) {
+    private void checkManagedLedgerMetrics(String tag, double value, Collection<Metric> metrics) {
         boolean exist = false;
-        for (PrometheusMetricsTest.Metric metric1 : metrics) {
+        for (Metric metric1 : metrics) {
             if (metric1.tags.containsValue(tag)) {
                 assertEquals(metric1.value, value);
                 exist = true;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
index e63f644f3d0..cf923df0411 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
@@ -102,6 +103,8 @@ public class NamespaceStatsAggregatorTest {
         when(topic.getReplicators()).thenReturn(ConcurrentOpenHashMap.<String,Replicator>newBuilder().build());
         when(topic.getManagedLedger()).thenReturn(ml);
         when(topic.getBacklogQuota(Mockito.any())).thenReturn(Mockito.mock(BacklogQuota.class));
+        PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();
+        when(topic.getPersistentTopicMetrics()).thenReturn(persistentTopicMetrics);
         topicsMap.put("my-topic", topic);
         PrometheusMetricStreams metricStreams = Mockito.spy(new PrometheusMetricStreams());
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
new file mode 100644
index 00000000000..6fd50969027
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import static org.assertj.core.api.Fail.fail;
+import static org.testng.Assert.assertTrue;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import io.restassured.RestAssured;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.tuple.Pair;
+
+public class PrometheusMetricsClient {
+    private final String host;
+    private final int port;
+
+    public PrometheusMetricsClient(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    @SuppressWarnings("HttpUrlsUsage")
+    public Metrics getMetrics() {
+        String metrics = RestAssured.given().baseUri("http://" + host).port(port).get("/metrics").asString();
+        return new Metrics(parseMetrics(metrics));
+    }
+
+    /**
+     * Hacky parsing of Prometheus text format. Should be good enough for unit tests
+     */
+    public static Multimap<String, Metric> parseMetrics(String metrics) {
+        Multimap<String, Metric> parsed = ArrayListMultimap.create();
+
+        // Example of lines are
+        // jvm_threads_current{cluster="standalone",} 203.0
+        // or
+        // pulsar_subscriptions_count{cluster="standalone", namespace="public/default",
+        // topic="persistent://public/default/test-2"} 0.0
+        Pattern pattern = Pattern.compile("^(\\w+)\\{([^}]+)}\\s([+-]?[\\d\\w.-]+)$");
+        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+
+        Splitter.on("\n").split(metrics).forEach(line -> {
+            if (line.isEmpty() || line.startsWith("#")) {
+                return;
+            }
+
+            Matcher matcher = pattern.matcher(line);
+            assertTrue(matcher.matches(), "line " + line + " does not match pattern " + pattern);
+            String name = matcher.group(1);
+
+            Metric m = new Metric();
+            String numericValue = matcher.group(3);
+            if (numericValue.equalsIgnoreCase("-Inf")) {
+                m.value = Double.NEGATIVE_INFINITY;
+            } else if (numericValue.equalsIgnoreCase("+Inf")) {
+                m.value = Double.POSITIVE_INFINITY;
+            } else {
+                m.value = Double.parseDouble(numericValue);
+            }
+            String tags = matcher.group(2);
+            Matcher tagsMatcher = tagsPattern.matcher(tags);
+            while (tagsMatcher.find()) {
+                String tag = tagsMatcher.group(1);
+                String value = tagsMatcher.group(2);
+                m.tags.put(tag, value);
+            }
+
+            parsed.put(name, m);
+        });
+
+        return parsed;
+    }
+
+    public static class Metric {
+        public Map<String, String> tags = new TreeMap<>();
+        public double value;
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this).add("tags", tags).add("value", value).toString();
+        }
+
+        public boolean contains(String labelName, String labelValue) {
+            String value = tags.get(labelName);
+            return value != null && value.equals(labelValue);
+        }
+    }
+
+    public static class Metrics {
+        final Multimap<String, Metric> nameToDataPoints;
+
+        public Metrics(Multimap<String, Metric> nameToDataPoints) {
+            this.nameToDataPoints = nameToDataPoints;
+        }
+
+        public List<Metric> findByNameAndLabels(String metricName, String labelName, String labelValue) {
+            return nameToDataPoints.get(metricName)
+                    .stream()
+                    .filter(metric -> metric.contains(labelName, labelValue))
+                    .toList();
+        }
+
+        @SafeVarargs
+        public final List<Metric> findByNameAndLabels(String metricName, Pair<String, String>... nameValuePairs) {
+            return nameToDataPoints.get(metricName)
+                    .stream()
+                    .filter(metric -> {
+                        for (Pair<String, String> nameValuePair : nameValuePairs) {
+                            String labelName = nameValuePair.getLeft();
+                            String labelValue = nameValuePair.getRight();
+                            if (!metric.contains(labelName, labelValue)) {
+                                return false;
+                            }
+                        }
+                        return true;
+                    })
+                    .toList();
+        }
+
+        @SafeVarargs
+        public final Metric findSingleMetricByNameAndLabels(String metricName, Pair<String, String>... nameValuePairs) {
+            List<Metric> metricByNameAndLabels = findByNameAndLabels(metricName, nameValuePairs);
+            if (metricByNameAndLabels.size() != 1) {
+                fail("Expected to find 1 metric, but found the following: "+metricByNameAndLabels +
+                ". Metrics are = "+nameToDataPoints.get(metricName)+". Labels requested = "+ Arrays.toString(
+                        nameValuePairs));
+            }
+            return metricByNameAndLabels.get(0);
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 864b481b72a..a5f446e061c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -18,28 +18,36 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
-import lombok.Cleanup;
 import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
@@ -71,14 +79,6 @@ import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import java.lang.reflect.Field;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
 
 @Test(groups = "broker")
 public class TransactionBufferClientTest extends TransactionTestBase {
@@ -229,28 +229,28 @@ public class TransactionBufferClientTest extends TransactionTestBase {
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, false, false, statsOut);
         String metricsStr = statsOut.toString();
-        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+        Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
 
-        Collection<PrometheusMetricsTest.Metric> abortFailed = metricsMap.get("pulsar_txn_tb_client_abort_failed_total");
-        Collection<PrometheusMetricsTest.Metric> commitFailed = metricsMap.get("pulsar_txn_tb_client_commit_failed_total");
-        Collection<PrometheusMetricsTest.Metric> abortLatencyCount =
+        Collection<Metric> abortFailed = metricsMap.get("pulsar_txn_tb_client_abort_failed_total");
+        Collection<Metric> commitFailed = metricsMap.get("pulsar_txn_tb_client_commit_failed_total");
+        Collection<Metric> abortLatencyCount =
                 metricsMap.get("pulsar_txn_tb_client_abort_latency_count");
-        Collection<PrometheusMetricsTest.Metric> commitLatencyCount =
+        Collection<Metric> commitLatencyCount =
                 metricsMap.get("pulsar_txn_tb_client_commit_latency_count");
-        Collection<PrometheusMetricsTest.Metric> pending = metricsMap.get("pulsar_txn_tb_client_pending_requests");
+        Collection<Metric> pending = metricsMap.get("pulsar_txn_tb_client_pending_requests");
 
         assertEquals(abortFailed.stream().mapToDouble(metric -> metric.value).sum(), 0);
         assertEquals(commitFailed.stream().mapToDouble(metric -> metric.value).sum(), 0);
 
         for (int i = 0; i < partitions; i++) {
             String topic = partitionedTopicName.getPartition(i).toString();
-            Optional<PrometheusMetricsTest.Metric> optional = abortLatencyCount.stream()
+            Optional<Metric> optional = abortLatencyCount.stream()
                     .filter(metric -> metric.tags.get("topic").equals(topic)).findFirst();
 
             assertTrue(optional.isPresent());
             assertEquals(optional.get().value, 1D);
 
-            Optional<PrometheusMetricsTest.Metric> optional1 = commitLatencyCount.stream()
+            Optional<Metric> optional1 = commitLatencyCount.stream()
                     .filter(metric -> metric.tags.get("topic").equals(topic)).findFirst();
             assertTrue(optional1.isPresent());
             assertEquals(optional1.get().value, 1D);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index bc537fb784f..6c24b6b3f01 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.transaction.pendingack;
 
 
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
@@ -26,6 +28,7 @@ import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
+import com.google.common.collect.Multimap;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -38,7 +41,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Multimap;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -49,7 +51,6 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
@@ -62,9 +63,9 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
@@ -256,28 +257,28 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, false, false, statsOut);
         String metricsStr = statsOut.toString();
-        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+        Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
 
-        Collection<PrometheusMetricsTest.Metric> abortedCount = metricsMap.get("pulsar_txn_tp_aborted_count_total");
-        Collection<PrometheusMetricsTest.Metric> committedCount = metricsMap.get("pulsar_txn_tp_committed_count_total");
-        Collection<PrometheusMetricsTest.Metric> commitLatency = metricsMap.get("pulsar_txn_tp_commit_latency");
+        Collection<Metric> abortedCount = metricsMap.get("pulsar_txn_tp_aborted_count_total");
+        Collection<Metric> committedCount = metricsMap.get("pulsar_txn_tp_committed_count_total");
+        Collection<Metric> commitLatency = metricsMap.get("pulsar_txn_tp_commit_latency");
         Assert.assertTrue(commitLatency.size() > 0);
 
         int count = 0;
-        for (PrometheusMetricsTest.Metric metric : commitLatency) {
+        for (Metric metric : commitLatency) {
             if (metric.tags.get("topic").endsWith(PENDING_ACK_REPLAY_TOPIC) && metric.value > 0) {
                 count++;
             }
         }
         Assert.assertTrue(count > 0);
 
-        for (PrometheusMetricsTest.Metric metric : abortedCount) {
+        for (Metric metric : abortedCount) {
             if (metric.tags.get("subscription").equals(subName) && metric.tags.get("status").equals("succeed")) {
                 assertTrue(metric.tags.get("topic").endsWith(PENDING_ACK_REPLAY_TOPIC));
                 assertTrue(metric.value > 0);
             }
         }
-        for (PrometheusMetricsTest.Metric metric : committedCount) {
+        for (Metric metric : committedCount) {
             if (metric.tags.get("subscription").equals(subName) && metric.tags.get("status").equals("succeed")) {
                 assertTrue(metric.tags.get("topic").endsWith(PENDING_ACK_REPLAY_TOPIC));
                 assertTrue(metric.value > 0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 405f3a11b5d..8fb95eed789 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.web;
 
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
+import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -51,7 +53,6 @@ import lombok.Cleanup;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -104,31 +105,31 @@ public class WebServiceTest {
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
         String metricsStr = statsOut.toString();
-        Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
 
-        Collection<PrometheusMetricsTest.Metric> maxThreads = metrics.get("pulsar_web_executor_max_threads");
-        Collection<PrometheusMetricsTest.Metric> minThreads = metrics.get("pulsar_web_executor_min_threads");
-        Collection<PrometheusMetricsTest.Metric> activeThreads = metrics.get("pulsar_web_executor_active_threads");
-        Collection<PrometheusMetricsTest.Metric> idleThreads = metrics.get("pulsar_web_executor_idle_threads");
-        Collection<PrometheusMetricsTest.Metric> currentThreads = metrics.get("pulsar_web_executor_current_threads");
+        Collection<Metric> maxThreads = metrics.get("pulsar_web_executor_max_threads");
+        Collection<Metric> minThreads = metrics.get("pulsar_web_executor_min_threads");
+        Collection<Metric> activeThreads = metrics.get("pulsar_web_executor_active_threads");
+        Collection<Metric> idleThreads = metrics.get("pulsar_web_executor_idle_threads");
+        Collection<Metric> currentThreads = metrics.get("pulsar_web_executor_current_threads");
 
-        for (PrometheusMetricsTest.Metric metric : maxThreads) {
+        for (Metric metric : maxThreads) {
             Assert.assertNotNull(metric.tags.get("cluster"));
             Assert.assertTrue(metric.value > 0);
         }
-        for (PrometheusMetricsTest.Metric metric : minThreads) {
+        for (Metric metric : minThreads) {
             Assert.assertNotNull(metric.tags.get("cluster"));
             Assert.assertTrue(metric.value > 0);
         }
-        for (PrometheusMetricsTest.Metric metric : activeThreads) {
+        for (Metric metric : activeThreads) {
             Assert.assertNotNull(metric.tags.get("cluster"));
             Assert.assertTrue(metric.value >= 0);
         }
-        for (PrometheusMetricsTest.Metric metric : idleThreads) {
+        for (Metric metric : idleThreads) {
             Assert.assertNotNull(metric.tags.get("cluster"));
             Assert.assertTrue(metric.value >= 0);
         }
-        for (PrometheusMetricsTest.Metric metric : currentThreads) {
+        for (Metric metric : currentThreads) {
             Assert.assertNotNull(metric.tags.get("cluster"));
             Assert.assertTrue(metric.value > 0);
         }
diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml
new file mode 100644
index 00000000000..4038dd59b1d
--- /dev/null
+++ b/pulsar-broker/src/test/resources/log4j2.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration xmlns="http://logging.apache.org/log4j/2.0/config"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://logging.apache.org/log4j/2.0/config https://logging.apache.org/log4j/2.0/log4j-core.xsd">
+  <Appenders>
+    <Console name="CONSOLE" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+<!--    <Logger name="org.apache.pulsar.broker.service.persistent.PersistentTopic" level="DEBUG" additivity="false">-->
+<!--       <AppenderRef ref="CONSOLE" />-->
+<!--    </Logger>-->
+
+    <Root level="INFO">
+      <AppenderRef ref="CONSOLE"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index 985e42b280e..ac50763b7e0 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -64,6 +64,31 @@ public interface TopicStats {
     /** Get the publish time of the earliest message over all the backlogs. */
     long getEarliestMsgPublishTimeInBacklogs();
 
+    /** the size in bytes of the topic backlog quota. */
+    long getBacklogQuotaLimitSize();
+
+    /** the topic backlog age quota, in seconds. */
+    long getBacklogQuotaLimitTime();
+
+    /**
+     * Age of oldest unacknowledged message, as recorded in last backlog quota check interval.
+     * <p>
+     * The age of the oldest unacknowledged (i.e. backlog) message, measured by the time elapsed from its published
+     * time, in seconds. This value is recorded every backlog quota check interval, hence it represents the value
+     * seen in the last check.
+     * </p>
+     */
+    long getOldestBacklogMessageAgeSeconds();
+
+    /**
+     * The subscription name containing oldest unacknowledged message as recorded in last backlog quota check.
+     * <p>
+     * The name of the subscription containing the oldest unacknowledged message. This value is recorded every backlog
+     * quota check interval, hence it represents the value seen in the last check.
+     * </p>
+     */
+    String getOldestBacklogMessageSubscriptionName();
+
     /** Space used to store the offloaded messages for the topic/. */
     long getOffloadedStorageSize();
 
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index d24d674c018..beba2988e67 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -499,6 +499,9 @@ public interface Consumer<T> extends Closeable, MessageAcknowledger {
      * <li><code>MessageId.earliest</code> : Reset the subscription on the earliest message available in the topic
      * <li><code>MessageId.latest</code> : Reset the subscription on the latest message in the topic
      * </ul>
+     * <p>
+     * This effectively resets the acknowledgement state of the subscription: all messages up to and
+     * <b>including</b> <code>messageId</code> will be marked as acknowledged and the rest unacknowledged.
      *
      * <p>Note: For multi-topics consumer, if `messageId` is a {@link TopicMessageId}, the seek operation will happen
      * on the owner topic of the message, which is returned by {@link TopicMessageId#getOwnerTopic()}. Otherwise, you
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index e022c885d66..70cf4cd3414 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -84,6 +84,31 @@ public class TopicStatsImpl implements TopicStats {
     /** Get estimated total unconsumed or backlog size in bytes. */
     public long backlogSize;
 
+    /** the size in bytes of the topic backlog quota. */
+    public long backlogQuotaLimitSize;
+
+    /** the topic backlog age quota, in seconds. */
+    public long backlogQuotaLimitTime;
+
+    /**
+     * Age of oldest unacknowledged message, as recorded in last backlog quota check interval.
+     * <p>
+     * The age of the oldest unacknowledged (i.e. backlog) message, measured by the time elapsed from its published
+     * time, in seconds. This value is recorded every backlog quota check interval, hence it represents the value
+     * seen in the last check.
+     * </p>
+     */
+    public long oldestBacklogMessageAgeSeconds;
+
+    /**
+     * The subscription name containing oldest unacknowledged message as recorded in last backlog quota check.
+     * <p>
+     * The name of the subscription containing the oldest unacknowledged message. This value is recorded every backlog
+     * quota check interval, hence it represents the value seen in the last check.
+     * </p>
+     */
+    public String oldestBacklogMessageSubscriptionName;
+
     /** The number of times the publishing rate limit was triggered. */
     public long publishRateLimitedTimes;
 
@@ -221,6 +246,10 @@ public class TopicStatsImpl implements TopicStats {
         this.compaction.reset();
         this.ownerBroker = null;
         this.bucketDelayedIndexStats.clear();
+        this.backlogQuotaLimitSize = 0;
+        this.backlogQuotaLimitTime = 0;
+        this.oldestBacklogMessageAgeSeconds = -1;
+        this.oldestBacklogMessageSubscriptionName = null;
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -250,6 +279,12 @@ public class TopicStatsImpl implements TopicStats {
         this.ongoingTxnCount = stats.ongoingTxnCount;
         this.abortedTxnCount = stats.abortedTxnCount;
         this.committedTxnCount = stats.committedTxnCount;
+        this.backlogQuotaLimitTime = stats.backlogQuotaLimitTime;
+        this.backlogQuotaLimitSize = stats.backlogQuotaLimitSize;
+        if (stats.oldestBacklogMessageAgeSeconds > this.oldestBacklogMessageAgeSeconds) {
+            this.oldestBacklogMessageAgeSeconds = stats.oldestBacklogMessageAgeSeconds;
+            this.oldestBacklogMessageSubscriptionName = stats.oldestBacklogMessageSubscriptionName;
+        }
 
         stats.bucketDelayedIndexStats.forEach((k, v) -> {
             TopicMetricBean topicMetricBean =