You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/24 03:44:20 UTC

[pulsar] branch branch-2.9 updated: Revert "[fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#17618)"

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 3564f423c48 Revert "[fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#17618)"
3564f423c48 is described below

commit 3564f423c4893f6f097d8a9c7e9e3d99687ca776
Author: penghui <pe...@apache.org>
AuthorDate: Sat Sep 24 11:43:08 2022 +0800

    Revert "[fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#17618)"
    
    This reverts commit bc69cfbe9d3cf06fc6e407c284eed45f095b41be.
---
 .../stats/prometheus/AggregatedNamespaceStats.java |   2 +-
 .../stats/prometheus/NamespaceStatsAggregator.java | 354 ++++++------
 .../stats/prometheus/PrometheusMetricStreams.java  |  75 ---
 .../prometheus/PrometheusMetricsGenerator.java     |  60 +--
 .../pulsar/broker/stats/prometheus/TopicStats.java | 598 +++++++++++----------
 .../stats/prometheus/TransactionAggregator.java    | 321 ++++++-----
 .../metrics/PrometheusTextFormatUtil.java          |  32 ++
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  58 --
 .../prometheus/PrometheusMetricStreamsTest.java    |  85 ---
 .../pulsar/common/util/SimpleTextOutputStream.java |  13 +-
 .../instance/stats/PrometheusTextFormat.java       |   5 -
 .../functions/worker/WorkerStatsManager.java       |   5 -
 12 files changed, 730 insertions(+), 878 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 1980af91b7b..5610dbab218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -96,7 +96,7 @@ public class AggregatedNamespaceStats {
 
         stats.replicationStats.forEach((n, as) -> {
             AggregatedReplicationStats replStats =
-                    replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats());
+                    replicationStats.computeIfAbsent(n,  k -> new AggregatedReplicationStats());
             replStats.msgRateIn += as.msgRateIn;
             replStats.msgRateOut += as.msgRateOut;
             replStats.msgThroughputIn += as.msgThroughputIn;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 29915f071c0..16e438e2a2e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -19,11 +19,8 @@
 package org.apache.pulsar.broker.stats.prometheus;
 
 import io.netty.util.concurrent.FastThreadLocal;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
-import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -35,6 +32,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.CompactorMXBean;
@@ -42,75 +40,72 @@ import org.apache.pulsar.compaction.CompactorMXBean;
 @Slf4j
 public class NamespaceStatsAggregator {
 
-    private static final FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
+    private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
             new FastThreadLocal<AggregatedNamespaceStats>() {
                 @Override
-                protected AggregatedNamespaceStats initialValue() {
+                protected AggregatedNamespaceStats initialValue() throws Exception {
                     return new AggregatedNamespaceStats();
                 }
             };
 
-    private static final FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
+    private static FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
         @Override
-        protected TopicStats initialValue() {
+        protected TopicStats initialValue() throws Exception {
             return new TopicStats();
         }
     };
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-                                boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
-                                PrometheusMetricStreams stream) {
+           boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) {
         String cluster = pulsar.getConfiguration().getClusterName();
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
+        TopicStats.resetTypes();
         TopicStats topicStats = localTopicStats.get();
-        Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
-        LongAdder topicsCount = new LongAdder();
-        Map<String, Long> localNamespaceTopicCount = new HashMap<>();
 
         printDefaultBrokerStats(stream, cluster);
 
+        Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
+        LongAdder topicsCount = new LongAdder();
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
             topicsCount.reset();
 
-            bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> {
-                getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
-                        pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
-                        pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
-                        compactorMXBean
-                );
-
-                if (includeTopicMetrics) {
-                    topicsCount.add(1);
-                    TopicStats.printTopicStats(stream, topicStats, compactorMXBean, cluster, namespace, name,
-                            splitTopicAndPartitionIndexLabel);
-                } else {
-                    namespaceStats.updateStats(topicStats);
-                }
-            }));
+            bundlesMap.forEach((bundle, topicsMap) -> {
+                topicsMap.forEach((name, topic) -> {
+                    getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
+                            pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
+                            pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
+                            compactorMXBean
+                    );
+
+                    if (includeTopicMetrics) {
+                        topicsCount.add(1);
+                        TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean,
+                                splitTopicAndPartitionIndexLabel);
+                    } else {
+                        namespaceStats.updateStats(topicStats);
+                    }
+                });
+            });
 
             if (!includeTopicMetrics) {
-                // Only include namespace level stats if we don't have the per-topic, otherwise we're going to
-                // report the same data twice, and it will make the aggregation difficult
-                printNamespaceStats(stream, namespaceStats, cluster, namespace);
+                // Only include namespace level stats if we don't have the per-topic, otherwise we're going to report
+                // the same data twice, and it will make the aggregation difficult
+                printNamespaceStats(stream, cluster, namespace, namespaceStats);
             } else {
-                localNamespaceTopicCount.put(namespace, topicsCount.sum());
+                printTopicsCountStats(stream, cluster, namespace, topicsCount);
             }
         });
-
-        if (includeTopicMetrics) {
-            printTopicsCountStats(stream, localNamespaceTopicCount, cluster);
-        }
     }
 
     private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
         Compactor compactor = pulsar.getNullableCompactor();
-        return Optional.ofNullable(compactor).map(Compactor::getStats);
+        return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
     private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
-                                      boolean includeProducerMetrics, boolean getPreciseBacklog,
-                                      boolean subscriptionBacklogSize, Optional<CompactorMXBean> compactorMXBean) {
+            boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
+                                      Optional<CompactorMXBean> compactorMXBean) {
         stats.reset();
 
         if (topic instanceof PersistentTopic) {
@@ -272,176 +267,161 @@ public class NamespaceStatsAggregator {
                 });
     }
 
-    private static void printDefaultBrokerStats(PrometheusMetricStreams stream, String cluster) {
+    private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
         // Print metrics with 0 values. This is necessary to have the available brokers being
         // reported in the brokers dashboard even if they don't have any topic or traffic
-        writeMetric(stream, "pulsar_topics_count", 0, cluster);
-        writeMetric(stream, "pulsar_subscriptions_count", 0, cluster);
-        writeMetric(stream, "pulsar_producers_count", 0, cluster);
-        writeMetric(stream, "pulsar_consumers_count", 0, cluster);
-        writeMetric(stream, "pulsar_rate_in", 0, cluster);
-        writeMetric(stream, "pulsar_rate_out", 0, cluster);
-        writeMetric(stream, "pulsar_throughput_in", 0, cluster);
-        writeMetric(stream, "pulsar_throughput_out", 0, cluster);
-        writeMetric(stream, "pulsar_storage_size", 0, cluster);
-        writeMetric(stream, "pulsar_storage_logical_size", 0, cluster);
-        writeMetric(stream, "pulsar_storage_write_rate", 0, cluster);
-        writeMetric(stream, "pulsar_storage_read_rate", 0, cluster);
-        writeMetric(stream, "pulsar_msg_backlog", 0, cluster);
+        metric(stream, cluster, "pulsar_topics_count", 0);
+        metric(stream, cluster, "pulsar_subscriptions_count", 0);
+        metric(stream, cluster, "pulsar_producers_count", 0);
+        metric(stream, cluster, "pulsar_consumers_count", 0);
+        metric(stream, cluster, "pulsar_rate_in", 0);
+        metric(stream, cluster, "pulsar_rate_out", 0);
+        metric(stream, cluster, "pulsar_throughput_in", 0);
+        metric(stream, cluster, "pulsar_throughput_out", 0);
+        metric(stream, cluster, "pulsar_storage_size", 0);
+        metric(stream, cluster, "pulsar_storage_logical_size", 0);
+        metric(stream, cluster, "pulsar_storage_write_rate", 0);
+        metric(stream, cluster, "pulsar_storage_read_rate", 0);
+        metric(stream, cluster, "pulsar_msg_backlog", 0);
     }
 
