You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/08 00:53:30 UTC

[pulsar] branch master updated: Expose managed ledger bookie client metric to prometheus (#6814)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b983abf  Expose managed ledger bookie client metric to prometheus (#6814)
b983abf is described below

commit b983abf6e2648334bf5af21f4aa2227baf4b8b37
Author: hangc0276 <ha...@163.com>
AuthorDate: Mon Jun 8 08:53:18 2020 +0800

    Expose managed ledger bookie client metric to prometheus (#6814)
    
    ### Motivation
    Pulsar uses BookKeeper as its distributed log storage, and initializes a bookie client to read/write data from/to BookKeeper. However, the Pulsar bookie client uses the default `NullStatsLogger.INSTANCE` for its runtime metrics, so those metrics are not exposed to Prometheus or any other stats storage. When tuning Pulsar bookie client performance, we don't have any bookie client metrics to measure where the bottleneck is.
    
    ### Changes
    I implemented a Prometheus stats provider, and use it to track the bookie client's runtime metrics and expose them to Prometheus.
---
 conf/broker.conf                                   |   9 +
 conf/standalone.conf                               |   9 +
 .../bookkeeper/mledger/ManagedLedgerFactory.java   |   1 +
 .../mledger/ManagedLedgerFactoryConfig.java        |  15 ++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  30 +++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  19 ++
 .../pulsar/broker/BookKeeperClientFactory.java     |   5 +
 .../pulsar/broker/BookKeeperClientFactoryImpl.java |  13 +-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  28 ++-
 .../prometheus/PrometheusMetricsGenerator.java     |  21 +++
 .../metrics/DataSketchesOpStatsLogger.java         | 199 +++++++++++++++++++++
 .../metrics/PrometheusMetricsProvider.java         | 129 +++++++++++++
 .../prometheus/metrics/PrometheusStatsLogger.java  |  76 ++++++++
 .../metrics/PrometheusTextFormatUtil.java          | 156 ++++++++++++++++
 .../prometheus/metrics/SimpleGauge.java}           |  28 ++-
 .../broker/MockedBookKeeperClientFactory.java      |   8 +
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  11 ++
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  44 +++++
 site2/docs/reference-metrics.md                    |  21 ++-
 19 files changed, 797 insertions(+), 25 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 523f7ad..05c2747 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -691,6 +691,9 @@ bookkeeperDiskWeightBasedPlacementEnabled=false
 # A value of '0' disables sending any explicit LACs. Default is 0.
 bookkeeperExplicitLacIntervalInMills=0
 
+# Expose bookkeeper client managed ledger stats to prometheus. default is false
+# bookkeeperClientExposeStatsToPrometheus=false
+
 ### --- Managed Ledger --- ###
 
 # Number of bookies to use when creating a ledger
@@ -793,6 +796,12 @@ managedLedgerReadEntryTimeoutSeconds=0
 # Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
 managedLedgerAddEntryTimeoutSeconds=0
 
+# Managed ledger prometheus stats latency rollover seconds (default: 60s)
+managedLedgerPrometheusStatsLatencyRolloverSeconds=60
+
+# Whether trace managed ledger task execution time
+managedLedgerTraceTaskExecution=true
+
 # New entries check delay for the cursor under the managed ledger.
 # If no new messages in the topic, the cursor will try to check again after the delay time.
 # For consumption latency sensitive scenario, can set to a smaller value or set to 0.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 79b520b..2efc651 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -473,6 +473,9 @@ bookkeeperDiskWeightBasedPlacementEnabled=false
 # A value of '0' disables sending any explicit LACs. Default is 0.
 bookkeeperExplicitLacIntervalInMills=0
 
+# Expose bookkeeper client managed ledger stats to prometheus. default is false
+# bookkeeperClientExposeStatsToPrometheus=false
+
 ### --- Managed Ledger --- ###
 
 # Number of bookies to use when creating a ledger
@@ -576,6 +579,12 @@ managedLedgerNewEntriesCheckDelayInMillis=10
 # Use Open Range-Set to cache unacked messages
 managedLedgerUnackedRangesOpenCacheSetEnabled=true
 
+# Managed ledger prometheus stats latency rollover seconds (default: 60s)
+managedLedgerPrometheusStatsLatencyRolloverSeconds=60
+
+# Whether trace managed ledger task execution time
+managedLedgerTraceTaskExecution=true
+
 ### --- Load balancer --- ###
 
 loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index 8e94e95..7e1f086 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
+import org.apache.bookkeeper.stats.StatsProvider;
 
 /**
  * A factory to open/create managed ledgers and delete them.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
index 4d1eb31..e42befc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java
@@ -51,4 +51,19 @@ public class ManagedLedgerFactoryConfig {
      * Whether we should make a copy of the entry payloads when inserting in cache
      */
     private boolean copyEntriesInCache = false;
+
+    /**
+     * Whether trace managed ledger task execution time
+     */
+    private boolean traceTaskExecution = true;
+
+    /**
+     * Managed ledger prometheus stats Latency Rollover Seconds
+     */
+    private int prometheusStatsLatencyRolloverSeconds = 60;
+
+    /**
+     * cluster name for prometheus stats
+     */
+    private String clusterName;
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 36bae4c..517c62c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -76,6 +76,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
 import org.apache.bookkeeper.mledger.util.Futures;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -124,7 +126,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
     private ManagedLedgerFactoryImpl(ZooKeeper zkc, ClientConfiguration bkClientConfiguration,
             ManagedLedgerFactoryConfig config) throws Exception {
-        this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* isBookkeeperManaged */, zkc, config);
+        this(new DefaultBkFactory(bkClientConfiguration, zkc), true /* isBookkeeperManaged */,
+                zkc, config, NullStatsLogger.INSTANCE);
     }
 
     private ManagedLedgerFactoryImpl(ClientConfiguration clientConfiguration, String zkConnection, ManagedLedgerFactoryConfig config) throws Exception {
@@ -132,7 +135,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
             true,
             ZooKeeperClient.newBuilder()
                 .connectString(zkConnection)
-                .sessionTimeoutMs(clientConfiguration.getZkTimeout()).build(), config);
+                .sessionTimeoutMs(clientConfiguration.getZkTimeout()).build(), config, NullStatsLogger.INSTANCE);
     }
 
     public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper) throws Exception {
@@ -141,22 +144,35 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
     public ManagedLedgerFactoryImpl(BookKeeper bookKeeper, ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
             throws Exception {
-        this((policyConfig) -> bookKeeper, false /* isBookkeeperManaged */, zooKeeper, config);
+        this((policyConfig) -> bookKeeper, false /* isBookkeeperManaged */,
+                zooKeeper, config, NullStatsLogger.INSTANCE);
     }
 
-    public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
+    public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
+                                    ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config)
             throws Exception {
-        this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config);
+        this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config, NullStatsLogger.INSTANCE);
     }
 
