You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/29 00:34:49 UTC

[pulsar] branch branch-2.9 updated: Group prometheus metrics (#17852)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 976a3181b7e Group prometheus metrics (#17852)
976a3181b7e is described below

commit 976a3181b7e47bcc501294b8489362a1cd3e3e89
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Thu Sep 29 08:34:41 2022 +0800

    Group prometheus metrics (#17852)
---
 .../stats/prometheus/AggregatedNamespaceStats.java |   2 +-
 .../stats/prometheus/NamespaceStatsAggregator.java | 352 ++++++------
 .../stats/prometheus/PrometheusMetricStreams.java  |  75 +++
 .../prometheus/PrometheusMetricsGenerator.java     |  54 +-
 .../pulsar/broker/stats/prometheus/TopicStats.java | 600 ++++++++++-----------
 .../stats/prometheus/TransactionAggregator.java    | 321 +++++------
 .../metrics/PrometheusTextFormatUtil.java          |  32 --
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  59 ++
 .../prometheus/PrometheusMetricStreamsTest.java    |  85 +++
 .../pulsar/common/util/SimpleTextOutputStream.java |  12 +-
 .../instance/stats/PrometheusTextFormat.java       |  29 +-
 .../functions/worker/WorkerStatsManager.java       |   5 +
 12 files changed, 885 insertions(+), 741 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 5610dbab218..1980af91b7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -96,7 +96,7 @@ public class AggregatedNamespaceStats {
 
         stats.replicationStats.forEach((n, as) -> {
             AggregatedReplicationStats replStats =
-                    replicationStats.computeIfAbsent(n,  k -> new AggregatedReplicationStats());
+                    replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats());
             replStats.msgRateIn += as.msgRateIn;
             replStats.msgRateOut += as.msgRateOut;
             replStats.msgThroughputIn += as.msgThroughputIn;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 16e438e2a2e..3e67d929350 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -19,8 +19,11 @@
 package org.apache.pulsar.broker.stats.prometheus;
 
 import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -32,7 +35,6 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.CompactorMXBean;
@@ -40,72 +42,75 @@ import org.apache.pulsar.compaction.CompactorMXBean;
 @Slf4j
 public class NamespaceStatsAggregator {
 
-    private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
+    private static final FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
             new FastThreadLocal<AggregatedNamespaceStats>() {
                 @Override
-                protected AggregatedNamespaceStats initialValue() throws Exception {
+                protected AggregatedNamespaceStats initialValue() {
                     return new AggregatedNamespaceStats();
                 }
             };
 
-    private static FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
+    private static final FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
         @Override
-        protected TopicStats initialValue() throws Exception {
+        protected TopicStats initialValue() {
             return new TopicStats();
         }
     };
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-           boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) {
+                                boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+                                PrometheusMetricStreams stream) {
         String cluster = pulsar.getConfiguration().getClusterName();
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
-        TopicStats.resetTypes();
         TopicStats topicStats = localTopicStats.get();
+        Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
+        LongAdder topicsCount = new LongAdder();
+        Map<String, Long> localNamespaceTopicCount = new HashMap<>();
 
         printDefaultBrokerStats(stream, cluster);
 
-        Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
-        LongAdder topicsCount = new LongAdder();
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
             topicsCount.reset();
 
-            bundlesMap.forEach((bundle, topicsMap) -> {
-                topicsMap.forEach((name, topic) -> {
-                    getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
-                            pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
-                            pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
-                            compactorMXBean
-                    );
-
-                    if (includeTopicMetrics) {
-                        topicsCount.add(1);
-                        TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean,
-                                splitTopicAndPartitionIndexLabel);
-                    } else {
-                        namespaceStats.updateStats(topicStats);
-                    }
-                });
-            });
+            bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> {
+                getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
+                        pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
+                        pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
+                        compactorMXBean
+                );
+
+                if (includeTopicMetrics) {
+                    topicsCount.add(1);
+                    TopicStats.printTopicStats(stream, topicStats, compactorMXBean, cluster, namespace, name,
+                            splitTopicAndPartitionIndexLabel);
+                } else {
+                    namespaceStats.updateStats(topicStats);
+                }
+            }));
 
             if (!includeTopicMetrics) {
-                // Only include namespace level stats if we don't have the per-topic, otherwise we're going to report
-                // the same data twice, and it will make the aggregation difficult
-                printNamespaceStats(stream, cluster, namespace, namespaceStats);
+                // Only include namespace level stats if we don't have the per-topic, otherwise we're going to
+                // report the same data twice, and it will make the aggregation difficult
+                printNamespaceStats(stream, namespaceStats, cluster, namespace);
             } else {
-                printTopicsCountStats(stream, cluster, namespace, topicsCount);
+                localNamespaceTopicCount.put(namespace, topicsCount.sum());
             }
         });
+
+        if (includeTopicMetrics) {
+            printTopicsCountStats(stream, localNamespaceTopicCount, cluster);
+        }
     }
 
     private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
         Compactor compactor = pulsar.getNullableCompactor();
-        return Optional.ofNullable(compactor).map(c -> c.getStats());
+        return Optional.ofNullable(compactor).map(Compactor::getStats);
     }
 
     private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
-            boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
-                                      Optional<CompactorMXBean> compactorMXBean) {
+                                      boolean includeProducerMetrics, boolean getPreciseBacklog,
+                                      boolean subscriptionBacklogSize, Optional<CompactorMXBean> compactorMXBean) {
         stats.reset();
 
         if (topic instanceof PersistentTopic) {
@@ -267,161 +272,174 @@ public class NamespaceStatsAggregator {
                 });
     }
 
-    private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
+    private static void printDefaultBrokerStats(PrometheusMetricStreams stream, String cluster) {
         // Print metrics with 0 values. This is necessary to have the available brokers being
         // reported in the brokers dashboard even if they don't have any topic or traffic
-        metric(stream, cluster, "pulsar_topics_count", 0);
-        metric(stream, cluster, "pulsar_subscriptions_count", 0);
-        metric(stream, cluster, "pulsar_producers_count", 0);
-        metric(stream, cluster, "pulsar_consumers_count", 0);
-        metric(stream, cluster, "pulsar_rate_in", 0);
-        metric(stream, cluster, "pulsar_rate_out", 0);
-        metric(stream, cluster, "pulsar_throughput_in", 0);
-        metric(stream, cluster, "pulsar_throughput_out", 0);
-        metric(stream, cluster, "pulsar_storage_size", 0);
-        metric(stream, cluster, "pulsar_storage_logical_size", 0);
-        metric(stream, cluster, "pulsar_storage_write_rate", 0);
-        metric(stream, cluster, "pulsar_storage_read_rate", 0);
-        metric(stream, cluster, "pulsar_msg_backlog", 0);
+        writeMetric(stream, "pulsar_topics_count", 0, cluster);
+        writeMetric(stream, "pulsar_subscriptions_count", 0, cluster);
+        writeMetric(stream, "pulsar_producers_count", 0, cluster);
+        writeMetric(stream, "pulsar_consumers_count", 0, cluster);
+        writeMetric(stream, "pulsar_rate_in", 0, cluster);
+        writeMetric(stream, "pulsar_rate_out", 0, cluster);
+        writeMetric(stream, "pulsar_throughput_in", 0, cluster);
+        writeMetric(stream, "pulsar_throughput_out", 0, cluster);
+        writeMetric(stream, "pulsar_storage_size", 0, cluster);
+        writeMetric(stream, "pulsar_storage_logical_size", 0, cluster);
+        writeMetric(stream, "pulsar_storage_write_rate", 0, cluster);
+        writeMetric(stream, "pulsar_storage_read_rate", 0, cluster);
+        writeMetric(stream, "pulsar_msg_backlog", 0, cluster);
     }
 