-    private static void printTopicsCountStats(PrometheusMetricStreams stream, Map<String, Long> namespaceTopicsCount,
-                                              String cluster) {
-        namespaceTopicsCount.forEach(
-                (ns, topicCount) -> writeMetric(stream, "pulsar_topics_count", topicCount, cluster, ns)
-        );
+    private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster, String namespace,
+                                              LongAdder topicsCount) {
+        metric(stream, cluster, namespace, "pulsar_topics_count", topicsCount.sum());
     }
 
-    private static void printNamespaceStats(PrometheusMetricStreams stream, AggregatedNamespaceStats stats,
-                                            String cluster, String namespace) {
-        writeMetric(stream, "pulsar_topics_count", stats.topicsCount, cluster, namespace);
-        writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount, cluster,
-                namespace);
-        writeMetric(stream, "pulsar_producers_count", stats.producersCount, cluster, namespace);
-        writeMetric(stream, "pulsar_consumers_count", stats.consumersCount, cluster, namespace);
-
-        writeMetric(stream, "pulsar_rate_in", stats.rateIn, cluster, namespace);
-        writeMetric(stream, "pulsar_rate_out", stats.rateOut, cluster, namespace);
-        writeMetric(stream, "pulsar_throughput_in", stats.throughputIn, cluster, namespace);
-        writeMetric(stream, "pulsar_throughput_out", stats.throughputOut, cluster, namespace);
-        writeMetric(stream, "pulsar_consumer_msg_ack_rate", stats.messageAckRate, cluster, namespace);
-
-        writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace);
-        writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace);
-        writeMetric(stream, "pulsar_out_bytes_total", stats.bytesOutCounter, cluster, namespace);
-        writeMetric(stream, "pulsar_out_messages_total", stats.msgOutCounter, cluster, namespace);
-
-        writeMetric(stream, "pulsar_storage_size", stats.managedLedgerStats.storageSize, cluster,
-                namespace);
-        writeMetric(stream, "pulsar_storage_logical_size",
-                stats.managedLedgerStats.storageLogicalSize, cluster, namespace);
-        writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize, cluster,
-                namespace);
-        writeMetric(stream, "pulsar_storage_offloaded_size",
-                stats.managedLedgerStats.offloadedStorageUsed, cluster, namespace);
-
-        writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate,
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
-                cluster, namespace);
-
-        writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, cluster, namespace);
-
-        writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
+    private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
+                                            AggregatedNamespaceStats stats) {
+        metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
+        metric(stream, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
+        metric(stream, cluster, namespace, "pulsar_producers_count", stats.producersCount);
+        metric(stream, cluster, namespace, "pulsar_consumers_count", stats.consumersCount);
+
+        metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn);
+        metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
+        metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
+        metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
+        metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
+
+        metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
+        metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
+        metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
+        metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
+
+        metric(stream, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
+        metric(stream, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
+        metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize);
+        metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
+                stats.managedLedgerStats.offloadedStorageUsed);
+
+        metric(stream, cluster, namespace, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate);
+        metric(stream, cluster, namespace, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate);
+
+        metric(stream, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed);
+
+        metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);
 
         stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
         long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
-        writeMetric(stream, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_le_1", latencyBuckets[1], cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_le_5", latencyBuckets[2], cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_le_10", latencyBuckets[3], cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_le_20", latencyBuckets[4], cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_le_50", latencyBuckets[5], cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_le_100", latencyBuckets[6], cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_le_200", latencyBuckets[7], cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_overflow", latencyBuckets[9], cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_count",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), cluster, namespace);
-        writeMetric(stream, "pulsar_storage_write_latency_sum",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_count",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
+        metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
 
         stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
-        long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWriteLatencyBuckets[0],
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", ledgerWriteLatencyBuckets[1],
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5", ledgerWriteLatencyBuckets[2],
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", ledgerWriteLatencyBuckets[3],
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", ledgerWriteLatencyBuckets[4],
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", ledgerWriteLatencyBuckets[5],
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", ledgerWriteLatencyBuckets[6],
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", ledgerWriteLatencyBuckets[7],
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", ledgerWriteLatencyBuckets[8],
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", ledgerWriteLatencyBuckets[9],
-                cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
-                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(), cluster, namespace);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
-                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(), cluster, namespace);
+        long[] ledgerWritelatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_overflow",
+                ledgerWritelatencyBuckets[9]);
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_count",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
+        metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_sum",
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
 
         stats.managedLedgerStats.entrySizeBuckets.refresh();
         long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
-        writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace);
-        writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace);
-        writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace);
-        writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace);
-        writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace);
-        writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace);
-        writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace);
-        writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace);
-        writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace);
-        writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(),
-                cluster, namespace);
-        writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(),
-                cluster, namespace);
-
-        writeReplicationStat(stream, "pulsar_replication_rate_in", stats,
-                replStats -> replStats.msgRateIn, cluster, namespace);
-        writeReplicationStat(stream, "pulsar_replication_rate_out", stats,
-                replStats -> replStats.msgRateOut, cluster, namespace);
-        writeReplicationStat(stream, "pulsar_replication_throughput_in", stats,
-                replStats -> replStats.msgThroughputIn, cluster, namespace);
-        writeReplicationStat(stream, "pulsar_replication_throughput_out", stats,
-                replStats -> replStats.msgThroughputOut, cluster, namespace);
-        writeReplicationStat(stream, "pulsar_replication_backlog", stats,
-                replStats -> replStats.replicationBacklog, cluster, namespace);
-        writeReplicationStat(stream, "pulsar_replication_connected_count", stats,
-                replStats -> replStats.connectedCount, cluster, namespace);
-        writeReplicationStat(stream, "pulsar_replication_rate_expired", stats,
-                replStats -> replStats.msgRateExpired, cluster, namespace);
-        writeReplicationStat(stream, "pulsar_replication_delay_in_seconds", stats,
-                replStats -> replStats.replicationDelayInSeconds, cluster, namespace);
+        metric(stream, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
+        metric(stream, cluster, namespace, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
+        metric(stream, cluster, namespace, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
+        metric(stream, cluster, namespace, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
+        metric(stream, cluster, namespace, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
+        metric(stream, cluster, namespace, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
+        metric(stream, cluster, namespace, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
+        metric(stream, cluster, namespace, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
+        metric(stream, cluster, namespace, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
+        metric(stream, cluster, namespace, "pulsar_entry_size_count",
+                stats.managedLedgerStats.entrySizeBuckets.getCount());
+        metric(stream, cluster, namespace, "pulsar_entry_size_sum",
+                stats.managedLedgerStats.entrySizeBuckets.getSum());
+
+        if (!stats.replicationStats.isEmpty()) {
+            stats.replicationStats.forEach((remoteCluster, replStats) -> {
+                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_in", remoteCluster,
+                        replStats.msgRateIn);
+                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_out", remoteCluster,
+                        replStats.msgRateOut);
+                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_in", remoteCluster,
+                        replStats.msgThroughputIn);
+                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_out", remoteCluster,
+                        replStats.msgThroughputOut);
+                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_backlog", remoteCluster,
+                        replStats.replicationBacklog);
+                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_connected_count", remoteCluster,
+                        replStats.connectedCount);
+                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_expired", remoteCluster,
+                        replStats.msgRateExpired);
+                metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_delay_in_seconds",
+                        remoteCluster, replStats.replicationDelayInSeconds);
+            });
+        }
     }
 
