You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/02/25 11:44:54 UTC

[pulsar] branch branch-2.7 updated: [Issue 9535] Add metrics for the cursor ack state (#9618)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 0f02424  [Issue 9535] Add metrics for the cursor ack state (#9618)
0f02424 is described below

commit 0f024242a920aff7e53158d989319bba0ebdb8ac
Author: limingnihao <li...@live.com>
AuthorDate: Mon Feb 22 16:14:09 2021 +0800

    [Issue 9535] Add metrics for the cursor ack state (#9618)
    
    Fixes #9535
    
    The acknowledgment state is persisted to the ledger, or to ZooKeeper when it cannot be persisted to the ledger. Currently, however, there are no metrics for the persistence of the acknowledgment state, such as the total count of successful/failed persist operations.
    
    Add metrics for the ManagedCursor:
    
    brk_ml_cursor_persistLedgerSucceed(namespace="", ledger_name="", cursor_name="")
    brk_ml_cursor_persistLedgerErrors(namespace="", ledger_name="", cursor_name="")
    brk_ml_cursor_persistZookeeperSucceed(namespace="", ledger_name="", cursor_name="")
    brk_ml_cursor_persistZookeeperErrors(namespace="", ledger_name="", cursor_name="")
    
    (cherry picked from commit 99a907b845d1d4682bccf92c9809968ee15413f0)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |   6 ++
 .../bookkeeper/mledger/ManagedCursorMXBean.java    |  73 +++++++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  13 +++
 .../mledger/impl/ManagedCursorMXBeanImpl.java      |  87 +++++++++++++++++
 .../mledger/impl/ManagedCursorContainerTest.java   |   6 ++
 .../broker/stats/metrics/ManagedCursorMetrics.java |  83 +++++++++++++++++
 .../prometheus/PrometheusMetricsGenerator.java     |   5 +
 .../broker/stats/ManagedCursorMetricsTest.java     | 103 +++++++++++++++++++++
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  38 ++++++++
 .../bookkeeper/client/PulsarMockBookKeeper.java    |   4 +
 10 files changed, 418 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 62ba829..a9724de 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -667,4 +667,10 @@ public interface ManagedCursor {
      * Get deleted batch indexes list for a batch message.
      */
     long[] getDeletedBatchIndexesAsLongArray(PositionImpl position);
+
+    /**
+     * @return the managed cursor stats MBean
+     */
+    ManagedCursorMXBean getStats();
+
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java
new file mode 100644
index 0000000..ffc0af2
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+
+/**
+ * JMX bean interface for ManagedCursor stats: counts of cursor persist attempts to the ledger and to ZooKeeper.
+ */
+@InterfaceAudience.LimitedPrivate
+@InterfaceStability.Stable
+public interface ManagedCursorMXBean {
+
+    /**
+     * @return the name of the ManagedCursor these stats belong to
+     */
+    String getName();
+
+    /**
+     * @return the name of the ManagedLedger that owns the cursor
+     */
+    String getLedgerName();
+
+    /**
+     * Records the outcome of one attempt to persist the cursor to the ledger.
+     * @param success {@code true} to count a successful persist, {@code false} to count a failed one
+     */
+    void persistToLedger(boolean success);
+
+    /**
+     * Records the outcome of one attempt to persist the cursor to ZooKeeper.
+     * @param success {@code true} to count a successful persist, {@code false} to count a failed one
+     */
+    void persistToZookeeper(boolean success);
+
+    /**
+     * @return the number of cursor persist operations to the ledger that succeeded
+     */
+    long getPersistLedgerSucceed();
+
+    /**
+     * @return the number of cursor persist operations to the ledger that failed
+     */
+    long getPersistLedgerErrors();
+
+    /**
+     * @return the number of cursor persist operations to ZooKeeper that succeeded
+     */
+    long getPersistZookeeperSucceed();
+
+    /**
+     * @return the number of cursor persist operations to ZooKeeper that failed
+     */
+    long getPersistZookeeperErrors();
+
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index a0ba634..3bfe45f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -86,6 +86,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
@@ -232,6 +233,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, State.class, "state");
     protected volatile State state = null;
 
+    protected final ManagedCursorMXBean mbean;
+
     @SuppressWarnings("checkstyle:javadoctype")
     public interface VoidCallback {
         void operationComplete();
@@ -268,6 +271,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             // Disable mark-delete rate limiter
             markDeleteLimiter = null;
         }
+        this.mbean = new ManagedCursorMXBeanImpl(this);
     }
 
     @Override
@@ -2515,6 +2519,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     startCreatingNewMetadataLedger();
                 }
 
+                mbean.persistToLedger(true);
                 callback.operationComplete();
             } else {
                 log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
@@ -2523,6 +2528,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 // in the meantime the mark-delete will be queued.
                 STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
 
+                mbean.persistToLedger(false);
                 // Before giving up, try to persist the position in the metadata store
                 persistPositionMetaStore(-1, position, mdEntry.properties, new MetaStoreCallback<Void>() {
                     @Override
@@ -2532,6 +2538,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                                     "[{}][{}] Updated cursor in meta store after previous failure in ledger at position {}",
                                     ledger.getName(), name, position);
                         }
+                        mbean.persistToZookeeper(true);
                         callback.operationComplete();
                     }
 