-    private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster, String namespace,
-                                              LongAdder topicsCount) {
-        metric(stream, cluster, namespace, "pulsar_topics_count", topicsCount.sum());
+    private static void printTopicsCountStats(PrometheusMetricStreams stream, Map<String, Long> namespaceTopicsCount,
+                                              String cluster) {
+        namespaceTopicsCount.forEach(
+                (ns, topicCount) -> writeMetric(stream, "pulsar_topics_count", topicCount, cluster, ns)
+        );
     }
 
-    private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
-                                            AggregatedNamespaceStats stats) {
-        metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
-        metric(stream, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
-        metric(stream, cluster, namespace, "pulsar_producers_count", stats.producersCount);
-        metric(stream, cluster, namespace, "pulsar_consumers_count", stats.consumersCount);
-
-        metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn);
-        metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
-        metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
-        metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
-        metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
-
-        metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
-        metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
-        metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
-        metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
-
-        metric(stream, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
-        metric(stream, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
-        metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize);
-        metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
-                stats.managedLedgerStats.offloadedStorageUsed);
-
-        metric(stream, cluster, namespace, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate);
-        metric(stream, cluster, namespace, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate);
-
-        metric(stream, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed);
-
-        metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);
+    private static void printNamespaceStats(PrometheusMetricStreams stream, AggregatedNamespaceStats stats,
+                                            String cluster, String namespace) {
+        writeMetric(stream, "pulsar_topics_count", stats.topicsCount, cluster, namespace);
+        writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount, cluster,
+                namespace);
+        writeMetric(stream, "pulsar_producers_count", stats.producersCount, cluster, namespace);
+        writeMetric(stream, "pulsar_consumers_count", stats.consumersCount, cluster, namespace);
+
+        writeMetric(stream, "pulsar_rate_in", stats.rateIn, cluster, namespace);
+        writeMetric(stream, "pulsar_rate_out", stats.rateOut, cluster, namespace);
+        writeMetric(stream, "pulsar_throughput_in", stats.throughputIn, cluster, namespace);
+        writeMetric(stream, "pulsar_throughput_out", stats.throughputOut, cluster, namespace);
+        writeMetric(stream, "pulsar_consumer_msg_ack_rate", stats.messageAckRate, cluster, namespace);
+
+        writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace);
+        writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace);
+        writeMetric(stream, "pulsar_out_bytes_total", stats.bytesOutCounter, cluster, namespace);
+        writeMetric(stream, "pulsar_out_messages_total", stats.msgOutCounter, cluster, namespace);
+
+        writeMetric(stream, "pulsar_storage_size", stats.managedLedgerStats.storageSize, cluster,
+                namespace);
+        writeMetric(stream, "pulsar_storage_logical_size",
+                stats.managedLedgerStats.storageLogicalSize, cluster, namespace);
+        writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize, cluster,
+                namespace);
+        writeMetric(stream, "pulsar_storage_offloaded_size",
+                stats.managedLedgerStats.offloadedStorageUsed, cluster, namespace);
+
+        writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate,
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
+                cluster, namespace);
+
+        writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, cluster, namespace);
+
+        writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
 
         stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
         long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_count",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
-        metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+        writeMetric(stream, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_le_1", latencyBuckets[1], cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_le_5", latencyBuckets[2], cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_le_10", latencyBuckets[3], cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_le_20", latencyBuckets[4], cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_le_50", latencyBuckets[5], cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_le_100", latencyBuckets[6], cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_le_200", latencyBuckets[7], cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_overflow", latencyBuckets[9], cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_count",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), cluster, namespace);
+        writeMetric(stream, "pulsar_storage_write_latency_sum",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace);
 
         stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
-        long[] ledgerWritelatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_overflow",
-                ledgerWritelatencyBuckets[9]);
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_count",
-                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
-        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_sum",
-                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
+        long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWriteLatencyBuckets[0],
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", ledgerWriteLatencyBuckets[1],
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5", ledgerWriteLatencyBuckets[2],
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", ledgerWriteLatencyBuckets[3],
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", ledgerWriteLatencyBuckets[4],
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", ledgerWriteLatencyBuckets[5],
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", ledgerWriteLatencyBuckets[6],
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", ledgerWriteLatencyBuckets[7],
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", ledgerWriteLatencyBuckets[8],
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", ledgerWriteLatencyBuckets[9],
+                cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(), cluster, namespace);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(), cluster, namespace);
 
         stats.managedLedgerStats.entrySizeBuckets.refresh();
         long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