-    private static void writePulsarMsgBacklog(PrometheusMetricStreams stream, Number value,
-                                              String cluster, String namespace) {
-        stream.writeSample("pulsar_msg_backlog", value, "cluster", cluster, "namespace", namespace,
-                "remote_cluster",
-                "local");
+    private static void metric(SimpleTextOutputStream stream, String cluster, String name,
+            long value) {
+        TopicStats.metricType(stream, name);
+        stream.write(name)
+                .write("{cluster=\"").write(cluster).write("\"} ")
+                .write(value).write(' ').write(System.currentTimeMillis())
+                .write('\n');
     }
 
-    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value,
-                                    String cluster) {
-        stream.writeSample(metricName, value, "cluster", cluster);
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
+                               long value) {
+        TopicStats.metricType(stream, name);
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
-    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
-                                    String namespace) {
-        stream.writeSample(metricName, value, "cluster", cluster, "namespace", namespace);
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
+                               double value) {
+        TopicStats.metricType(stream, name);
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
-    private static void writeReplicationStat(PrometheusMetricStreams stream, String metricName,
-                                             AggregatedNamespaceStats namespaceStats,
-                                             Function<AggregatedReplicationStats, Number> sampleValueFunction,
-                                             String cluster, String namespace) {
-        if (!namespaceStats.replicationStats.isEmpty()) {
-            namespaceStats.replicationStats.forEach((remoteCluster, replStats) ->
-                    stream.writeSample(metricName, sampleValueFunction.apply(replStats),
-                            "cluster", cluster,
-                            "namespace", namespace,
-                            "remote_cluster", remoteCluster)
-            );
-        }
+    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
+                                                String name, String remoteCluster, double value) {
+        TopicStats.metricType(stream, name);
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
+        stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
-
-
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
deleted file mode 100644
index 6b6b972c175..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.stats.prometheus;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
-
-/**
- * Helper class to ensure that metrics of the same name are grouped together under the same TYPE header when written.
- * Those are the requirements of the
- * <a href="https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#grouping-and-sorting">Prometheus Exposition Format</a>.
- */
-public class PrometheusMetricStreams {
-    private final Map<String, SimpleTextOutputStream> metricStreamMap = new HashMap<>();
-
-    /**
-     * Write the given metric and sample value to the stream. Will write #TYPE header if metric not seen before.
-     * @param metricName name of the metric.
-     * @param value value of the sample
-     * @param labelsAndValuesArray varargs of label and label value
-     */
-    void writeSample(String metricName, Number value, String... labelsAndValuesArray) {
-        SimpleTextOutputStream stream = initGaugeType(metricName);
-        stream.write(metricName).write('{');
-        for (int i = 0; i < labelsAndValuesArray.length; i += 2) {
-            stream.write(labelsAndValuesArray[i]).write("=\"").write(labelsAndValuesArray[i + 1]).write('\"');
-            if (i + 2 != labelsAndValuesArray.length) {
-                stream.write(',');
-            }
-        }
-        stream.write("} ").write(value).write(' ').write(System.currentTimeMillis()).write('\n');
-    }
-
-    /**
-     * Flush all the stored metrics to the supplied stream.
-     * @param stream the stream to write to.
-     */
-    void flushAllToStream(SimpleTextOutputStream stream) {
-        metricStreamMap.values().forEach(s -> stream.write(s.getBuffer()));
-    }
-
-    /**
-     * Release all the streams to clean up resources.
-     */
-    void releaseAll() {
-        metricStreamMap.values().forEach(s -> s.getBuffer().release());
-        metricStreamMap.clear();
-    }
-
-    private SimpleTextOutputStream initGaugeType(String metricName) {
-        return metricStreamMap.computeIfAbsent(metricName, s -> {
-            SimpleTextOutputStream stream = new SimpleTextOutputStream(PulsarByteBufAllocator.DEFAULT.directBuffer());
-            stream.write("# TYPE ").write(metricName).write(" gauge\n");
-            return stream;
-        });
-    }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index a993d1edf3a..cd6afd1535d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -52,8 +52,7 @@ import org.apache.pulsar.common.util.SimpleTextOutputStream;
 /**
  * Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out
  * in a text format suitable to be consumed by Prometheus.
- * Format specification can be found at <a
- * href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats</a>
+ * Format specification: <a href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats</a>
  */
 public class PrometheusMetricsGenerator {
 
@@ -87,44 +86,38 @@ public class PrometheusMetricsGenerator {
     }
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-                                boolean includeProducerMetrics, OutputStream out) throws IOException {
+        boolean includeProducerMetrics, OutputStream out) throws IOException {
         generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out, null);
     }
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-                                boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
-                                OutputStream out) throws IOException {
+        boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+        OutputStream out) throws IOException {
         generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics,
                 splitTopicAndPartitionIndexLabel, out, null);
     }
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-                                boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
-                                OutputStream out,
-                                List<PrometheusRawMetricsProvider> metricsProviders)
-            throws IOException {
+        boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, OutputStream out,
+        List<PrometheusRawMetricsProvider> metricsProviders)
+        throws IOException {
         ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
-        boolean exceptionHappens = false;
-        //Used in namespace/topic and transaction aggregators as share metric names
-        PrometheusMetricStreams metricStreams = new PrometheusMetricStreams();
         try {
             SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
 
             generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());
 
             NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics,
-                    includeProducerMetrics, splitTopicAndPartitionIndexLabel, metricStreams);
+                    includeProducerMetrics, splitTopicAndPartitionIndexLabel, stream);
 
             if (pulsar.getWorkerServiceOpt().isPresent()) {
                 pulsar.getWorkerService().generateFunctionsStats(stream);
             }
 
             if (pulsar.getConfiguration().isTransactionCoordinatorEnabled()) {
-                TransactionAggregator.generate(pulsar, metricStreams, includeTopicMetrics);
+                TransactionAggregator.generate(pulsar, stream, includeTopicMetrics);
             }
 
-            metricStreams.flushAllToStream(stream);
-
             generateBrokerBasicMetrics(pulsar, stream);
 
             generateManagedLedgerBookieClientMetrics(pulsar, stream);
@@ -136,12 +129,7 @@ public class PrometheusMetricsGenerator {
             }
             out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
         } finally {
-            //release all the metrics buffers
-            metricStreams.releaseAll();
-            //if exception happens, release buffer
-            if (exceptionHappens) {
-                buf.release();
-            }
+            buf.release();
         }
     }
 