@@ -2539,6 +2546,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     public void operationFailed(MetaStoreException e) {
                         log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}",
                                 ledger.getName(), name, e.getMessage());
+                        mbean.persistToZookeeper(false);
                         callback.operationFailed(createManagedLedgerException(rc));
                     }
                 }, true);
@@ -2902,6 +2910,11 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
+    @Override
+    public ManagedCursorMXBean getStats() {
+        return this.mbean;
+    }
+
     void updateReadStats(int readEntriesCount, long readEntriesSize) {
         this.entriesReadCount += readEntriesCount;
         this.entriesReadSize += readEntriesSize;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
new file mode 100644
index 0000000..b56d69f
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
+
+import java.util.concurrent.atomic.LongAdder;
+
+public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean {
+
+    private final LongAdder persistLedgeSucceed = new LongAdder();
+    private final LongAdder persistLedgeFailed = new LongAdder();
+
+    private final LongAdder persistZookeeperSucceed = new LongAdder();
+    private final LongAdder persistZookeeperFailed = new LongAdder();
+
+    private final ManagedCursor managedCursor;
+
+    public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) {
+        this.managedCursor = managedCursor;
+    }
+
+    @Override
+    public String getName() {
+        return this.managedCursor.getName();
+    }
+
+    @Override
+    public String getLedgerName() {
+        return this.managedCursor.getManagedLedger().getName();
+    }
+
+    @Override
+    public void persistToLedger(boolean success) {
+        if (success) {
+            persistLedgeSucceed.increment();
+        } else {
+            persistLedgeFailed.increment();
+        }
+    }
+
+    @Override
+    public void persistToZookeeper(boolean success) {
+        if (success) {
+            persistZookeeperSucceed.increment();
+        } else {
+            persistZookeeperFailed.increment();
+        }
+    }
+
+    @Override
+    public long getPersistLedgerSucceed() {
+        return persistLedgeSucceed.longValue();
+    }
+
+    @Override
+    public long getPersistLedgerErrors() {
+        return persistLedgeFailed.longValue();
+    }
+
+    @Override
+    public long getPersistZookeeperSucceed() {
+        return persistZookeeperSucceed.longValue();
+    }
+
+    @Override
+    public long getPersistZookeeperErrors() {
+        return persistZookeeperFailed.longValue();
+    }
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index f2766c2..d0b0b2c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
@@ -342,6 +343,11 @@ public class ManagedCursorContainerTest {
             return new long[0];
         }
 
+        @Override
+        public ManagedCursorMXBean getStats() {
+            return null;
+        }
+
         public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
                 Object ctx) {
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java
new file mode 100644
index 0000000..e888c93
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.metrics;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.stats.Metrics;
+
+/**
+ * Collects the persist counters of every managed cursor, labelled by namespace, ledger and cursor.
+ */
+public class ManagedCursorMetrics extends AbstractMetrics {
+
+    // Both containers are reused across generate() calls and reset before use.
+    private Map<String, String> dimensionMap;
+    private List<Metrics> metricsCollection;
+
+    public ManagedCursorMetrics(PulsarService pulsar) {
+        super(pulsar);
+        this.dimensionMap = Maps.newHashMap();
+        this.metricsCollection = Lists.newArrayList();
+    }
+
+    /** Returns one {@link Metrics} per cursor; the same list instance is cleared and refilled on every call. */
+    @Override
+    public synchronized List<Metrics> generate() {
+        return aggregate();
+    }
+
+    /** Rebuilds {@code metricsCollection} from the cursors of all managed ledgers. */
+    private List<Metrics> aggregate() {
+        metricsCollection.clear();
+        for (Map.Entry<String, ManagedLedgerImpl> ledgerEntry : getManagedLedgers().entrySet()) {
+            final String ledgerName = ledgerEntry.getKey();
+            final String namespace = parseNamespaceFromLedgerName(ledgerName);
+            final ManagedCursorContainer cursors = ledgerEntry.getValue().getCursors();
+            for (Iterator<ManagedCursor> it = cursors.iterator(); it.hasNext();) {
+                metricsCollection.add(buildCursorMetrics(namespace, ledgerName, (ManagedCursorImpl) it.next()));
+            }
+        }
+        return metricsCollection;
+    }
+
+    /** Builds the metrics entry of one cursor: three dimensions plus its four persist counters. */
+    private Metrics buildCursorMetrics(String namespace, String ledgerName, ManagedCursorImpl cursor) {
+        final ManagedCursorMXBean stats = cursor.getStats();
+        dimensionMap.clear();
+        dimensionMap.put("namespace", namespace);
+        dimensionMap.put("ledger_name", ledgerName);
+        dimensionMap.put("cursor_name", cursor.getName());
+        final Metrics cursorMetrics = createMetrics(dimensionMap);
+        cursorMetrics.put("brk_ml_cursor_persistLedgerSucceed", stats.getPersistLedgerSucceed());
+        cursorMetrics.put("brk_ml_cursor_persistLedgerErrors", stats.getPersistLedgerErrors());
+        cursorMetrics.put("brk_ml_cursor_persistZookeeperSucceed", stats.getPersistZookeeperSucceed());
+        cursorMetrics.put("brk_ml_cursor_persistZookeeperErrors", stats.getPersistZookeeperErrors());
+        return cursorMetrics;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index bd7ede4..be85249 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarService;
 import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 
+import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerCacheMetrics;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
 import org.apache.pulsar.common.stats.Metrics;
@@ -128,6 +129,10 @@ public class PrometheusMetricsGenerator {
         parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(),
                 clusterName, Collector.Type.GAUGE, stream);
 
+        // generate managedCursor metrics
+        parseMetricsToPrometheusMetrics(new ManagedCursorMetrics(pulsar).generate(),
+                clusterName, Collector.Type.GAUGE, stream);
+
         // generate loadBalance metrics
         parseMetricsToPrometheusMetrics(pulsar.getLoadManager().get().getLoadBalancingMetrics(),
                 clusterName, Collector.Type.GAUGE, stream);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
new file mode 100644
index 0000000..d7fc3f01
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
+import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.stats.Metrics;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests the per-cursor persist counters produced by {@link ManagedCursorMetrics}. */
+public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testManagedCursorMetrics() throws Exception {
+        final String subName = "my-sub";
+        final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";
+        final int messageSize = 10;
+
+        ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar);
+        // Before any consumer or producer is created the metrics list is expected to be empty (checked twice).
+        List<Metrics> metricsList = metrics.generate();
+        Assert.assertTrue(metricsList.isEmpty());
+
+        metricsList = metrics.generate();
+        Assert.assertTrue(metricsList.isEmpty());
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        // Close all mock ledgers; presumably this forces cursor persist failures (see asserts below) - confirm.
+        for(PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
+            ledgerHandle.close();
+        }
+
+        for (int i = 0; i < messageSize; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+            consumer.acknowledge(consumer.receive().getMessageId());
+        }
+        metricsList = metrics.generate();
+        Assert.assertTrue(!metricsList.isEmpty());
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"), 0L);
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerErrors"), 0L);
+        Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"), 0L);
+        Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"), 0L);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index a266224..b0c7e32 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -881,6 +882,43 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         provider.close();
     }
 
+    @Test
+    public void testManagedCursorPersistStats() throws Exception {
+        final String subName = "my-sub";
+        final String topicName = "persistent://my-namespace/use/my-ns/my-topic1";
+        final int messageSize = 10;
+        // Checks that a cursor persist counter shows up in the Prometheus output with cluster and cursor_name tags.
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        for (int i = 0; i < messageSize; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+            consumer.acknowledge(consumer.receive().getMessageId());
+        }
+        // Render the Prometheus text output into memory, then parse it back into metric name -> samples.
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        // NOTE(review): ManagedCursorMetrics emits brk_ml_cursor_*; presumably the generator renames it to pulsar_* - confirm.
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_cursor_persistLedgerSucceed");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+        assertEquals(cm.get(0).tags.get("cursor_name"), subName);
+
+        producer.close();
+        consumer.close();
+    }
+
     /**
      * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
      */
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 291544e..37d1cd2 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -265,6 +265,10 @@ public class PulsarMockBookKeeper extends BookKeeper {
         return ledgers.keySet();
     }
 
+    public Map<Long, PulsarMockLedgerHandle> getLedgerMap() {
+        return ledgers;
+    }
+
     void checkProgrammedFail() throws BKException, InterruptedException {
         try {
             getProgrammedFailure().get();