-        metric(stream, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
-        metric(stream, cluster, namespace, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
-        metric(stream, cluster, namespace, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
-        metric(stream, cluster, namespace, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
-        metric(stream, cluster, namespace, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
-        metric(stream, cluster, namespace, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
-        metric(stream, cluster, namespace, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
-        metric(stream, cluster, namespace, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
-        metric(stream, cluster, namespace, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
-        metric(stream, cluster, namespace, "pulsar_entry_size_count",
-                stats.managedLedgerStats.entrySizeBuckets.getCount());
-        metric(stream, cluster, namespace, "pulsar_entry_size_sum",
-                stats.managedLedgerStats.entrySizeBuckets.getSum());
-
-        if (!stats.replicationStats.isEmpty()) {
-            stats.replicationStats.forEach((remoteCluster, replStats) -> {
-                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_in", remoteCluster,
-                        replStats.msgRateIn);
-                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_out", remoteCluster,
-                        replStats.msgRateOut);
-                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_in", remoteCluster,
-                        replStats.msgThroughputIn);
-                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_out", remoteCluster,
-                        replStats.msgThroughputOut);
-                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_backlog", remoteCluster,
-                        replStats.replicationBacklog);
-                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_connected_count", remoteCluster,
-                        replStats.connectedCount);
-                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_expired", remoteCluster,
-                        replStats.msgRateExpired);
-                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_delay_in_seconds",
-                        remoteCluster, replStats.replicationDelayInSeconds);
-            });
-        }
+        writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace);
+        writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace);
+        writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace);
+        writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace);
+        writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace);
+        writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace);
+        writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace);
+        writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace);
+        writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace);
+        writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(),
+                cluster, namespace);
+        writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(),
+                cluster, namespace);
+
+        writeReplicationStat(stream, "pulsar_replication_rate_in", stats,
+                replStats -> replStats.msgRateIn, cluster, namespace);
+        writeReplicationStat(stream, "pulsar_replication_rate_out", stats,
+                replStats -> replStats.msgRateOut, cluster, namespace);
+        writeReplicationStat(stream, "pulsar_replication_throughput_in", stats,
+                replStats -> replStats.msgThroughputIn, cluster, namespace);
+        writeReplicationStat(stream, "pulsar_replication_throughput_out", stats,
+                replStats -> replStats.msgThroughputOut, cluster, namespace);
+        writeReplicationStat(stream, "pulsar_replication_backlog", stats,
+                replStats -> replStats.replicationBacklog, cluster, namespace);
+        writeReplicationStat(stream, "pulsar_replication_connected_count", stats,
+                replStats -> replStats.connectedCount, cluster, namespace);
+        writeReplicationStat(stream, "pulsar_replication_rate_expired", stats,
+                replStats -> replStats.msgRateExpired, cluster, namespace);
+        writeReplicationStat(stream, "pulsar_replication_delay_in_seconds", stats,
+                replStats -> replStats.replicationDelayInSeconds, cluster, namespace);
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String name,
-            long value) {
-        TopicStats.metricType(stream, name);
-        stream.write(name)
-                .write("{cluster=\"").write(cluster).write("\"} ")
-                .write(value).write(' ').write(System.currentTimeMillis())
-                .write('\n');
+    private static void writePulsarMsgBacklog(PrometheusMetricStreams stream, Number value,
+                                              String cluster, String namespace) {
+        stream.writeSample("pulsar_msg_backlog", value, "cluster", cluster, "namespace", namespace,
+                "remote_cluster",
+                "local");
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
-                               long value) {
-        TopicStats.metricType(stream, name);
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value,
+                                    String cluster) {
+        stream.writeSample(metricName, value, "cluster", cluster);
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
-                               double value) {
-        TopicStats.metricType(stream, name);
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
+                                    String namespace) {
+        stream.writeSample(metricName, value, "cluster", cluster, "namespace", namespace);
     }
 
-    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
-                                                String name, String remoteCluster, double value) {
-        TopicStats.metricType(stream, name);
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
-        stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    private static void writeReplicationStat(PrometheusMetricStreams stream, String metricName,
+                                             AggregatedNamespaceStats namespaceStats,
+                                             Function<AggregatedReplicationStats, Number> sampleValueFunction,
+                                             String cluster, String namespace) {
+        if (!namespaceStats.replicationStats.isEmpty()) {
+            namespaceStats.replicationStats.forEach((remoteCluster, replStats) ->
+                    stream.writeSample(metricName, sampleValueFunction.apply(replStats),
+                            "cluster", cluster,
+                            "namespace", namespace,
+                            "remote_cluster", remoteCluster)
+            );
+        }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
new file mode 100644
index 00000000000..6b6b972c175
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * Helper class to ensure that metrics of the same name are grouped together under the same TYPE header when written.
+ * Those are the requirements of the
+ * <a href="https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#grouping-and-sorting">Prometheus Exposition Format</a>.
+ */
+public class PrometheusMetricStreams {
+    private final Map<String, SimpleTextOutputStream> metricStreamMap = new HashMap<>();
+
+    /**
+     * Write the given metric and sample value to the stream. Will write #TYPE header if metric not seen before.
+     * @param metricName name of the metric.
+     * @param value value of the sample
+     * @param labelsAndValuesArray varargs of label and label value
+     */
+    void writeSample(String metricName, Number value, String... labelsAndValuesArray) {
+        SimpleTextOutputStream stream = initGaugeType(metricName);
+        stream.write(metricName).write('{');
+        for (int i = 0; i < labelsAndValuesArray.length; i += 2) {
+            stream.write(labelsAndValuesArray[i]).write("=\"").write(labelsAndValuesArray[i + 1]).write('\"');
+            if (i + 2 != labelsAndValuesArray.length) {
+                stream.write(',');
+            }
+        }
+        stream.write("} ").write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
+
+    /**
+     * Flush all the stored metrics to the supplied stream.
+     * @param stream the stream to write to.
+     */
+    void flushAllToStream(SimpleTextOutputStream stream) {
+        metricStreamMap.values().forEach(s -> stream.write(s.getBuffer()));
+    }
+
+    /**
+     * Release all the streams to clean up resources.
+     */
+    void releaseAll() {
+        metricStreamMap.values().forEach(s -> s.getBuffer().release());
+        metricStreamMap.clear();
+    }
+
+    private SimpleTextOutputStream initGaugeType(String metricName) {
+        return metricStreamMap.computeIfAbsent(metricName, s -> {
+            SimpleTextOutputStream stream = new SimpleTextOutputStream(PulsarByteBufAllocator.DEFAULT.directBuffer());
+            stream.write("# TYPE ").write(metricName).write(" gauge\n");
+            return stream;
+        });
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index cd6afd1535d..aa5822c826a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -52,7 +52,8 @@ import org.apache.pulsar.common.util.SimpleTextOutputStream;
 /**
  * Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out
  * in a text format suitable to be consumed by Prometheus.
- * Format specification can be found at {@link https://prometheus.io/docs/instrumenting/exposition_formats/}
+ * Format specification can be found at <a
+ * href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats</a>
  */
 public class PrometheusMetricsGenerator {
 
@@ -86,38 +87,43 @@ public class PrometheusMetricsGenerator {
     }
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-        boolean includeProducerMetrics, OutputStream out) throws IOException {
+                                boolean includeProducerMetrics, OutputStream out) throws IOException {
         generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out, null);
     }
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-        boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
-        OutputStream out) throws IOException {
+                                boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+                                OutputStream out) throws IOException {
         generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics,
                 splitTopicAndPartitionIndexLabel, out, null);
     }
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-        boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, OutputStream out,
-        List<PrometheusRawMetricsProvider> metricsProviders)
-        throws IOException {
+                                boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+                                OutputStream out,
+                                List<PrometheusRawMetricsProvider> metricsProviders)
+            throws IOException {
         ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+        //Used in namespace/topic and transaction aggregators as share metric names
+        PrometheusMetricStreams metricStreams = new PrometheusMetricStreams();
         try {
             SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
 
             generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());
 
             NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics,
-                    includeProducerMetrics, splitTopicAndPartitionIndexLabel, stream);
+                    includeProducerMetrics, splitTopicAndPartitionIndexLabel, metricStreams);
 
             if (pulsar.getWorkerServiceOpt().isPresent()) {
                 pulsar.getWorkerService().generateFunctionsStats(stream);
             }
 
             if (pulsar.getConfiguration().isTransactionCoordinatorEnabled()) {
-                TransactionAggregator.generate(pulsar, stream, includeTopicMetrics);
+                TransactionAggregator.generate(pulsar, metricStreams, includeTopicMetrics);
             }
 
+            metricStreams.flushAllToStream(stream);
+
             generateBrokerBasicMetrics(pulsar, stream);
 
             generateManagedLedgerBookieClientMetrics(pulsar, stream);
@@ -129,6 +135,8 @@ public class PrometheusMetricsGenerator {
             }
             out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
         } finally {
+            //release all the metrics buffers
+            metricStreams.releaseAll();
             buf.release();
         }
     }