@@ -154,17 +142,17 @@ public class PrometheusMetricsGenerator {
         if (pulsar.getConfiguration().isExposeManagedLedgerMetricsInPrometheus()) {
             // generate managedLedger metrics
             parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(),
-                    clusterName, Collector.Type.GAUGE, stream);
+                clusterName, Collector.Type.GAUGE, stream);
         }
 
         if (pulsar.getConfiguration().isExposeManagedCursorMetricsInPrometheus()) {
             // generate managedCursor metrics
             parseMetricsToPrometheusMetrics(new ManagedCursorMetrics(pulsar).generate(),
-                    clusterName, Collector.Type.GAUGE, stream);
+                clusterName, Collector.Type.GAUGE, stream);
         }
 
         parseMetricsToPrometheusMetrics(Collections.singletonList(pulsar.getBrokerService()
-                        .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
+                .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
                 clusterName, Collector.Type.GAUGE, stream);
 
         // generate loadBalance metrics
@@ -279,17 +267,17 @@ public class PrometheusMetricsGenerator {
 
     static String getTypeStr(Collector.Type type) {
         switch (type) {
-            case COUNTER:
-                return "counter";
-            case GAUGE:
-                return "gauge";
-            case SUMMARY:
-                return "summary";
-            case HISTOGRAM:
-                return "histogram";
-            case UNTYPED:
-            default:
-                return "untyped";
+        case COUNTER:
+            return "counter";
+        case GAUGE:
+            return "gauge";
+        case SUMMARY        :
+            return "summary";
+        case HISTOGRAM:
+            return "histogram";
+        case UNTYPED:
+        default:
+            return "untyped";
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index e91521aff55..e6e5883847d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -23,12 +23,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.compaction.CompactionRecord;
 import org.apache.pulsar.compaction.CompactorMXBean;
 
 class TopicStats {
+
     int subscriptionsCount;
     int producersCount;
     int consumersCount;
@@ -43,6 +43,7 @@ class TopicStats {
     double averageMsgSize;
 
     public long msgBacklog;
+
     long publishRateLimitedTimes;
 
     long backlogQuotaLimit;
@@ -54,6 +55,9 @@ class TopicStats {
     Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
     Map<String, AggregatedProducerStats> producerStats = new HashMap<>();
 
+    // Used for tracking duplicate TYPE definitions
+    static Map<String, String> metricWithTypeDefinition = new HashMap<>();
+
     // For compaction
     long compactionRemovedEventCount;
     long compactionSucceedCount;
@@ -99,340 +103,378 @@ class TopicStats {
         compactionLatencyBuckets.reset();
     }
 
-    public static void printTopicStats(PrometheusMetricStreams stream, TopicStats stats,
-                                       Optional<CompactorMXBean> compactorMXBean, String cluster, String namespace,
-                                       String topic, boolean splitTopicAndPartitionIndexLabel) {
-        writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_producers_count", stats.producersCount,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_consumers_count", stats.consumersCount,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-
-        writeMetric(stream, "pulsar_rate_in", stats.rateIn,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_rate_out", stats.rateOut,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_throughput_in", stats.throughputIn,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_throughput_out", stats.throughputOut,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_average_msg_size", stats.averageMsgSize,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-
-        writeMetric(stream, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_logical_size",
-                stats.managedLedgerStats.storageLogicalSize, cluster, namespace, topic,
-                splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_msg_backlog", stats.msgBacklog,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_offloaded_size", stats.managedLedgerStats
-                .offloadedStorageUsed, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_backlog_quota_limit_time", stats.backlogQuotaLimitTime,
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+    static void resetTypes() {
+        metricWithTypeDefinition.clear();
+    }
+
+    static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+                                TopicStats stats, Optional<CompactorMXBean> compactorMXBean,
+                                boolean splitTopicAndPartitionIndexLabel) {
+        metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount,
+                splitTopicAndPartitionIndexLabel);
+
+        metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
+                splitTopicAndPartitionIndexLabel);
+
+        metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
+                stats.managedLedgerStats.storageLogicalSize, splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_rate",
+                stats.managedLedgerStats.storageWriteRate, splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
+                stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
+                .offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time",
+                stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
 
         long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
-        writeMetric(stream, "pulsar_storage_write_latency_le_0_5",
-                latencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_le_1",
-                latencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_le_5",
-                latencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_le_10",
-                latencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_le_20",
-                latencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_le_50",
-                latencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_le_100",
-                latencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_le_200",
-                latencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_le_1000",
-                latencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_overflow",
-                latencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_count",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(),
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_write_latency_sum",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1],
                 splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
 
         long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5",
-                ledgerWriteLatencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1",
-                ledgerWriteLatencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5",
-                ledgerWriteLatencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10",
-                ledgerWriteLatencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20",
-                ledgerWriteLatencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50",
-                ledgerWriteLatencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100",
-                ledgerWriteLatencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200",
-                ledgerWriteLatencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000",
-                ledgerWriteLatencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow",
-                ledgerWriteLatencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5",
+                ledgerWriteLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1",
+                ledgerWriteLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5",
+                ledgerWriteLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10",
+                ledgerWriteLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20",
+                ledgerWriteLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50",
+                ledgerWriteLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100",
+                ledgerWriteLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200",
+                ledgerWriteLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000",
+                ledgerWriteLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow",
+                ledgerWriteLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count",
                 stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum",
                 stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+                splitTopicAndPartitionIndexLabel);
 
         long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
-        writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0],
                 splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1],
                 splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
                 splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
                 splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
                 splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
                 splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6],
                 splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
                 splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8],
                 splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(),
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(),
-                cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
+                stats.managedLedgerStats.entrySizeBuckets.getCount(), splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
+                stats.managedLedgerStats.entrySizeBuckets.getSum(), splitTopicAndPartitionIndexLabel);
 
         stats.producerStats.forEach((p, producerStats) -> {
-            writeProducerMetric(stream, "pulsar_producer_msg_rate_in", producerStats.msgRateIn,
-                    cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
-            writeProducerMetric(stream, "pulsar_producer_msg_throughput_in", producerStats.msgThroughputIn,
-                    cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
-            writeProducerMetric(stream, "pulsar_producer_msg_average_Size", producerStats.averageMsgSize,
-                    cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in",
+                    producerStats.msgRateIn, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in",
+                    producerStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size",
+                    producerStats.averageMsgSize, splitTopicAndPartitionIndexLabel);
         });
 
-        stats.subscriptionStats.forEach((sub, subsStats) -> {
-            writeSubscriptionMetric(stream, "pulsar_subscription_back_log", subsStats.msgBacklog,
-                    cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_back_log_no_delayed",
-                    subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
-                    subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver",
-                    subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages",
-                    subsStats.unackedMessages, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_blocked_on_unacked_messages",
-                    subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, cluster, namespace, topic, sub,
-                    splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_out",
-                    subsStats.msgRateOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_msg_ack_rate",
-                    subsStats.messageAckRate, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_msg_throughput_out",
-                    subsStats.msgThroughputOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_out_bytes_total",
-                    subsStats.bytesOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_out_messages_total",
-                    subsStats.msgOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_last_expire_timestamp",
-                    subsStats.lastExpireTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_last_acked_timestamp",
-                    subsStats.lastAckedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_flow_timestamp",
-                    subsStats.lastConsumedFlowTimestamp, cluster, namespace, topic, sub,
-                    splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_timestamp",
-                    subsStats.lastConsumedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_last_mark_delete_advanced_timestamp",
-                    subsStats.lastMarkDeleteAdvancedTimestamp, cluster, namespace, topic, sub,
-                    splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_expired",
-                    subsStats.msgRateExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-            writeSubscriptionMetric(stream, "pulsar_subscription_total_msg_expired",
-                    subsStats.totalMsgExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-
+        stats.subscriptionStats.forEach((n, subsStats) -> {
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
+                    subsStats.msgBacklog, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
+                    subsStats.msgBacklogNoDelayed, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
+                    subsStats.msgDelayed, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
+                    subsStats.msgRateRedeliver, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
+                    subsStats.unackedMessages, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
+                    subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
+                    subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate",
+                    subsStats.messageAckRate, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
+                    subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
+                    subsStats.bytesOutCounter, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
+                    subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
+                    subsStats.lastExpireTimestamp, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
+                subsStats.lastAckedTimestamp, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
+                subsStats.lastConsumedFlowTimestamp, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
+                subsStats.lastConsumedTimestamp, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
+                subsStats.lastMarkDeleteAdvancedTimestamp, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
+                    subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
+                    subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
             subsStats.consumerStat.forEach((c, consumerStats) -> {
-                writeConsumerMetric(stream, "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
-                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
-                writeConsumerMetric(stream, "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
-                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
-                writeConsumerMetric(stream, "pulsar_consumer_blocked_on_unacked_messages",
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
+                        splitTopicAndPartitionIndexLabel);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
+                        splitTopicAndPartitionIndexLabel);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_consumer_blocked_on_unacked_messages",
                         consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
-                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
-                writeConsumerMetric(stream, "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
-                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
-
-                writeConsumerMetric(stream, "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
-                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
-
-                writeConsumerMetric(stream, "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
-                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
-                writeConsumerMetric(stream, "pulsar_consumer_available_permits", consumerStats.availablePermits,
-                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
-                writeConsumerMetric(stream, "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
-                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
-                writeConsumerMetric(stream, "pulsar_out_messages_total", consumerStats.msgOutCounter,
-                        cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+                        splitTopicAndPartitionIndexLabel);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
+                        splitTopicAndPartitionIndexLabel);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
+                        splitTopicAndPartitionIndexLabel);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
+                        splitTopicAndPartitionIndexLabel);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_consumer_available_permits", consumerStats.availablePermits,
+                        splitTopicAndPartitionIndexLabel);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
+                        splitTopicAndPartitionIndexLabel);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_out_messages_total", consumerStats.msgOutCounter,
+                        splitTopicAndPartitionIndexLabel);
             });
         });
 
         if (!stats.replicationStats.isEmpty()) {
             stats.replicationStats.forEach((remoteCluster, replStats) -> {
-                writeMetric(stream, "pulsar_replication_rate_in", replStats.msgRateIn,
-                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
-                writeMetric(stream, "pulsar_replication_rate_out", replStats.msgRateOut,
-                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
-                writeMetric(stream, "pulsar_replication_throughput_in", replStats.msgThroughputIn,
-                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
-                writeMetric(stream, "pulsar_replication_throughput_out", replStats.msgThroughputOut,
-                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
-                writeMetric(stream, "pulsar_replication_backlog", replStats.replicationBacklog,
-                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
-                writeMetric(stream, "pulsar_replication_connected_count", replStats.connectedCount,
-                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
-                writeMetric(stream, "pulsar_replication_rate_expired", replStats.msgRateExpired,
-                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
-                writeMetric(stream, "pulsar_replication_delay_in_seconds", replStats.replicationDelayInSeconds,
-                        cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster,
+                        replStats.msgRateIn, splitTopicAndPartitionIndexLabel);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster,
+                        replStats.msgRateOut, splitTopicAndPartitionIndexLabel);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in",
+                        remoteCluster, replStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out",
+                        remoteCluster, replStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster,
+                        replStats.replicationBacklog, splitTopicAndPartitionIndexLabel);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count",
+                        remoteCluster, replStats.connectedCount, splitTopicAndPartitionIndexLabel);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired",
+                        remoteCluster, replStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds",
+                        remoteCluster, replStats.replicationDelayInSeconds, splitTopicAndPartitionIndexLabel);
             });
         }
 
-        writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter,
                 splitTopicAndPartitionIndexLabel);
-        writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace, topic,
+        metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter,
                 splitTopicAndPartitionIndexLabel);
 
         // Compaction
         boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
-                .isPresent();
+                .map(__ -> true).orElse(false);
         if (hasCompaction) {
-            writeMetric(stream, "pulsar_compaction_removed_event_count",
-                    stats.compactionRemovedEventCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_succeed_count",
-                    stats.compactionSucceedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_failed_count",
-                    stats.compactionFailedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_duration_time_in_mills",
-                    stats.compactionDurationTimeInMills, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_read_throughput",
-                    stats.compactionReadThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_write_throughput",
-                    stats.compactionWriteThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_compacted_entries_count",
-                    stats.compactionCompactedEntriesCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_compacted_entries_size",
-                    stats.compactionCompactedEntriesSize, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-
-            long[] compactionBuckets = stats.compactionLatencyBuckets.getBuckets();
-            writeMetric(stream, "pulsar_compaction_latency_le_0_5",
-                    compactionBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_le_1",
-                    compactionBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_le_5",
-                    compactionBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_le_10",
-                    compactionBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_le_20",
-                    compactionBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_le_50",
-                    compactionBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_le_100",
-                    compactionBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_le_200",
-                    compactionBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_le_1000",
-                    compactionBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_overflow",
-                    compactionBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_sum",
-                    stats.compactionLatencyBuckets.getSum(), cluster, namespace, topic,
-                    splitTopicAndPartitionIndexLabel);
-            writeMetric(stream, "pulsar_compaction_latency_count",
-                    stats.compactionLatencyBuckets.getCount(), cluster, namespace, topic,
-                    splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
+                    stats.compactionRemovedEventCount, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
+                    stats.compactionSucceedCount, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
+                    stats.compactionFailedCount, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
+                    stats.compactionDurationTimeInMills, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
+                    stats.compactionReadThroughput, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
+                    stats.compactionWriteThroughput, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
+                    stats.compactionCompactedEntriesCount, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
+                    stats.compactionCompactedEntriesSize, splitTopicAndPartitionIndexLabel);
+            long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
+                    compactionLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
+                    compactionLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
+                    compactionLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
+                    compactionLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
+                    compactionLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
+                    compactionLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
+                    compactionLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
+                    compactionLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
+                    compactionLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
+                    compactionLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
+                    stats.compactionLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
+                    stats.compactionLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
         }
     }
 
-    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
-                                    String namespace, String topic, boolean splitTopicAndPartitionIndexLabel) {
-        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+    static void metricType(SimpleTextOutputStream stream, String name) {
+
+        if (!metricWithTypeDefinition.containsKey(name)) {
+            metricWithTypeDefinition.put(name, "gauge");
+            stream.write("# TYPE ").write(name).write(" gauge\n");
+        }
+
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String name, double value, boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel).write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
     }
 
-    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
-                                    String namespace, String topic, String remoteCluster,
-                                    boolean splitTopicAndPartitionIndexLabel) {
-        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
-                "remote_cluster", remoteCluster);
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+           String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",subscription=\"").write(subscription).write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
     }
 
-    private static void writeProducerMetric(PrometheusMetricStreams stream, String metricName, Number value,
-                                            String cluster, String namespace, String topic, String producer,
-                                            long producerId, boolean splitTopicAndPartitionIndexLabel) {
-        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
-                "producer_name", producer, "producer_id", String.valueOf(producerId));
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String producerName, long produceId, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",producer_name=\"").write(producerName)
+                .write("\",producer_id=\"").write(produceId).write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
     }
 
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String subscription, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",subscription=\"").write(subscription).write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String subscription, String consumerName, long consumerId, String name, long value,
+            boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",subscription=\"").write(subscription)
+                .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
+                .write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
+    }
 
-    private static void writeSubscriptionMetric(PrometheusMetricStreams stream, String metricName, Number value,
-                                                String cluster, String namespace, String topic, String subscription,
-                                                boolean splitTopicAndPartitionIndexLabel) {
-        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
-                "subscription", subscription);
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String subscription, String consumerName, long consumerId, String name, double value,
+            boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",subscription=\"").write(subscription)
+                .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
+                .write(consumerId).write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
     }
 