-    private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory, boolean isBookkeeperManaged, ZooKeeper zooKeeper,
-            ManagedLedgerFactoryConfig config) throws Exception {
+    public ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
+                                    ZooKeeper zooKeeper, ManagedLedgerFactoryConfig config, StatsLogger statsLogger)
+            throws Exception {
+        this(bookKeeperGroupFactory, false /* isBookkeeperManaged */, zooKeeper, config, statsLogger);
+    }
+
+    private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
+                                     boolean isBookkeeperManaged, ZooKeeper zooKeeper,
+                                     ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception {
         scheduledExecutor = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(config.getNumManagedLedgerSchedulerThreads())
+                .statsLogger(statsLogger)
+                .traceTaskExecution(config.isTraceTaskExecution())
                 .name("bookkeeper-ml-scheduler")
                 .build();
         orderedExecutor = OrderedExecutor.newBuilder()
                 .numThreads(config.getNumManagedLedgerWorkerThreads())
+                .statsLogger(statsLogger)
+                .traceTaskExecution(config.isTraceTaskExecution())
                 .name("bookkeeper-ml-workers")
                 .build();
         cacheEvictionExecutor = Executors
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b91fe0a..340a19d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1029,6 +1029,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Set the interval to check the need for sending an explicit LAC")
     private int bookkeeperExplicitLacIntervalInMills = 0;
 
+    @FieldContext(
+        category = CATEGORY_STORAGE_BK,
+        doc = "whether expose managed ledger client stats to prometheus"
+    )
+    private boolean bookkeeperClientExposeStatsToPrometheus = false;
+
     /**** --- Managed Ledger --- ****/
     @FieldContext(
         minValue = 1,
@@ -1231,6 +1237,19 @@ public class ServiceConfiguration implements PulsarConfiguration {
             doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)")
     private long managedLedgerAddEntryTimeoutSeconds = 0;
 
+    @FieldContext(
+            category = CATEGORY_STORAGE_ML,
+            doc = "Managed ledger prometheus stats latency rollover seconds"
+    )
+    private int managedLedgerPrometheusStatsLatencyRolloverSeconds = 60;
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_STORAGE_ML,
+            doc = "Whether trace managed ledger task execution time"
+    )
+    private boolean managedLedgerTraceTaskExecution = true;
+
     @FieldContext(category = CATEGORY_STORAGE_ML,
             doc = "New entries check delay for the cursor under the managed ledger. \n"
                     + "If no new messages in the topic, the cursor will try to check again after the delay time. \n"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
index c0c43ab..476857a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.zookeeper.ZooKeeper;
 
@@ -35,5 +36,9 @@ public interface BookKeeperClientFactory {
             Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
             Map<String, Object> ensemblePlacementPolicyProperties) throws IOException;
 
+    BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+                      Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                      Map<String, Object> ensemblePlacementPolicyProperties,
+                      StatsLogger statsLogger) throws IOException;
     void close();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 37e8494..26f56d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -37,6 +37,8 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
@@ -54,7 +56,15 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
 
     @Override
     public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
-            Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass, Map<String, Object> properties) throws IOException {
+                             Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                             Map<String, Object> properties) throws IOException {
+        return create(conf, zkClient, ensemblePlacementPolicyClass, properties, NullStatsLogger.INSTANCE);
+    }
+
+    @Override
+    public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+                             Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                             Map<String, Object> properties, StatsLogger statsLogger) throws IOException {
         ClientConfiguration bkConf = createBkClientConfiguration(conf);
         if (properties != null) {
             properties.forEach((key, value) -> bkConf.setProperty(key, value));
@@ -67,6 +77,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
         try {
             return BookKeeper.forConfig(bkConf)
                     .allocator(PulsarByteBufAllocator.DEFAULT)
+                    .statsLogger(statsLogger)
                     .build();
         } catch (InterruptedException | BKException e) {
             throw new IOException(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 09fea0c..d96b32b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -25,12 +25,18 @@ import java.util.Optional;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.EnsemblePlacementPolicyConfig;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.Configuration;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +51,7 @@ public class ManagedLedgerClientFactory implements Closeable {
     private final ManagedLedgerFactory managedLedgerFactory;
     private final BookKeeper defaultBkClient;
     private final Map<EnsemblePlacementPolicyConfig, BookKeeper> bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();
+    private StatsProvider statsProvider = new NullStatsProvider();
 
     public ManagedLedgerClientFactory(ServiceConfiguration conf, ZooKeeper zkClient,
             BookKeeperClientFactory bookkeeperProvider) throws Exception {
@@ -56,6 +63,19 @@ public class ManagedLedgerClientFactory implements Closeable {
         managedLedgerFactoryConfig.setCacheEvictionFrequency(conf.getManagedLedgerCacheEvictionFrequency());
         managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(conf.getManagedLedgerCacheEvictionTimeThresholdMillis());
         managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries());
+        managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+        managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
+
+        Configuration configuration = new ClientConfiguration();
+        if (conf.isBookkeeperClientExposeStatsToPrometheus()) {
+            configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+                    conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+            configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, conf.getClusterName());
+            statsProvider = new PrometheusMetricsProvider();
+        }
+
+        statsProvider.start(configuration);
+        StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_managedLedger_client");
 
         this.defaultBkClient = bookkeeperProvider.create(conf, zkClient, Optional.empty(), null);
 
@@ -80,7 +100,7 @@ public class ManagedLedgerClientFactory implements Closeable {
             return bkClient != null ? bkClient : defaultBkClient;
         };
 
-        this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkFactory, zkClient, managedLedgerFactoryConfig);
+        this.managedLedgerFactory = new ManagedLedgerFactoryImpl(bkFactory, zkClient, managedLedgerFactoryConfig, statsLogger);
     }
 
     public ManagedLedgerFactory getManagedLedgerFactory() {
@@ -91,16 +111,22 @@ public class ManagedLedgerClientFactory implements Closeable {
         return defaultBkClient;
     }
 
+    public StatsProvider getStatsProvider() {
+        return statsProvider;
+    }
+
     @VisibleForTesting
     public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookKeeperMap() {
         return bkEnsemblePolicyToBkClientMap;
     }
 
+    @Override
     public void close() throws IOException {
         try {
             managedLedgerFactory.shutdown();
             log.info("Closed managed ledger factory");
 
+            statsProvider.stop();
             try {
                 defaultBkClient.close();
             } catch (RejectedExecutionException ree) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 5796c8e..745eef8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -24,8 +24,12 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.io.StringWriter;
+import java.io.Writer;
 import java.util.Enumeration;
 
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.pulsar.broker.PulsarService;
 import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 
@@ -85,6 +89,8 @@ public class PrometheusMetricsGenerator {
 
             generateBrokerBasicMetrics(pulsar, stream);
 
+            generateManagedLedgerBookieClientMetrics(pulsar, stream);
+
             out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
         } finally {
             buf.release();
@@ -155,6 +161,21 @@ public class PrometheusMetricsGenerator {
         }
     }
 
+    private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsar, SimpleTextOutputStream stream) {
+        StatsProvider statsProvider = pulsar.getManagedLedgerClientFactory().getStatsProvider();
+        if (statsProvider instanceof NullStatsProvider) {
+            return;
+        }
+
+        try {
+            Writer writer = new StringWriter();
+            statsProvider.writeAllMetrics(writer);
+            stream.write(writer.toString());
+        } catch (IOException e) {
+            // nop
+        }
+    }
+
     private static void generateSystemMetrics(SimpleTextOutputStream stream, String cluster) {
         Enumeration<MetricFamilySamples> metricFamilySamples = CollectorRegistry.defaultRegistry.metricFamilySamples();
         while (metricFamilySamples.hasMoreElements()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
new file mode 100644
index 0000000..3ef453d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import com.yahoo.sketches.quantiles.DoublesSketch;
+import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
+import com.yahoo.sketches.quantiles.DoublesUnion;
+import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.bookkeeper.stats.OpStatsData;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.StampedLock;
+
+/**
+ * OpStatsLogger implementation that uses DataSketches library to calculate the approximated latency quantiles.
+ */
+public class DataSketchesOpStatsLogger implements OpStatsLogger {
+
+    /**
+     * Use 2 rotating thread local accessor so that we can safely swap them.
+     */
+    private volatile ThreadLocalAccessor current;
+    private volatile ThreadLocalAccessor replacement;
+
+    /**
+     * These are the sketches where all the aggregated results are published.
+     */
+    private volatile DoublesSketch successResult;
+    private volatile DoublesSketch failResult;
+
+    private final LongAdder successCountAdder = new LongAdder();
+    private final LongAdder failCountAdder = new LongAdder();
+
+    private final LongAdder successSumAdder = new LongAdder();
+    private final LongAdder failSumAdder = new LongAdder();
+
+    DataSketchesOpStatsLogger() {
+        this.current = new ThreadLocalAccessor();
+        this.replacement = new ThreadLocalAccessor();
+    }
+
+    @Override
+    public void registerFailedEvent(long eventLatency, TimeUnit unit) {
+        double valueMillis = unit.toMicros(eventLatency) / 1000.0;
+
+        failCountAdder.increment();
+        failSumAdder.add((long) valueMillis);
+
+        LocalData localData = current.localData.get();
+
+        long stamp = localData.lock.readLock();
+        try {
+            localData.failSketch.update(valueMillis);
+        } finally {
+            localData.lock.unlockRead(stamp);
+        }
+    }
+
+    @Override
+    public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) {
+        double valueMillis = unit.toMicros(eventLatency) / 1000.0;
+
+        successCountAdder.increment();
+        successSumAdder.add((long) valueMillis);
+
+        LocalData localData = current.localData.get();
+
+        long stamp = localData.lock.readLock();
+        try {
+            localData.successSketch.update(valueMillis);
+        } finally {
+            localData.lock.unlockRead(stamp);
+        }
+    }
+
+    @Override
+    public void registerSuccessfulValue(long value) {
+        successCountAdder.increment();
+        successSumAdder.add(value);
+
+        LocalData localData = current.localData.get();
+
+        long stamp = localData.lock.readLock();
+        try {
+            localData.successSketch.update(value);
+        } finally {
+            localData.lock.unlockRead(stamp);
+        }
+    }
+
+    @Override
+    public void registerFailedValue(long value) {
+        failCountAdder.increment();
+        failSumAdder.add(value);
+
+        LocalData localData = current.localData.get();
+
+        long stamp = localData.lock.readLock();
+        try {
+            localData.failSketch.update(value);
+        } finally {
+            localData.lock.unlockRead(stamp);
+        }
+    }
+
+    @Override
+    public OpStatsData toOpStatsData() {
+        // Not relevant as we don't use JMX here
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        // Not relevant as we don't use JMX here
+        throw new UnsupportedOperationException();
+    }
+
+    public void rotateLatencyCollection() {
+        // Swap current with replacement
+        ThreadLocalAccessor local = current;
+        current = replacement;
+        replacement = local;
+
+        final DoublesUnion aggregateSuccesss = new DoublesUnionBuilder().build();
+        final DoublesUnion aggregateFail = new DoublesUnionBuilder().build();
+        local.map.forEach((localData, b) -> {
+            long stamp = localData.lock.writeLock();
+            try {
+                aggregateSuccesss.update(localData.successSketch);
+                localData.successSketch.reset();
+                aggregateFail.update(localData.failSketch);
+                localData.failSketch.reset();
+            } finally {
+                localData.lock.unlockWrite(stamp);
+            }
+        });
+
+        successResult = aggregateSuccesss.getResultAndReset();
+        failResult = aggregateFail.getResultAndReset();
+    }
+
+    public long getCount(boolean success) {
+        return success ? successCountAdder.sum() : failCountAdder.sum();
+    }
+
+    public long getSum(boolean success) {
+        return success ? successSumAdder.sum() : failSumAdder.sum();
+    }
+
+    public double getQuantileValue(boolean success, double quantile) {
+        DoublesSketch s = success ? successResult : failResult;
+        return s != null ? s.getQuantile(quantile) : Double.NaN;
+    }
+
+    private static class LocalData {
+        private final DoublesSketch successSketch = new DoublesSketchBuilder().build();
+        private final DoublesSketch failSketch = new DoublesSketchBuilder().build();
+        private final StampedLock lock = new StampedLock();
+    }
+
+    private static class ThreadLocalAccessor {
+        private final Map<LocalData, Boolean> map = new ConcurrentHashMap<>();
+        private final FastThreadLocal<LocalData> localData = new FastThreadLocal<LocalData>() {
+
+            @Override
+            protected LocalData initialValue() throws Exception {
+                LocalData localData = new LocalData();
+                map.put(localData, Boolean.TRUE);
+                return localData;
+            }
+
+            @Override
+            protected void onRemoval(LocalData value) throws Exception {
+                map.remove(value);
+            }
+        };
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
new file mode 100644
index 0000000..a8a05e1
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.prometheus.client.Collector;
+import org.apache.bookkeeper.stats.CachingStatsProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.stats.prometheus.LongAdderCounter;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.concurrent.*;
+
+/**
+ * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ */
+public class PrometheusMetricsProvider implements StatsProvider {
+    private ScheduledExecutorService executor;
+
+    public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
+    public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
+    public static final String CLUSTER_NAME = "cluster";
+    public static final String DEFAULT_CLUSTER_NAME = "pulsar";
+
+    private String cluster;
+    private final CachingStatsProvider cachingStatsProvider;
+
+    /**
+     * These act as a registry of the metrics defined in this provider.
+     */
+    final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
+    final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
+    final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+
+    public PrometheusMetricsProvider() { // wraps an inner provider in a CachingStatsProvider
+        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
+            @Override
+            public void start(Configuration conf) {
+                // nop: the enclosing provider's own start() creates the executor and schedules rotation
+            }
+
+            @Override
+            public void stop() {
+                // nop: the enclosing provider's own stop() shuts the executor down
+            }
+
+            @Override
+            public StatsLogger getStatsLogger(String scope) {
+                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
+            }
+
+            @Override
+            public String getStatsName(String... statsComponents) {
+                String completeName;
+                if (statsComponents.length == 0) {
+                    return "";
+                } else if (statsComponents[0].isEmpty()) { // empty first component: join from index 1 onwards
+                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
+                } else {
+                    completeName = StringUtils.join(statsComponents, '_');
+                }
+                return Collector.sanitizeMetricName(completeName);
+            }
+        });
+    }
+
+    @Override
+    public void start(Configuration conf) {
+        // One dedicated "metrics" thread drives the periodic rotation of the latency sketches.
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
+
+        final int rolloverSeconds = conf.getInt(
+                PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS, DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
+        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+
+        // First rotation after 1 second, then one per rollover period.
+        executor.scheduleAtFixedRate(this::rotateLatencyCollection, 1, rolloverSeconds, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public void stop() {
+        if (executor != null) { executor.shutdownNow(); } // executor is only set by start(); avoid an NPE
+    }
+
+    @Override
+    public StatsLogger getStatsLogger(String scope) {
+        return this.cachingStatsProvider.getStatsLogger(scope);
+    }
+
+    @Override
+    public void writeAllMetrics(Writer writer) throws IOException {
+        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
+        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
+        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster, opStatLogger));
+    }
+
+    @Override
+    public String getStatsName(String... statsComponents) {
+        return cachingStatsProvider.getStatsName(statsComponents);
+    }
+
+    @VisibleForTesting
+    void rotateLatencyCollection() {
+        for (DataSketchesOpStatsLogger logger : opStats.values()) { // metric names (keys) are not needed
+            logger.rotateLatencyCollection();
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java
new file mode 100644
index 0000000..ad3c62f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusStatsLogger.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import com.google.common.base.Joiner;
+import io.prometheus.client.Collector;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.prometheus.LongAdderCounter;
+
+/**
+ * A {@code Prometheus} based {@link StatsLogger} implementation.
+ */
+public class PrometheusStatsLogger implements StatsLogger {
+
+    private final PrometheusMetricsProvider provider;
+    private final String scope;
+
+    PrometheusStatsLogger(PrometheusMetricsProvider provider, String scope) {
+        this.provider = provider;
+        this.scope = scope;
+    }
+
+    @Override
+    public OpStatsLogger getOpStatsLogger(String name) {
+        return provider.opStats.computeIfAbsent(completeName(name), x -> new DataSketchesOpStatsLogger());
+    }
+
+    @Override
+    public Counter getCounter(String name) {
+        return provider.counters.computeIfAbsent(completeName(name), x -> new LongAdderCounter());
+    }
+
+    @Override
+    public <T extends Number> void registerGauge(String name, Gauge<T> gauge) {
+        provider.gauges.computeIfAbsent(completeName(name), x -> new SimpleGauge<T>(gauge));
+    }
+
+    @Override
+    public <T extends Number> void unregisterGauge(String name, Gauge<T> gauge) {
+        // no-op: the entry stays in provider.gauges and keeps being written. NOTE(review): confirm intended.
+    }
+
+    @Override
+    public void removeScope(String name, StatsLogger statsLogger) {
+        // no-op
+    }
+
+    @Override
+    public StatsLogger scope(String name) {
+        return new PrometheusStatsLogger(provider, completeName(name));
+    }
+
+    private String completeName(String name) {
+        // An empty scope contributes no prefix; otherwise the result is "<scope>_<name>", then sanitized.
+        return Collector.sanitizeMetricName(scope.isEmpty() ? name : Joiner.on('_').join(scope, name));
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
new file mode 100644
index 0000000..abe0b56
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
+import org.apache.bookkeeper.stats.Counter;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Enumeration;
+
+/**
+ * Logic to write metrics in Prometheus text format.
+ */
+public class PrometheusTextFormatUtil {
+    static void writeGauge(Writer w, String name, String cluster, SimpleGauge<? extends Number> gauge) {
+        // Writes the TYPE line and one sample for a gauge; both use the same metric name. Example:
+        // # TYPE pulsar_bookie_client_bookkeeper_ml_scheduler_completed_tasks_0 gauge
+        // pulsar_bookie_client_bookkeeper_ml_scheduler_completed_tasks_0{cluster="pulsar"} 1044057
+        try {
+            w.append("# TYPE ").append(name).append(" gauge\n");
+            w.append(name).append("{cluster=\"").append(cluster).append("\"}")
+                    .append(' ').append(gauge.getSample().toString()).append('\n');
+        } catch (IOException e) {
+            throw new RuntimeException(e); // rethrown unchecked: the signature declares no IOException
+        }
+    }
+
+    static void writeCounter(Writer w, String name, String cluster, Counter counter) {
+        // Example:
+        // # TYPE jvm_threads_started_total counter
+        // jvm_threads_started_total{cluster="test"} 59
+        try {
+            w.append("# TYPE ").append(name).append(" counter\n");
+            w.append(name).append("{cluster=\"").append(cluster).append("\"}")
+                    .append(' ').append(counter.get().toString()).append('\n');
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static void writeOpStat(Writer w, String name, String cluster, DataSketchesOpStatsLogger opStat) {
+        // Example:
+        // # TYPE pulsar_bookie_client_bookkeeper_ml_workers_task_queued summary
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.5"} NaN
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.75"} NaN
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.95"} NaN
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.99"} NaN
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.999"} NaN
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="0.9999"} NaN
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="false", quantile="1.0"} -Infinity
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued_count{cluster="pulsar", success="false"} 0
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued_sum{cluster="pulsar", success="false"} 0.0
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.5"} 0.031
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.75"} 0.043
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.95"} 0.061
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.99"} 0.064
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.999"} 0.073
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="0.9999"} 0.073
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued{cluster="pulsar", success="true", quantile="1.0"} 0.552
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued_count{cluster="pulsar", success="true"} 40911432
+        // pulsar_bookie_client_bookkeeper_ml_workers_task_queued_sum{cluster="pulsar", success="true"} 527.0
+        try {
+            w.append("# TYPE ").append(name).append(" summary\n");
+            writeQuantile(w, opStat, name, cluster,false, 0.5);
+            writeQuantile(w, opStat, name, cluster, false, 0.75);
+            writeQuantile(w, opStat, name, cluster,false, 0.95);
+            writeQuantile(w, opStat, name, cluster,false, 0.99);
+            writeQuantile(w, opStat, name, cluster,false, 0.999);
+            writeQuantile(w, opStat, name, cluster,false, 0.9999);
+            writeQuantile(w, opStat, name, cluster,false, 1.0);
+            writeCount(w, opStat, name, cluster, false);
+            writeSum(w, opStat, name, cluster, false);
+
+            writeQuantile(w, opStat, name, cluster,true, 0.5);
+            writeQuantile(w, opStat, name, cluster,true, 0.75);
+            writeQuantile(w, opStat, name, cluster,true, 0.95);
+            writeQuantile(w, opStat, name, cluster,true, 0.99);
+            writeQuantile(w, opStat, name, cluster,true, 0.999);
+            writeQuantile(w, opStat, name, cluster,true, 0.9999);
+            writeQuantile(w, opStat, name, cluster,true, 1.0);
+            writeCount(w, opStat, name, cluster, true);
+            writeSum(w, opStat, name, cluster, true);
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void writeQuantile(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster,
+                                      Boolean success, double quantile) throws IOException {
+        // Emits: <name>{cluster="..", success="..", quantile=".."} <value> -- labels separated by ", "
+        w.append(name).append("{cluster=\"").append(cluster).append("\", success=\"")
+                .append(success.toString()).append("\", quantile=\"").append(Double.toString(quantile))
+                .append("\"} ").append(Double.toString(opStat.getQuantileValue(success, quantile))).append('\n');
+    }
+
+    private static void writeCount(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster,
+                                   Boolean success) throws IOException {
+        w.append(name).append("_count{cluster=\"").append(cluster).append("\", success=\"")
+                .append(success.toString()).append("\"} ")
+                .append(Long.toString(opStat.getCount(success))).append('\n');
+    }
+
+    private static void writeSum(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster,
+                                 Boolean success) throws IOException {
+        w.append(name).append("_sum{cluster=\"").append(cluster).append("\", success=\"")
+                .append(success.toString()).append("\"} ")
+                .append(Double.toString(opStat.getSum(success))).append('\n');
+    }
+
+    static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry) throws IOException {
+        // Dump every sample of every registered collector as: name{label="value", ...} value
+        final Enumeration<MetricFamilySamples> families = registry.metricFamilySamples();
+        while (families.hasMoreElements()) {
+            for (Sample sample : families.nextElement().samples) {
+                w.write(sample.name);
+                w.write('{');
+                // labelNames and labelValues are read pairwise by index; pairs are separated by ", ".
+                final int labelCount = sample.labelNames.size();
+                for (int idx = 0; idx < labelCount; idx++) {
+                    if (idx > 0) {
+                        w.write(", ");
+                    }
+                    w.write(sample.labelNames.get(idx));
+                    w.write("=\"");
+                    w.write(sample.labelValues.get(idx));
+                    w.write('"');
+                }
+
+                w.write("} ");
+                w.write(Collector.doubleToGoString(sample.value));
+                w.write('\n');
+            }
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java
similarity index 55%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java
index c0c43ab..a93a26c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/SimpleGauge.java
@@ -16,24 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker;
+package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.bookkeeper.stats.Gauge;
 
 /**
- * Provider of a new BookKeeper client instance
+ * A {@link Gauge} implementation that forwards on the value supplier.
  */
-public interface BookKeeperClientFactory {
-    BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
-            Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
-            Map<String, Object> ensemblePlacementPolicyProperties) throws IOException;
+public class SimpleGauge<T extends Number> {
+
+    private final Gauge<T> gauge;
+
+    public SimpleGauge(final Gauge<T> gauge) {
+        this.gauge = gauge;
+    }
 
-    void close();
+    Number getSample() {
+        return gauge.getSample();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
index 4b7f49b..7c66ccc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
@@ -31,6 +31,7 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +62,13 @@ public class MockedBookKeeperClientFactory implements BookKeeperClientFactory {
     }
 
     @Override
+    public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+                             Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                             Map<String, Object> properties, StatsLogger statsLogger) throws IOException {
+        return mockedBk;
+    }
+
+    @Override
     public void close() {
         try {
             mockedBk.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c1e39b4..fc0c66d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.auth;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -44,6 +45,7 @@ import java.util.function.Supplier;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.PulsarService;
@@ -106,6 +108,7 @@ public abstract class MockedPulsarServiceBaseTest {
         this.conf.setBrokerServicePortTls(Optional.of(0));
         this.conf.setWebServicePort(Optional.of(0));
         this.conf.setWebServicePortTls(Optional.of(0));
+        this.conf.setBookkeeperClientExposeStatsToPrometheus(true);
     }
 
     protected final void internalSetup() throws Exception {
@@ -339,6 +342,14 @@ public abstract class MockedPulsarServiceBaseTest {
         }
 
         @Override
+        public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+                                 Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                                 Map<String, Object> properties, StatsLogger statsLogger) {
+            // Always return the same instance (so that we don't lose the mock BK content on broker restart).
+            return mockBookKeeper;
+        }
+
+        @Override
         public void close() {
             // no-op
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 948bdbd..8d4cef4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -471,6 +471,50 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         p2.close();
     }
 
+    @Test
+    public void testManagedLedgerBookieClientStats() throws Exception {
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        metrics.entries().forEach(e ->
+                System.out.println(e.getKey() + ": " + e.getValue())
+        );
+
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_scheduler_completed_tasks_0");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+        cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_scheduler_queue_0");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+        cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_scheduler_total_tasks_0");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+        cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_workers_completed_tasks_0");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+        cm = (List<Metric>) metrics.get("pulsar_managedLedger_client_bookkeeper_ml_workers_task_execution_count");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+
+        p1.close();
+        p2.close();
+    }
+
     /**
      * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
      */
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 081a9df..ffabb31 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -110,6 +110,7 @@ Broker has the following kinds of metrics:
     * [BundleSplit metrics](#bundlesplit-metrics)
 * [Subscription metrics](#subscription-metrics)
 * [Consumer metrics](#consumer-metrics)
+* [ManagedLedger bookie client metrics](#managed-ledger-bookie-client-metrics)
 
 ### Namespace metrics
 
@@ -324,6 +325,25 @@ All the consumer metrics are labelled with the following labels:
 | pulsar_consumer_msg_throughput_out | Gauge | The total message dispatch throughput for a consumer (bytes/second). |
 | pulsar_consumer_available_permits | Gauge | The available permits for for a consumer. |
 
+### Managed ledger bookie client metrics
+
+All the managed ledger bookie client metrics are labelled with the following labels:
+
+- *cluster*: `cluster=${pulsar_cluster}`. `${pulsar_cluster}` is the cluster name that you configured in `broker.conf`.
+
+| Name | Type | Description |
+| --- | --- | --- |
+| pulsar_managedLedger_client_bookkeeper_ml_scheduler_completed_tasks_* | Gauge | The number of tasks that the scheduler executor has completed. <br>The number of metrics is determined by the number of scheduler executor threads configured by `managedLedgerNumSchedulerThreads` in `broker.conf`. <br> |
+| pulsar_managedLedger_client_bookkeeper_ml_scheduler_queue_* | Gauge | The number of tasks queued in the scheduler executor's queue. <br>The number of metrics is determined by the number of scheduler executor threads configured by `managedLedgerNumSchedulerThreads` in `broker.conf`. <br> |
+| pulsar_managedLedger_client_bookkeeper_ml_scheduler_total_tasks_* | Gauge | The total number of tasks that the scheduler executor has received. <br>The number of metrics is determined by the number of scheduler executor threads configured by `managedLedgerNumSchedulerThreads` in `broker.conf`. <br> |
+| pulsar_managedLedger_client_bookkeeper_ml_workers_completed_tasks_* | Gauge | The number of tasks that the worker executor has completed. <br>The number of metrics is determined by the number of worker executor threads configured by `managedLedgerNumWorkerThreads` in `broker.conf`. <br> |
+| pulsar_managedLedger_client_bookkeeper_ml_workers_queue_* | Gauge | The number of tasks queued in the worker executor's queue. <br>The number of metrics is determined by the number of worker executor threads configured by `managedLedgerNumWorkerThreads` in `broker.conf`. <br> |
+| pulsar_managedLedger_client_bookkeeper_ml_workers_total_tasks_* | Gauge | The total number of tasks that the worker executor has received. <br>The number of metrics is determined by the number of worker executor threads configured by `managedLedgerNumWorkerThreads` in `broker.conf`. <br> |
+| pulsar_managedLedger_client_bookkeeper_ml_scheduler_task_execution | Summary | The scheduler task execution latency calculated in milliseconds. |
+| pulsar_managedLedger_client_bookkeeper_ml_scheduler_task_queued | Summary | The scheduler task queued latency calculated in milliseconds. |
+| pulsar_managedLedger_client_bookkeeper_ml_workers_task_execution | Summary | The worker task execution latency calculated in milliseconds. |
+| pulsar_managedLedger_client_bookkeeper_ml_workers_task_queued | Summary | The worker task queued latency calculated in milliseconds. |
+
 # Pulsar Functions
 
 All the Pulsar Functions metrics are labelled with the following labels:
@@ -387,7 +407,6 @@ All the proxy metrics are labelled with the following labels:
 | split_record_deserialize_time_per_query | Summary | Time spent on deserializing message to record per query. |
 | split_total_execution_time | Summary | Total execution time . |
 
-
 ## Monitor
 
 You can [set up a Prometheus instance](https://prometheus.io/) to collect all the metrics exposed at Pulsar components and set up