@@ -142,17 +150,17 @@ public class PrometheusMetricsGenerator {
         if (pulsar.getConfiguration().isExposeManagedLedgerMetricsInPrometheus()) {
             // generate managedLedger metrics
             parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(),
-                clusterName, Collector.Type.GAUGE, stream);
+                    clusterName, Collector.Type.GAUGE, stream);
         }
 
         if (pulsar.getConfiguration().isExposeManagedCursorMetricsInPrometheus()) {
             // generate managedCursor metrics
             parseMetricsToPrometheusMetrics(new ManagedCursorMetrics(pulsar).generate(),
-                clusterName, Collector.Type.GAUGE, stream);
+                    clusterName, Collector.Type.GAUGE, stream);
         }
 
         parseMetricsToPrometheusMetrics(Collections.singletonList(pulsar.getBrokerService()
-                .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
+                        .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
                 clusterName, Collector.Type.GAUGE, stream);
 
         // generate loadBalance metrics
@@ -267,17 +275,17 @@ public class PrometheusMetricsGenerator {
 
     static String getTypeStr(Collector.Type type) {
         switch (type) {
-        case COUNTER:
-            return "counter";
-        case GAUGE:
-            return "gauge";
-        case SUMMARY        :
-            return "summary";
-        case HISTOGRAM:
-            return "histogram";
-        case UNTYPED:
-        default:
-            return "untyped";
+            case COUNTER:
+                return "counter";
+            case GAUGE:
+                return "gauge";
+            case SUMMARY:
+                return "summary";
+            case HISTOGRAM:
+                return "histogram";
+            case UNTYPED:
+            default:
+                return "untyped";
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index e6e5883847d..9ac2f04eae4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -23,12 +23,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.compaction.CompactionRecord;
 import org.apache.pulsar.compaction.CompactorMXBean;
 
 class TopicStats {
-
     int subscriptionsCount;
     int producersCount;
     int consumersCount;
@@ -43,7 +43,6 @@ class TopicStats {
     double averageMsgSize;
 
     public long msgBacklog;
-
     long publishRateLimitedTimes;
 
     long backlogQuotaLimit;
@@ -55,9 +54,6 @@ class TopicStats {
     Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
     Map<String, AggregatedProducerStats> producerStats = new HashMap<>();
 
-    // Used for tracking duplicate TYPE definitions
-    static Map<String, String> metricWithTypeDefinition = new HashMap<>();
-
     // For compaction
     long compactionRemovedEventCount;
     long compactionSucceedCount;
@@ -103,378 +99,340 @@ class TopicStats {
         compactionLatencyBuckets.reset();
     }
 
-    static void resetTypes() {
-        metricWithTypeDefinition.clear();
-    }
-
-    static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                                TopicStats stats, Optional<CompactorMXBean> compactorMXBean,
-                                boolean splitTopicAndPartitionIndexLabel) {
-        metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount,
-                splitTopicAndPartitionIndexLabel);
-
-        metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
-                splitTopicAndPartitionIndexLabel);
-
-        metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
-                stats.managedLedgerStats.storageLogicalSize, splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_rate",
-                stats.managedLedgerStats.storageWriteRate, splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
-                stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
-                .offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time",
-                stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
+    public static void printTopicStats(PrometheusMetricStreams stream, TopicStats stats,
+                                       Optional<CompactorMXBean> compactorMXBean, String cluster, String namespace,
+                                       String topic, boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_producers_count", stats.producersCount,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_consumers_count", stats.consumersCount,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+
+        writeMetric(stream, "pulsar_rate_in", stats.rateIn,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_rate_out", stats.rateOut,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_throughput_in", stats.throughputIn,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_throughput_out", stats.throughputOut,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_average_msg_size", stats.averageMsgSize,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+
+        writeMetric(stream, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_logical_size",
+                stats.managedLedgerStats.storageLogicalSize, cluster, namespace, topic,
+                splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_msg_backlog", stats.msgBacklog,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_offloaded_size", stats.managedLedgerStats
+                .offloadedStorageUsed, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_backlog_quota_limit_time", stats.backlogQuotaLimitTime,
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
 
         long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1],
+        writeMetric(stream, "pulsar_storage_write_latency_le_0_5",
+                latencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_le_1",
+                latencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_le_5",
+                latencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_le_10",
+                latencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_le_20",
+                latencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_le_50",
+                latencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_le_100",
+                latencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_le_200",
+                latencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_le_1000",
+                latencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_overflow",
+                latencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_count",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(),
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_write_latency_sum",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9],
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
 
         long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5",
-                ledgerWriteLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1",
-                ledgerWriteLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5",
-                ledgerWriteLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10",
-                ledgerWriteLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20",
-                ledgerWriteLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50",
-                ledgerWriteLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100",
-                ledgerWriteLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200",
-                ledgerWriteLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000",
-                ledgerWriteLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow",
-                ledgerWriteLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count",
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5",
+                ledgerWriteLatencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1",
+                ledgerWriteLatencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5",
+                ledgerWriteLatencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10",
+                ledgerWriteLatencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20",
+                ledgerWriteLatencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50",
+                ledgerWriteLatencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100",
+                ledgerWriteLatencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200",
+                ledgerWriteLatencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000",
+                ledgerWriteLatencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow",
+                ledgerWriteLatencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
                 stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
-                splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum",
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
                 stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
-                splitTopicAndPartitionIndexLabel);
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
 
         long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0],
+        writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1],
+        writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
+        writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
+        writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
+        writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
+        writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6],
+        writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
+        writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8],
+        writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
-                stats.managedLedgerStats.entrySizeBuckets.getCount(), splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
-                stats.managedLedgerStats.entrySizeBuckets.getSum(), splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(),
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(),
+                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
 
         stats.producerStats.forEach((p, producerStats) -> {
-            metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in",
-                    producerStats.msgRateIn, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in",
-                    producerStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size",
-                    producerStats.averageMsgSize, splitTopicAndPartitionIndexLabel);
+            writeProducerMetric(stream, "pulsar_producer_msg_rate_in", producerStats.msgRateIn,
+                    cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
+            writeProducerMetric(stream, "pulsar_producer_msg_throughput_in", producerStats.msgThroughputIn,
+                    cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
+            writeProducerMetric(stream, "pulsar_producer_msg_average_Size", producerStats.averageMsgSize,
+                    cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
         });
 
-        stats.subscriptionStats.forEach((n, subsStats) -> {
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
-                    subsStats.msgBacklog, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
-                    subsStats.msgBacklogNoDelayed, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
-                    subsStats.msgDelayed, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
-                    subsStats.msgRateRedeliver, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
-                    subsStats.unackedMessages, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
-                    subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
-                    subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate",
-                    subsStats.messageAckRate, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
-                    subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
-                    subsStats.bytesOutCounter, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
-                    subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
-                    subsStats.lastExpireTimestamp, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
-                subsStats.lastAckedTimestamp, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
-                subsStats.lastConsumedFlowTimestamp, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
-                subsStats.lastConsumedTimestamp, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
-                subsStats.lastMarkDeleteAdvancedTimestamp, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
-                    subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
-                    subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
+        stats.subscriptionStats.forEach((sub, subsStats) -> {
+            writeSubscriptionMetric(stream, "pulsar_subscription_back_log", subsStats.msgBacklog,
+                    cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_back_log_no_delayed",
+                    subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
+                    subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver",
+                    subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages",
+                    subsStats.unackedMessages, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_blocked_on_unacked_messages",
+                    subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, cluster, namespace, topic, sub,
+                    splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_out",
+                    subsStats.msgRateOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_msg_ack_rate",
+                    subsStats.messageAckRate, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_msg_throughput_out",
+                    subsStats.msgThroughputOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_out_bytes_total",
+                    subsStats.bytesOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_out_messages_total",
+                    subsStats.msgOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_last_expire_timestamp",
+                    subsStats.lastExpireTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_last_acked_timestamp",
+                    subsStats.lastAckedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_flow_timestamp",
+                    subsStats.lastConsumedFlowTimestamp, cluster, namespace, topic, sub,
+                    splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_timestamp",
+                    subsStats.lastConsumedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_last_mark_delete_advanced_timestamp",
+                    subsStats.lastMarkDeleteAdvancedTimestamp, cluster, namespace, topic, sub,
+                    splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_expired",
+                    subsStats.msgRateExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+            writeSubscriptionMetric(stream, "pulsar_subscription_total_msg_expired",
+                    subsStats.totalMsgExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+
             subsStats.consumerStat.forEach((c, consumerStats) -> {
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_blocked_on_unacked_messages",
+                writeConsumerMetric(stream, "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
+                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+                writeConsumerMetric(stream, "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
+                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+                writeConsumerMetric(stream, "pulsar_consumer_blocked_on_unacked_messages",
                         consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_available_permits", consumerStats.availablePermits,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
-                        splitTopicAndPartitionIndexLabel);
-                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_out_messages_total", consumerStats.msgOutCounter,
-                        splitTopicAndPartitionIndexLabel);
+                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+                writeConsumerMetric(stream, "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
+                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+
+                writeConsumerMetric(stream, "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
+                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+
+                writeConsumerMetric(stream, "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
+                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+                writeConsumerMetric(stream, "pulsar_consumer_available_permits", consumerStats.availablePermits,
+                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+                writeConsumerMetric(stream, "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
+                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+                writeConsumerMetric(stream, "pulsar_out_messages_total", consumerStats.msgOutCounter,
+                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
             });
         });
 
         if (!stats.replicationStats.isEmpty()) {
             stats.replicationStats.forEach((remoteCluster, replStats) -> {
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster,
-                        replStats.msgRateIn, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster,
-                        replStats.msgRateOut, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in",
-                        remoteCluster, replStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out",
-                        remoteCluster, replStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster,
-                        replStats.replicationBacklog, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count",
-                        remoteCluster, replStats.connectedCount, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired",
-                        remoteCluster, replStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
-                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds",
-                        remoteCluster, replStats.replicationDelayInSeconds, splitTopicAndPartitionIndexLabel);
+                writeMetric(stream, "pulsar_replication_rate_in", replStats.msgRateIn,
+                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+                writeMetric(stream, "pulsar_replication_rate_out", replStats.msgRateOut,
+                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+                writeMetric(stream, "pulsar_replication_throughput_in", replStats.msgThroughputIn,
+                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+                writeMetric(stream, "pulsar_replication_throughput_out", replStats.msgThroughputOut,
+                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+                writeMetric(stream, "pulsar_replication_backlog", replStats.replicationBacklog,
+                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+                writeMetric(stream, "pulsar_replication_connected_count", replStats.connectedCount,
+                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+                writeMetric(stream, "pulsar_replication_rate_expired", replStats.msgRateExpired,
+                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+                writeMetric(stream, "pulsar_replication_delay_in_seconds", replStats.replicationDelayInSeconds,
+                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
             });
         }
 
-        metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter,
+        writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
-        metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter,
+        writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel);
 
         // Compaction
         boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
-                .map(__ -> true).orElse(false);
+                .isPresent();
         if (hasCompaction) {
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
-                    stats.compactionRemovedEventCount, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
-                    stats.compactionSucceedCount, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
-                    stats.compactionFailedCount, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
-                    stats.compactionDurationTimeInMills, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
-                    stats.compactionReadThroughput, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
-                    stats.compactionWriteThroughput, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
-                    stats.compactionCompactedEntriesCount, splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
-                    stats.compactionCompactedEntriesSize, splitTopicAndPartitionIndexLabel);
-            long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
-                    compactionLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
-                    compactionLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
-                    compactionLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
-                    compactionLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
-                    compactionLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
-                    compactionLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
-                    compactionLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
-                    compactionLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
-                    compactionLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
-                    compactionLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
-                    stats.compactionLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
-            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
-                    stats.compactionLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_removed_event_count",
+                    stats.compactionRemovedEventCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_succeed_count",
+                    stats.compactionSucceedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_failed_count",
+                    stats.compactionFailedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_duration_time_in_mills",
+                    stats.compactionDurationTimeInMills, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_read_throughput",
+                    stats.compactionReadThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_write_throughput",
+                    stats.compactionWriteThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_compacted_entries_count",
+                    stats.compactionCompactedEntriesCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_compacted_entries_size",
+                    stats.compactionCompactedEntriesSize, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+
+            long[] compactionBuckets = stats.compactionLatencyBuckets.getBuckets();
+            writeMetric(stream, "pulsar_compaction_latency_le_0_5",
+                    compactionBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_le_1",
+                    compactionBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_le_5",
+                    compactionBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_le_10",
+                    compactionBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_le_20",
+                    compactionBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_le_50",
+                    compactionBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_le_100",
+                    compactionBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_le_200",
+                    compactionBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_le_1000",
+                    compactionBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_overflow",
+                    compactionBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_sum",
+                    stats.compactionLatencyBuckets.getSum(), cluster, namespace, topic,
+                    splitTopicAndPartitionIndexLabel);
+            writeMetric(stream, "pulsar_compaction_latency_count",
+                    stats.compactionLatencyBuckets.getCount(), cluster, namespace, topic,
+                    splitTopicAndPartitionIndexLabel);
         }
     }
 
-    static void metricType(SimpleTextOutputStream stream, String name) {
-
-        if (!metricWithTypeDefinition.containsKey(name)) {
-            metricWithTypeDefinition.put(name, "gauge");
-            stream.write("# TYPE ").write(name).write(" gauge\n");
-        }
-
-    }
-
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String name, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
+                                    String namespace, String topic, boolean splitTopicAndPartitionIndexLabel) {
+        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-           String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
+                                    String namespace, String topic, String remoteCluster,
+                                    boolean splitTopicAndPartitionIndexLabel) {
+        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
+                "remote_cluster", remoteCluster);
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String producerName, long produceId, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",producer_name=\"").write(producerName)
-                .write("\",producer_id=\"").write(produceId).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeProducerMetric(PrometheusMetricStreams stream, String metricName, Number value,
+                                            String cluster, String namespace, String topic, String producer,
+                                            long producerId, boolean splitTopicAndPartitionIndexLabel) {
+        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
+                "producer_name", producer, "producer_id", String.valueOf(producerId));
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String subscription, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
-    }
-
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String subscription, String consumerName, long consumerId, String name, long value,
-            boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription)
-                .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
-                .write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
-    }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String subscription, String consumerName, long consumerId, String name, double value,
-            boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription)
-                .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
-                .write(consumerId).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeSubscriptionMetric(PrometheusMetricStreams stream, String metricName, Number value,
+                                                String cluster, String namespace, String topic, String subscription,
+                                                boolean splitTopicAndPartitionIndexLabel) {
+        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
+                "subscription", subscription);
     }
 
-    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
-            String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeConsumerMetric(PrometheusMetricStreams stream, String metricName, Number value,
+                                            String cluster, String namespace, String topic, String subscription,
+                                            Consumer consumer, boolean splitTopicAndPartitionIndexLabel) {
+        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
+                "subscription", subscription, "consumer_name", consumer.consumerName(),
+                "consumer_id", String.valueOf(consumer.consumerId()));
     }
 
-    private static SimpleTextOutputStream appendRequiredLabels(SimpleTextOutputStream stream, String cluster,
-            String namespace, String topic, String name, boolean splitTopicAndPartitionIndexLabel) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
+    static void writeTopicMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
+                                 String namespace, String topic, boolean splitTopicAndPartitionIndexLabel,
+                                 String... extraLabelsAndValues) {
+        String[] labelsAndValues = new String[splitTopicAndPartitionIndexLabel ? 8 : 6];
+        labelsAndValues[0] = "cluster";
+        labelsAndValues[1] = cluster;
+        labelsAndValues[2] = "namespace";
+        labelsAndValues[3] = namespace;
+        labelsAndValues[4] = "topic";
         if (splitTopicAndPartitionIndexLabel) {
             int index = topic.indexOf(PARTITIONED_TOPIC_SUFFIX);
             if (index > 0) {
-                stream.write("\",topic=\"").write(topic.substring(0, index)).write("\",partition=\"")
-                        .write(topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length()));
+                labelsAndValues[5] = topic.substring(0, index);
+                labelsAndValues[6] = "partition";
+                labelsAndValues[7] = topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length());
             } else {
-                stream.write("\",topic=\"").write(topic).write("\",partition=\"").write("-1");
+                labelsAndValues[5] = topic;
+                labelsAndValues[6] = "partition";
+                labelsAndValues[7] = "-1";
             }
         } else {
-            stream.write("\",topic=\"").write(topic);
+            labelsAndValues[5] = topic;
         }
-        return stream;
-    }
-
-    private static void appendEndings(SimpleTextOutputStream stream) {
-        stream.write(' ').write(System.currentTimeMillis()).write('\n');
+        String[] labels = ArrayUtils.addAll(labelsAndValues, extraLabelsAndValues);
+        stream.writeSample(metricName, value, labels);
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index e6ac1535f43..8c58b516333 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.broker.stats.prometheus;
 
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import io.netty.util.concurrent.FastThreadLocal;
-import java.util.HashMap;
-import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
@@ -30,7 +28,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreStats;
@@ -38,21 +35,10 @@ import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreSt
 @Slf4j
 public class TransactionAggregator {
 
-    /**
-     * Used for tracking duplicate TYPE definitions.
-     */
-    private static final FastThreadLocal<Map<String, String>> threadLocalMetricWithTypeDefinition =
-            new FastThreadLocal() {
-                @Override
-                protected Map<String, String> initialValue() {
-                    return new HashMap<>();
-                }
-             };
-
     private static final FastThreadLocal<AggregatedTransactionCoordinatorStats> localTransactionCoordinatorStats =
             new FastThreadLocal<AggregatedTransactionCoordinatorStats>() {
                 @Override
-                protected AggregatedTransactionCoordinatorStats initialValue() throws Exception {
+                protected AggregatedTransactionCoordinatorStats initialValue() {
                     return new AggregatedTransactionCoordinatorStats();
                 }
             };
@@ -60,21 +46,18 @@ public class TransactionAggregator {
     private static final FastThreadLocal<ManagedLedgerStats> localManageLedgerStats =
             new FastThreadLocal<ManagedLedgerStats>() {
                 @Override
-                protected ManagedLedgerStats initialValue() throws Exception {
+                protected ManagedLedgerStats initialValue() {
                     return new ManagedLedgerStats();
                 }
             };
 
-    public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, boolean includeTopicMetrics) {
+    public static void generate(PulsarService pulsar, PrometheusMetricStreams stream, boolean includeTopicMetrics) {
         String cluster = pulsar.getConfiguration().getClusterName();
-        Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
-        metricWithTypeDefinition.clear();
 
         if (includeTopicMetrics) {
-            pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
 
-                bundlesMap.forEach((bundle, topicsMap) -> {
-                    topicsMap.forEach((name, topic) -> {
+            pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) ->
+                    bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> {
                         if (topic instanceof PersistentTopic) {
                             topic.getSubscriptions().values().forEach(subscription -> {
                                 try {
@@ -82,9 +65,8 @@ public class TransactionAggregator {
                                     if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
                                             && subscription instanceof  PersistentSubscription
                                             && ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) {
-                                        ManagedLedger managedLedger =
-                                                ((PersistentSubscription) subscription)
-                                                        .getPendingAckManageLedger().get();
+                                        ManagedLedger managedLedger = ((PersistentSubscription) subscription)
+                                                .getPendingAckManageLedger().get();
                                         generateManageLedgerStats(managedLedger,
                                                 stream, cluster, namespace, name, subscription.getName());
                                     }
@@ -93,9 +75,7 @@ public class TransactionAggregator {
                                 }
                             });
                         }
-                    });
-                });
-            });
+                    })));
         }
         AggregatedTransactionCoordinatorStats transactionCoordinatorStats = localTransactionCoordinatorStats.get();
 
@@ -124,18 +104,18 @@ public class TransactionAggregator {
 
                     localManageLedgerStats.get().reset();
                     if (transactionMetadataStore instanceof MLTransactionMetadataStore) {
-                       ManagedLedger managedLedger =
-                               ((MLTransactionMetadataStore) transactionMetadataStore).getManagedLedger();
+                        ManagedLedger managedLedger =
+                                ((MLTransactionMetadataStore) transactionMetadataStore).getManagedLedger();
                         generateManageLedgerStats(managedLedger,
                                 stream, cluster, NamespaceName.SYSTEM_NAMESPACE.toString(),
                                 MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + transactionCoordinatorID.getId(),
                                 MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME);
                     }
 
-        });
+                });
     }
 
-    private static void generateManageLedgerStats(ManagedLedger managedLedger, SimpleTextOutputStream stream,
+    private static void generateManageLedgerStats(ManagedLedger managedLedger, PrometheusMetricStreams stream,
                                                   String cluster, String namespace, String topic, String subscription) {
         ManagedLedgerStats managedLedgerStats = localManageLedgerStats.get();
         ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) managedLedger.getStats();
@@ -157,174 +137,149 @@ public class TransactionAggregator {
 
         managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
         managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
-        printManageLedgerStats(stream, cluster, namespace, topic,
-                subscription, managedLedgerStats);
-    }
-
-    private static void metricType(SimpleTextOutputStream stream, String name) {
-        Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
-        if (!metricWithTypeDefinition.containsKey(name)) {
-            metricWithTypeDefinition.put(name, "gauge");
-            stream.write("# TYPE ").write(name).write(" gauge\n");
-        }
-
-    }
-
-    private static void metric(SimpleTextOutputStream stream, String cluster, String name,
-                               double value, long coordinatorId) {
-        metricType(stream, name);
-        stream.write(name)
-                .write("{cluster=\"").write(cluster)
-                .write("\",coordinator_id=\"").write(coordinatorId).write("\"} ")
-                .write(value).write(' ').write(System.currentTimeMillis())
-                .write('\n');
+        printManageLedgerStats(stream, cluster, namespace, topic, subscription, managedLedgerStats);
     }
 
-    private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace,
-                                String topic, String subscription, String name, long value) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
-                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
-    }
-
-    private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace,
-                                String topic, String subscription, String name, double value) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
-                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
-    }
-
-    private static void printManageLedgerStats(SimpleTextOutputStream stream, String cluster, String namespace,
+    private static void printManageLedgerStats(PrometheusMetricStreams stream, String cluster, String namespace,
                                                String topic, String subscription, ManagedLedgerStats stats) {
 
-        metrics(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_size", stats.storageSize);
-        metrics(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_logical_size", stats.storageLogicalSize);
-        metrics(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_backlog_size", stats.backlogSize);
-        metrics(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
+        writeMetric(stream, "pulsar_storage_size", stats.storageSize, cluster, namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_logical_size", stats.storageLogicalSize, cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_backlog_size", stats.backlogSize, cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed, cluster, namespace, topic,
+                subscription);
 
-        metrics(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_rate", stats.storageWriteRate);
-        metrics(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_read_rate", stats.storageReadRate);
+        writeMetric(stream, "pulsar_storage_write_rate", stats.storageWriteRate, cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_read_rate", stats.storageReadRate, cluster, namespace, topic,
+                subscription);
 
         stats.storageWriteLatencyBuckets.refresh();
         long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_count",
-                stats.storageWriteLatencyBuckets.getCount());
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_sum",
-                stats.storageWriteLatencyBuckets.getSum());
+        writeMetric(stream, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_le_1", latencyBuckets[1], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_le_5", latencyBuckets[2], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_le_10", latencyBuckets[3], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_le_20", latencyBuckets[4], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_le_50", latencyBuckets[5], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_le_100", latencyBuckets[6], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_le_200", latencyBuckets[7], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_overflow", latencyBuckets[9], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_count", stats.storageWriteLatencyBuckets.getCount(),
+                cluster, namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_write_latency_sum", stats.storageWriteLatencyBuckets.getSum(), cluster,
+                namespace, topic, subscription);
 
         stats.storageLedgerWriteLatencyBuckets.refresh();
-        long[] ledgerWritelatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_overflow",
-                ledgerWritelatencyBuckets[9]);
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_count",
-                stats.storageLedgerWriteLatencyBuckets.getCount());
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_sum",
-                stats.storageLedgerWriteLatencyBuckets.getSum());
+        long[] ledgerWriteLatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWriteLatencyBuckets[0], cluster,
+                namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", ledgerWriteLatencyBuckets[1], cluster,
+                namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5", ledgerWriteLatencyBuckets[2], cluster,
+                namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", ledgerWriteLatencyBuckets[3], cluster,
+                namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", ledgerWriteLatencyBuckets[4], cluster,
+                namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", ledgerWriteLatencyBuckets[5], cluster,
+                namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", ledgerWriteLatencyBuckets[6], cluster,
+                namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", ledgerWriteLatencyBuckets[7], cluster,
+                namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", ledgerWriteLatencyBuckets[8], cluster,
+                namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", ledgerWriteLatencyBuckets[9], cluster,
+                namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
+                stats.storageLedgerWriteLatencyBuckets.getCount(), cluster, namespace, topic, subscription);
+        writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
+                stats.storageLedgerWriteLatencyBuckets.getSum(), cluster, namespace, topic, subscription);
 
         stats.entrySizeBuckets.refresh();
         long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
-        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
-        metric(stream, cluster, namespace, topic, subscription,
-                "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
-    }
-
-    private static void metric(SimpleTextOutputStream stream, String cluster,
-                               String namespace, String topic, String subscription,
-                               String name, long value) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
-                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+        writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic,
+                subscription);
+        writeMetric(stream, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount(), cluster, namespace,
+                topic, subscription);
+        writeMetric(stream, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum(), cluster, namespace, topic,
+                subscription);
     }
 
-    static void printTransactionCoordinatorStats(SimpleTextOutputStream stream, String cluster,
+    static void printTransactionCoordinatorStats(PrometheusMetricStreams stream, String cluster,
                                                  AggregatedTransactionCoordinatorStats stats,
                                                  long coordinatorId) {
-        metric(stream, cluster, "pulsar_txn_active_count",
-                stats.actives, coordinatorId);
-        metric(stream, cluster, "pulsar_txn_committed_count",
-                stats.committedCount, coordinatorId);
-        metric(stream, cluster, "pulsar_txn_aborted_count",
-                stats.abortedCount, coordinatorId);
-        metric(stream, cluster, "pulsar_txn_created_count",
-                stats.createdCount, coordinatorId);
-        metric(stream, cluster, "pulsar_txn_timeout_count",
-                stats.timeoutCount, coordinatorId);
-        metric(stream, cluster, "pulsar_txn_append_log_count",
-                stats.appendLogCount, coordinatorId);
+        writeMetric(stream, "pulsar_txn_active_count", stats.actives, cluster,
+                coordinatorId);
+        writeMetric(stream, "pulsar_txn_committed_count", stats.committedCount, cluster,
+                coordinatorId);
+        writeMetric(stream, "pulsar_txn_aborted_count", stats.abortedCount, cluster,
+                coordinatorId);
+        writeMetric(stream, "pulsar_txn_created_count", stats.createdCount, cluster,
+                coordinatorId);
+        writeMetric(stream, "pulsar_txn_timeout_count", stats.timeoutCount, cluster,
+                coordinatorId);
+        writeMetric(stream, "pulsar_txn_append_log_count", stats.appendLogCount, cluster,
+                coordinatorId);
         long[] latencyBuckets = stats.executionLatency;
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_15000", latencyBuckets[7], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_300000",
-                latencyBuckets[10], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_1500000",
-                latencyBuckets[11], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_3000000",
-                latencyBuckets[12], coordinatorId);
-        metric(stream, cluster, "pulsar_txn_execution_latency_le_overflow",
-                latencyBuckets[13], coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], cluster, coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], cluster, coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], cluster, coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], cluster, coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], cluster, coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], cluster, coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], cluster, coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_15000", latencyBuckets[7], cluster, coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], cluster, coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], cluster, coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_300000", latencyBuckets[10], cluster,
+                coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_1500000", latencyBuckets[11], cluster,
+                coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_3000000", latencyBuckets[12], cluster,
+                coordinatorId);
+        writeMetric(stream, "pulsar_txn_execution_latency_le_overflow", latencyBuckets[13], cluster,
+                coordinatorId);
+    }
+
+    private static void writeMetric(PrometheusMetricStreams stream, String metricName, double value, String cluster,
+                                    long coordinatorId) {
+        stream.writeSample(metricName, value, "cluster", cluster, "coordinator_id", String.valueOf(coordinatorId));
+    }
+
+    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
+                                    String namespace, String topic, String subscription) {
+        stream.writeSample(metricName, value, "cluster", cluster, "namespace", namespace, "topic", topic,
+                "subscription", subscription);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
index 7550096c2b5..8f704b11e76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
@@ -18,13 +18,8 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import io.prometheus.client.Collector;
-import io.prometheus.client.Collector.MetricFamilySamples;
-import io.prometheus.client.Collector.MetricFamilySamples.Sample;
-import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
-import java.util.Enumeration;
 import org.apache.bookkeeper.stats.Counter;
 
 /**
@@ -140,31 +135,4 @@ public class PrometheusTextFormatUtil {
                 .append(success.toString()).append("\"} ")
                 .append(Double.toString(opStat.getSum(success))).append('\n');
     }
-
-    public static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry)
-            throws IOException {
-        Enumeration<MetricFamilySamples> metricFamilySamples = registry.metricFamilySamples();
-        while (metricFamilySamples.hasMoreElements()) {
-            MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
-
-            for (int i = 0; i < metricFamily.samples.size(); i++) {
-                Sample sample = metricFamily.samples.get(i);
-                w.write(sample.name);
-                w.write('{');
-                for (int j = 0; j < sample.labelNames.size(); j++) {
-                    if (j != 0) {
-                        w.write(", ");
-                    }
-                    w.write(sample.labelNames.get(j));
-                    w.write("=\"");
-                    w.write(sample.labelValues.get(j));
-                    w.write('"');
-                }
-
-                w.write("} ");
-                w.write(Collector.doubleToGoString(sample.value));
-                w.write('\n');
-            }
-        }
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index f28412ea751..18f7597207e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -50,6 +50,7 @@ import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.crypto.SecretKey;
@@ -1397,6 +1398,64 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         consumer2.close();
     }
 
+
+    @Test
+    public void testMetricsGroupedByTypeDefinitions() throws Exception {
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+
+        Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
+        Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
+
+        AtomicReference<String> currentMetric = new AtomicReference<>();
+        Splitter.on("\n").split(metricsStr).forEach(line -> {
+            if (line.isEmpty()) {
+                return;
+            }
+            if (line.startsWith("#")) {
+                // Get the current type definition
+                Matcher typeMatcher = typePattern.matcher(line);
+                checkArgument(typeMatcher.matches());
+                String metricName = typeMatcher.group(1);
+                currentMetric.set(metricName);
+            } else {
+                Matcher metricMatcher = metricNamePattern.matcher(line);
+                checkArgument(metricMatcher.matches());
+                String metricName = metricMatcher.group(1);
+
+                if (metricName.endsWith("_bucket")) {
+                    metricName = metricName.substring(0, metricName.indexOf("_bucket"));
+                } else if (metricName.endsWith("_count") && !currentMetric.get().endsWith("_count")) {
+                    metricName = metricName.substring(0, metricName.indexOf("_count"));
+                } else if (metricName.endsWith("_sum") && !currentMetric.get().endsWith("_sum")) {
+                    metricName = metricName.substring(0, metricName.indexOf("_sum"));
+                } else if (metricName.endsWith("_total") && !currentMetric.get().endsWith("_total")) {
+                    metricName = metricName.substring(0, metricName.indexOf("_total"));
+                } else if (metricName.endsWith("_created") && !currentMetric.get().endsWith("_created")) {
+                    metricName = metricName.substring(0, metricName.indexOf("_created"));
+                }
+
+                if (!metricName.equals(currentMetric.get())) {
+                    System.out.println(metricsStr);
+                    fail("Metric not grouped under its type definition: " + line);
+                }
+
+            }
+        });
+
+        p1.close();
+        p2.close();
+    }
+
     private void compareCompactionStateCount(List<Metric> cm, double count) {
         assertEquals(cm.size(), 1);
         assertEquals(cm.get(0).tags.get("cluster"), "test");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
new file mode 100644
index 00000000000..15c29a0dc66
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class PrometheusMetricStreamsTest {
+
+    private PrometheusMetricStreams underTest;
+
+    @BeforeMethod(alwaysRun = true)
+    protected void setup() throws Exception {
+        underTest = new PrometheusMetricStreams();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        underTest.releaseAll();
+    }
+
+    @Test
+    public void canWriteSampleWithoutLabels() {
+        underTest.writeSample("my-metric", 123);
+
+        String actual = writeToString();
+
+        assertTrue(actual.startsWith("# TYPE my-metric gauge"), "Gauge type line missing");
+        assertTrue(actual.contains("my-metric{} 123"), "Metric line missing");
+    }
+
+    @Test
+    public void canWriteSampleWithLabels() {
+        underTest.writeSample("my-other-metric", 123, "cluster", "local");
+        underTest.writeSample("my-other-metric", 456, "cluster", "local", "namespace", "my-ns");
+
+        String actual = writeToString();
+
+        assertTrue(actual.startsWith("# TYPE my-other-metric gauge"), "Gauge type line missing");
+        assertTrue(actual.contains("my-other-metric{cluster=\"local\"} 123"), "Cluster metric line missing");
+        assertTrue(actual.contains("my-other-metric{cluster=\"local\",namespace=\"my-ns\"} 456"),
+                "Cluster and Namespace metric line missing");
+    }
+
+    private String writeToString() {
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer();
+        try {
+            SimpleTextOutputStream stream = new SimpleTextOutputStream(buffer);
+            underTest.flushAllToStream(stream);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            int readIndex = buffer.readerIndex();
+            int readableBytes = buffer.readableBytes();
+            for (int i = 0; i < readableBytes; i++) {
+                out.write(buffer.getByte(readIndex + i));
+            }
+            return out.toString();
+        } finally {
+            buffer.release();
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
index 9fc4b347c85..9bfbbff0211 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
@@ -26,8 +26,8 @@ import io.netty.buffer.ByteBuf;
  */
 public class SimpleTextOutputStream {
     private final ByteBuf buffer;
-    private static final char[] hexChars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
-            'f' };
+    private static final char[] hexChars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
+            'f'};
 
     public SimpleTextOutputStream(ByteBuf buffer) {
         this.buffer = buffer;
@@ -131,4 +131,12 @@ public class SimpleTextOutputStream {
         write(r);
         return this;
     }
+
+    public void write(ByteBuf byteBuf) {
+        buffer.writeBytes(byteBuf);
+    }
+
+    public ByteBuf getBuffer() {
+        return buffer;
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
index f7a205c7db0..46d232da3e7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
@@ -37,6 +37,11 @@ public class PrometheusTextFormat {
          */
         while (mfs.hasMoreElements()) {
             Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
+            writer.write("# TYPE ");
+            writer.write(metricFamilySamples.name);
+            writer.write(' ');
+            writer.write(metricFamilySamples.type.name().toLowerCase());
+            writer.write('\n');
             for (Collector.MetricFamilySamples.Sample sample : metricFamilySamples.samples) {
                 writer.write(sample.name);
                 if (sample.labelNames.size() > 0) {
@@ -64,19 +69,19 @@ public class PrometheusTextFormat {
         for (int i = 0; i < s.length(); i++) {
             char c = s.charAt(i);
             switch (c) {
-            case '\\':
-                writer.append("\\\\");
-                break;
-            case '\"':
-                writer.append("\\\"");
-                break;
-            case '\n':
-                writer.append("\\n");
-                break;
-            default:
-                writer.append(c);
+                case '\\':
+                    writer.append("\\\\");
+                    break;
+                case '\"':
+                    writer.append("\\\"");
+                    break;
+                case '\n':
+                    writer.append("\\n");
+                    break;
+                default:
+                    writer.append(c);
             }
         }
     }
 
-}
+}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index c8b411cbf57..2ad407b2e5e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -328,6 +328,11 @@ public class WorkerStatsManager {
   }
 
   private void writeMetric(String metricName, long value, StringWriter stream) {
+    stream.write("# TYPE ");
+    stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
+    stream.write(metricName);
+    stream.write(" gauge \n");
+
     stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
     stream.write(metricName);
     stream.write("{");