-    private static void writeConsumerMetric(PrometheusMetricStreams stream, String metricName, Number value,
-                                            String cluster, String namespace, String topic, String subscription,
-                                            Consumer consumer, boolean splitTopicAndPartitionIndexLabel) {
-        writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
-                "subscription", subscription, "consumer_name", consumer.consumerName(),
-                "consumer_id", String.valueOf(consumer.consumerId()));
+    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
+            String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
     }
 
-    static void writeTopicMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
-                                 String namespace, String topic, boolean splitTopicAndPartitionIndexLabel,
-                                 String... extraLabelsAndValues) {
-        String[] labelsAndValues = new String[splitTopicAndPartitionIndexLabel ? 8 : 6];
-        labelsAndValues[0] = "cluster";
-        labelsAndValues[1] = cluster;
-        labelsAndValues[2] = "namespace";
-        labelsAndValues[3] = namespace;
-        labelsAndValues[4] = "topic";
+    private static SimpleTextOutputStream appendRequiredLabels(SimpleTextOutputStream stream, String cluster,
+            String namespace, String topic, String name, boolean splitTopicAndPartitionIndexLabel) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
         if (splitTopicAndPartitionIndexLabel) {
             int index = topic.indexOf(PARTITIONED_TOPIC_SUFFIX);
             if (index > 0) {
-                labelsAndValues[5] = topic.substring(0, index);
-                labelsAndValues[6] = "partition";
-                labelsAndValues[7] = topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length());
+                stream.write("\",topic=\"").write(topic.substring(0, index)).write("\",partition=\"")
+                        .write(topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length()));
             } else {
-                labelsAndValues[5] = topic;
-                labelsAndValues[6] = "partition";
-                labelsAndValues[7] = "-1";
+                stream.write("\",topic=\"").write(topic).write("\",partition=\"").write("-1");
             }
         } else {
-            labelsAndValues[5] = topic;
+            stream.write("\",topic=\"").write(topic);
         }
-        String[] labels = ArrayUtils.addAll(labelsAndValues, extraLabelsAndValues);
-        stream.writeSample(metricName, value, labels);
+        return stream;
+    }
+
+    private static void appendEndings(SimpleTextOutputStream stream) {
+        stream.write(' ').write(System.currentTimeMillis()).write('\n');
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index 8c58b516333..e6ac1535f43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.stats.prometheus;
 
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
@@ -28,6 +30,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreStats;
@@ -35,10 +38,21 @@ import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreSt
 @Slf4j
 public class TransactionAggregator {
 
+    /**
+     * Used for tracking duplicate TYPE definitions.
+     */
+    private static final FastThreadLocal<Map<String, String>> threadLocalMetricWithTypeDefinition =
+            new FastThreadLocal() {
+                @Override
+                protected Map<String, String> initialValue() {
+                    return new HashMap<>();
+                }
+             };
+
     private static final FastThreadLocal<AggregatedTransactionCoordinatorStats> localTransactionCoordinatorStats =
             new FastThreadLocal<AggregatedTransactionCoordinatorStats>() {
                 @Override
-                protected AggregatedTransactionCoordinatorStats initialValue() {
+                protected AggregatedTransactionCoordinatorStats initialValue() throws Exception {
                     return new AggregatedTransactionCoordinatorStats();
                 }
             };
@@ -46,18 +60,21 @@ public class TransactionAggregator {
     private static final FastThreadLocal<ManagedLedgerStats> localManageLedgerStats =
             new FastThreadLocal<ManagedLedgerStats>() {
                 @Override
-                protected ManagedLedgerStats initialValue() {
+                protected ManagedLedgerStats initialValue() throws Exception {
                     return new ManagedLedgerStats();
                 }
             };
 
-    public static void generate(PulsarService pulsar, PrometheusMetricStreams stream, boolean includeTopicMetrics) {
+    public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, boolean includeTopicMetrics) {
         String cluster = pulsar.getConfiguration().getClusterName();
+        Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
+        metricWithTypeDefinition.clear();
 
         if (includeTopicMetrics) {
+            pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
 
-            pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) ->
-                    bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> {
+                bundlesMap.forEach((bundle, topicsMap) -> {
+                    topicsMap.forEach((name, topic) -> {
                         if (topic instanceof PersistentTopic) {
                             topic.getSubscriptions().values().forEach(subscription -> {
                                 try {
@@ -65,8 +82,9 @@ public class TransactionAggregator {
                                     if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
                                             && subscription instanceof  PersistentSubscription
                                             && ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) {
-                                        ManagedLedger managedLedger = ((PersistentSubscription) subscription)
-                                                .getPendingAckManageLedger().get();
+                                        ManagedLedger managedLedger =
+                                                ((PersistentSubscription) subscription)
+                                                        .getPendingAckManageLedger().get();
                                         generateManageLedgerStats(managedLedger,
                                                 stream, cluster, namespace, name, subscription.getName());
                                     }
@@ -75,7 +93,9 @@ public class TransactionAggregator {
                                 }
                             });
                         }
-                    })));
+                    });
+                });
+            });
         }
         AggregatedTransactionCoordinatorStats transactionCoordinatorStats = localTransactionCoordinatorStats.get();
 
@@ -104,18 +124,18 @@ public class TransactionAggregator {
 
                     localManageLedgerStats.get().reset();
                     if (transactionMetadataStore instanceof MLTransactionMetadataStore) {
-                        ManagedLedger managedLedger =
-                                ((MLTransactionMetadataStore) transactionMetadataStore).getManagedLedger();
+                       ManagedLedger managedLedger =
+                               ((MLTransactionMetadataStore) transactionMetadataStore).getManagedLedger();
                         generateManageLedgerStats(managedLedger,
                                 stream, cluster, NamespaceName.SYSTEM_NAMESPACE.toString(),
                                 MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + transactionCoordinatorID.getId(),
                                 MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME);
                     }
 
-                });
+        });
     }
 
-    private static void generateManageLedgerStats(ManagedLedger managedLedger, PrometheusMetricStreams stream,
+    private static void generateManageLedgerStats(ManagedLedger managedLedger, SimpleTextOutputStream stream,
                                                   String cluster, String namespace, String topic, String subscription) {
         ManagedLedgerStats managedLedgerStats = localManageLedgerStats.get();
         ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) managedLedger.getStats();
@@ -137,149 +157,174 @@ public class TransactionAggregator {
 
         managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
         managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
-        printManageLedgerStats(stream, cluster, namespace, topic, subscription, managedLedgerStats);
+        printManageLedgerStats(stream, cluster, namespace, topic,
+                subscription, managedLedgerStats);
+    }
+
+    private static void metricType(SimpleTextOutputStream stream, String name) {
+        Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
+        if (!metricWithTypeDefinition.containsKey(name)) {
+            metricWithTypeDefinition.put(name, "gauge");
+            stream.write("# TYPE ").write(name).write(" gauge\n");
+        }
+
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, String name,
+                               double value, long coordinatorId) {
+        metricType(stream, name);
+        stream.write(name)
+                .write("{cluster=\"").write(cluster)
+                .write("\",coordinator_id=\"").write(coordinatorId).write("\"} ")
+                .write(value).write(' ').write(System.currentTimeMillis())
+                .write('\n');
     }
 
-    private static void printManageLedgerStats(PrometheusMetricStreams stream, String cluster, String namespace,
+    private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace,
+                                String topic, String subscription, String name, long value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
+                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
+
+    private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace,
+                                String topic, String subscription, String name, double value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
+                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
+
+    private static void printManageLedgerStats(SimpleTextOutputStream stream, String cluster, String namespace,
                                                String topic, String subscription, ManagedLedgerStats stats) {
 
-        writeMetric(stream, "pulsar_storage_size", stats.storageSize, cluster, namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_logical_size", stats.storageLogicalSize, cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_backlog_size", stats.backlogSize, cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed, cluster, namespace, topic,
-                subscription);
+        metrics(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_size", stats.storageSize);
+        metrics(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_logical_size", stats.storageLogicalSize);
+        metrics(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_backlog_size", stats.backlogSize);
+        metrics(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
 
-        writeMetric(stream, "pulsar_storage_write_rate", stats.storageWriteRate, cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_read_rate", stats.storageReadRate, cluster, namespace, topic,
-                subscription);
+        metrics(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_rate", stats.storageWriteRate);
+        metrics(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_read_rate", stats.storageReadRate);
 
         stats.storageWriteLatencyBuckets.refresh();
         long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
-        writeMetric(stream, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_le_1", latencyBuckets[1], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_le_5", latencyBuckets[2], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_le_10", latencyBuckets[3], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_le_20", latencyBuckets[4], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_le_50", latencyBuckets[5], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_le_100", latencyBuckets[6], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_le_200", latencyBuckets[7], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_overflow", latencyBuckets[9], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_count", stats.storageWriteLatencyBuckets.getCount(),
-                cluster, namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_write_latency_sum", stats.storageWriteLatencyBuckets.getSum(), cluster,
-                namespace, topic, subscription);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_count",
+                stats.storageWriteLatencyBuckets.getCount());
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_sum",
+                stats.storageWriteLatencyBuckets.getSum());
 
         stats.storageLedgerWriteLatencyBuckets.refresh();
-        long[] ledgerWriteLatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWriteLatencyBuckets[0], cluster,
-                namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", ledgerWriteLatencyBuckets[1], cluster,
-                namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5", ledgerWriteLatencyBuckets[2], cluster,
-                namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", ledgerWriteLatencyBuckets[3], cluster,
-                namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", ledgerWriteLatencyBuckets[4], cluster,
-                namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", ledgerWriteLatencyBuckets[5], cluster,
-                namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", ledgerWriteLatencyBuckets[6], cluster,
-                namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", ledgerWriteLatencyBuckets[7], cluster,
-                namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", ledgerWriteLatencyBuckets[8], cluster,
-                namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", ledgerWriteLatencyBuckets[9], cluster,
-                namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
-                stats.storageLedgerWriteLatencyBuckets.getCount(), cluster, namespace, topic, subscription);
-        writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
-                stats.storageLedgerWriteLatencyBuckets.getSum(), cluster, namespace, topic, subscription);
+        long[] ledgerWritelatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_overflow",
+                ledgerWritelatencyBuckets[9]);
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_count",
+                stats.storageLedgerWriteLatencyBuckets.getCount());
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_sum",
+                stats.storageLedgerWriteLatencyBuckets.getSum());
 
         stats.entrySizeBuckets.refresh();
         long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
-        writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic,
-                subscription);
-        writeMetric(stream, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount(), cluster, namespace,
-                topic, subscription);
-        writeMetric(stream, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum(), cluster, namespace, topic,
-                subscription);
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
+        metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
+        metric(stream, cluster, namespace, topic, subscription,
+                "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster,
+                               String namespace, String topic, String subscription,
+                               String name, long value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
+                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
-    static void printTransactionCoordinatorStats(PrometheusMetricStreams stream, String cluster,
+    static void printTransactionCoordinatorStats(SimpleTextOutputStream stream, String cluster,
                                                  AggregatedTransactionCoordinatorStats stats,
                                                  long coordinatorId) {
-        writeMetric(stream, "pulsar_txn_active_count", stats.actives, cluster,
-                coordinatorId);
-        writeMetric(stream, "pulsar_txn_committed_count", stats.committedCount, cluster,
-                coordinatorId);
-        writeMetric(stream, "pulsar_txn_aborted_count", stats.abortedCount, cluster,
-                coordinatorId);
-        writeMetric(stream, "pulsar_txn_created_count", stats.createdCount, cluster,
-                coordinatorId);
-        writeMetric(stream, "pulsar_txn_timeout_count", stats.timeoutCount, cluster,
-                coordinatorId);
-        writeMetric(stream, "pulsar_txn_append_log_count", stats.appendLogCount, cluster,
-                coordinatorId);
+        metric(stream, cluster, "pulsar_txn_active_count",
+                stats.actives, coordinatorId);
+        metric(stream, cluster, "pulsar_txn_committed_count",
+                stats.committedCount, coordinatorId);
+        metric(stream, cluster, "pulsar_txn_aborted_count",
+                stats.abortedCount, coordinatorId);
+        metric(stream, cluster, "pulsar_txn_created_count",
+                stats.createdCount, coordinatorId);
+        metric(stream, cluster, "pulsar_txn_timeout_count",
+                stats.timeoutCount, coordinatorId);
+        metric(stream, cluster, "pulsar_txn_append_log_count",
+                stats.appendLogCount, coordinatorId);
         long[] latencyBuckets = stats.executionLatency;
-        writeMetric(stream, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], cluster, coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], cluster, coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], cluster, coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], cluster, coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], cluster, coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], cluster, coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], cluster, coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_15000", latencyBuckets[7], cluster, coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], cluster, coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], cluster, coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_300000", latencyBuckets[10], cluster,
-                coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_1500000", latencyBuckets[11], cluster,
-                coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_3000000", latencyBuckets[12], cluster,
-                coordinatorId);
-        writeMetric(stream, "pulsar_txn_execution_latency_le_overflow", latencyBuckets[13], cluster,
-                coordinatorId);
-    }
-
-    private static void writeMetric(PrometheusMetricStreams stream, String metricName, double value, String cluster,
-                                    long coordinatorId) {
-        stream.writeSample(metricName, value, "cluster", cluster, "coordinator_id", String.valueOf(coordinatorId));
-    }
-
-    private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
-                                    String namespace, String topic, String subscription) {
-        stream.writeSample(metricName, value, "cluster", cluster, "namespace", namespace, "topic", topic,
-                "subscription", subscription);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_15000", latencyBuckets[7], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_300000",
+                latencyBuckets[10], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_1500000",
+                latencyBuckets[11], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_3000000",
+                latencyBuckets[12], coordinatorId);
+        metric(stream, cluster, "pulsar_txn_execution_latency_le_overflow",
+                latencyBuckets[13], coordinatorId);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
index 8f704b11e76..7550096c2b5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
@@ -18,8 +18,13 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Enumeration;
 import org.apache.bookkeeper.stats.Counter;
 
 /**
@@ -135,4 +140,31 @@ public class PrometheusTextFormatUtil {
                 .append(success.toString()).append("\"} ")
                 .append(Double.toString(opStat.getSum(success))).append('\n');
     }
+
+    public static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry)
+            throws IOException {
+        Enumeration<MetricFamilySamples> metricFamilySamples = registry.metricFamilySamples();
+        while (metricFamilySamples.hasMoreElements()) {
+            MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
+
+            for (int i = 0; i < metricFamily.samples.size(); i++) {
+                Sample sample = metricFamily.samples.get(i);
+                w.write(sample.name);
+                w.write('{');
+                for (int j = 0; j < sample.labelNames.size(); j++) {
+                    if (j != 0) {
+                        w.write(", ");
+                    }
+                    w.write(sample.labelNames.get(j));
+                    w.write("=\"");
+                    w.write(sample.labelValues.get(j));
+                    w.write('"');
+                }
+
+                w.write("} ");
+                w.write(Collector.doubleToGoString(sample.value));
+                w.write('\n');
+            }
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index be10c8dc65e..f28412ea751 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -50,7 +50,6 @@ import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.crypto.SecretKey;
@@ -1405,63 +1404,6 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(cm.get(0).value, count);
     }
 
-    @Test
-    public void testMetricsGroupedByTypeDefinitions() throws Exception {
-        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
-        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
-        for (int i = 0; i < 10; i++) {
-            String message = "my-message-" + i;
-            p1.send(message.getBytes());
-            p2.send(message.getBytes());
-        }
-
-        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
-        String metricsStr = statsOut.toString();
-
-        Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
-        Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
-
-        AtomicReference<String> currentMetric = new AtomicReference<>();
-        Splitter.on("\n").split(metricsStr).forEach(line -> {
-            if (line.isEmpty()) {
-                return;
-            }
-            if (line.startsWith("#")) {
-                // Get the current type definition
-                Matcher typeMatcher = typePattern.matcher(line);
-                checkArgument(typeMatcher.matches());
-                String metricName = typeMatcher.group(1);
-                currentMetric.set(metricName);
-            } else {
-                Matcher metricMatcher = metricNamePattern.matcher(line);
-                checkArgument(metricMatcher.matches());
-                String metricName = metricMatcher.group(1);
-
-                if (metricName.endsWith("_bucket")) {
-                    metricName = metricName.substring(0, metricName.indexOf("_bucket"));
-                } else if (metricName.endsWith("_count") && !currentMetric.get().endsWith("_count")) {
-                    metricName = metricName.substring(0, metricName.indexOf("_count"));
-                } else if (metricName.endsWith("_sum") && !currentMetric.get().endsWith("_sum")) {
-                    metricName = metricName.substring(0, metricName.indexOf("_sum"));
-                } else if (metricName.endsWith("_total") && !currentMetric.get().endsWith("_total")) {
-                    metricName = metricName.substring(0, metricName.indexOf("_total"));
-                } else if (metricName.endsWith("_created") && !currentMetric.get().endsWith("_created")) {
-                    metricName = metricName.substring(0, metricName.indexOf("_created"));
-                }
-
-                if (!metricName.equals(currentMetric.get())) {
-                    System.out.println(metricsStr);
-                    fail("Metric not grouped under its type definition: " + line);
-                }
-
-            }
-        });
-
-        p1.close();
-        p2.close();
-    }
-
     /**
      * Hacky parsing of Prometheus text format. Should be good enough for unit tests
      */
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
deleted file mode 100644
index 15c29a0dc66..00000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.stats.prometheus;
-
-import static org.testng.Assert.assertTrue;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.StandardCharsets;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker")
-public class PrometheusMetricStreamsTest {
-
-    private PrometheusMetricStreams underTest;
-
-    @BeforeMethod(alwaysRun = true)
-    protected void setup() throws Exception {
-        underTest = new PrometheusMetricStreams();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    protected void cleanup() throws Exception {
-        underTest.releaseAll();
-    }
-
-    @Test
-    public void canWriteSampleWithoutLabels() {
-        underTest.writeSample("my-metric", 123);
-
-        String actual = writeToString();
-
-        assertTrue(actual.startsWith("# TYPE my-metric gauge"), "Gauge type line missing");
-        assertTrue(actual.contains("my-metric{} 123"), "Metric line missing");
-    }
-
-    @Test
-    public void canWriteSampleWithLabels() {
-        underTest.writeSample("my-other-metric", 123, "cluster", "local");
-        underTest.writeSample("my-other-metric", 456, "cluster", "local", "namespace", "my-ns");
-
-        String actual = writeToString();
-
-        assertTrue(actual.startsWith("# TYPE my-other-metric gauge"), "Gauge type line missing");
-        assertTrue(actual.contains("my-other-metric{cluster=\"local\"} 123"), "Cluster metric line missing");
-        assertTrue(actual.contains("my-other-metric{cluster=\"local\",namespace=\"my-ns\"} 456"),
-                "Cluster and Namespace metric line missing");
-    }
-
-    private String writeToString() {
-        ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer();
-        try {
-            SimpleTextOutputStream stream = new SimpleTextOutputStream(buffer);
-            underTest.flushAllToStream(stream);
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            int readIndex = buffer.readerIndex();
-            int readableBytes = buffer.readableBytes();
-            for (int i = 0; i < readableBytes; i++) {
-                out.write(buffer.getByte(readIndex + i));
-            }
-            return out.toString();
-        } finally {
-            buffer.release();
-        }
-    }
-}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
index dd78b4cfe58..9fc4b347c85 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
@@ -22,11 +22,12 @@ import io.netty.buffer.ByteBuf;
 
 /**
  * Format strings and numbers into a ByteBuf without any memory allocation.
+ *
  */
 public class SimpleTextOutputStream {
     private final ByteBuf buffer;
-    private static final char[] hexChars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
-            'f'};
+    private static final char[] hexChars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
+            'f' };
 
     public SimpleTextOutputStream(ByteBuf buffer) {
         this.buffer = buffer;
@@ -130,12 +131,4 @@ public class SimpleTextOutputStream {
         write(r);
         return this;
     }
-
-    public void write(ByteBuf byteBuf) {
-        buffer.writeBytes(byteBuf);
-    }
-
-    public ByteBuf getBuffer() {
-        return buffer;
-    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
index f5aa273f656..f7a205c7db0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
@@ -37,11 +37,6 @@ public class PrometheusTextFormat {
          */
         while (mfs.hasMoreElements()) {
             Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
-            writer.write("# TYPE ");
-            writer.write(metricFamilySamples.name);
-            writer.write(' ');
-            writer.write(metricFamilySamples.type.name().toLowerCase());
-            writer.write('\n');
             for (Collector.MetricFamilySamples.Sample sample : metricFamilySamples.samples) {
                 writer.write(sample.name);
                 if (sample.labelNames.size() > 0) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index 2ad407b2e5e..c8b411cbf57 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -328,11 +328,6 @@ public class WorkerStatsManager {
   }
 
   private void writeMetric(String metricName, long value, StringWriter stream) {
-    stream.write("# TYPE ");
-    stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
-    stream.write(metricName);
-    stream.write(" gauge \n");
-
     stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
     stream.write(metricName);
     stream.write("{");