You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/10/06 05:12:48 UTC

[pulsar] 05/12: [Metrics] Add support for splitting topic and partition label in Prometheus (#12225)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5fad80cd5e3bb19850738e05fe764f18516def80
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Sep 30 11:56:09 2021 +0800

    [Metrics] Add support for splitting topic and partition label in Prometheus (#12225)
    
    * [Metrics] Add support for splitting topic and partition label in Prometheus
    
    Fix: #11432
    
    Currently, we are only expose the partition name for the topic label in Prometheus metrics, which is difficult to
    have an aggregated metrics for a partitioned topic.
    
    Before this change, we can only get (topic=xxx-partition-0) in the metrics. After this change, we can get 2 labels (topic=xxx, partition=0).
    By default, the broker expose the single tag for topic. It need to change `splitTopicAndPartitionLabelInPrometheus=true` in the broker.conf
    
    New tests added.
    
    * Fix checkstyle.
    
    (cherry picked from commit 039079e850e7756c18929119ec215fff8ada643d)
---
 conf/broker.conf                                   |   9 +
 conf/standalone.conf                               |   9 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  11 +
 .../org/apache/pulsar/broker/PulsarService.java    |   3 +-
 .../stats/prometheus/NamespaceStatsAggregator.java |   5 +-
 .../prometheus/PrometheusMetricsGenerator.java     |  14 +-
 .../stats/prometheus/PrometheusMetricsServlet.java |   7 +-
 .../pulsar/broker/stats/prometheus/TopicStats.java | 352 ++++++++++++---------
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  42 +++
 9 files changed, 303 insertions(+), 149 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 775f9aa..d81e86c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1172,6 +1172,15 @@ statsUpdateInitialDelayInSecs=60
 # Default is false.
 exposePreciseBacklogInPrometheus=false
 
+# Enable splitting topic and partition label in Prometheus.
+# If enabled, a topic name will split into 2 parts, one is topic name without partition index,
+# another one is partition index, e.g. (topic=xxx, partition=0).
+# If the topic is a non-partitioned topic, -1 will be used for the partition index.
+# If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)
+# Default is false.
+
+splitTopicAndPartitionLabelInPrometheus=false
+
 ### --- Schema storage --- ###
 # The schema storage implementation used by this broker
 schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 363ce3b..e13723f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -855,6 +855,15 @@ exposePublisherStats=true
 # Default is false.
 exposePreciseBacklogInPrometheus=false
 
+# Enable splitting topic and partition label in Prometheus.
+# If enabled, a topic name will split into 2 parts, one is topic name without partition index,
+# another one is partition index, e.g. (topic=xxx, partition=0).
+# If the topic is a non-partitioned topic, -1 will be used for the partition index.
+# If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)
+# Default is false.
+
+splitTopicAndPartitionLabelInPrometheus=false
+
 ### --- Deprecated config variables --- ###
 
 # Deprecated. Use configurationStoreServers
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a223b58..c9b0c48 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1995,6 +1995,17 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private boolean exposeSubscriptionBacklogSizeInPrometheus = false;
 
+    @FieldContext(
+            category = CATEGORY_METRICS,
+            doc = "Enable splitting topic and partition label in Prometheus.\n" +
+                    " If enabled, a topic name will split into 2 parts, one is topic name without partition index,\n" +
+                    " another one is partition index, e.g. (topic=xxx, partition=0).\n" +
+                    " If the topic is a non-partitioned topic, -1 will be used for the partition index.\n" +
+                    " If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)\n" +
+                    " Default is false."
+    )
+    private boolean splitTopicAndPartitionLabelInPrometheus = false;
+
     /**** --- Functions --- ****/
     @FieldContext(
         category = CATEGORY_FUNCTIONS,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f3eae1a..23f50b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -680,7 +680,8 @@ public class PulsarService implements AutoCloseable {
             this.metricsServlet = new PrometheusMetricsServlet(
                     this, config.isExposeTopicLevelMetricsInPrometheus(),
                     config.isExposeConsumerLevelMetricsInPrometheus(),
-                    config.isExposeProducerLevelMetricsInPrometheus());
+                    config.isExposeProducerLevelMetricsInPrometheus(),
+                    config.isSplitTopicAndPartitionLabelInPrometheus());
             if (pendingMetricsProviders != null) {
                 pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
                 this.pendingMetricsProviders = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a2661ed..9df9ad4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -57,7 +57,7 @@ public class NamespaceStatsAggregator {
     };
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-           boolean includeProducerMetrics, SimpleTextOutputStream stream) {
+           boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) {
         String cluster = pulsar.getConfiguration().getClusterName();
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
         TopicStats.resetTypes();
@@ -81,7 +81,8 @@ public class NamespaceStatsAggregator {
 
                     if (includeTopicMetrics) {
                         topicsCount.add(1);
-                        TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean);
+                        TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean,
+                                splitTopicAndPartitionIndexLabel);
                     } else {
                         namespaceStats.updateStats(topicStats);
                     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 8f303a3..9d5e1c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -87,11 +87,19 @@ public class PrometheusMetricsGenerator {
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
         boolean includeProducerMetrics, OutputStream out) throws IOException {
-        generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, out, null);
+        generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out, null);
     }
 
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-        boolean includeProducerMetrics, OutputStream out, List<PrometheusRawMetricsProvider> metricsProviders)
+        boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+        OutputStream out) throws IOException {
+        generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics,
+                splitTopicAndPartitionIndexLabel, out, null);
+    }
+
+    public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
+        boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, OutputStream out,
+        List<PrometheusRawMetricsProvider> metricsProviders)
         throws IOException {
         ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
         try {
@@ -100,7 +108,7 @@ public class PrometheusMetricsGenerator {
             generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());
 
             NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics,
-                    includeProducerMetrics, stream);
+                    includeProducerMetrics, splitTopicAndPartitionIndexLabel, stream);
 
             if (pulsar.getWorkerServiceOpt().isPresent()) {
                 pulsar.getWorkerService().generateFunctionsStats(stream);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 026c26f..145f7a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -44,17 +44,19 @@ public class PrometheusMetricsServlet extends HttpServlet {
     private final boolean shouldExportConsumerMetrics;
     private final boolean shouldExportProducerMetrics;
     private final long metricsServletTimeoutMs;
+    private final boolean splitTopicAndPartitionLabel;
     private List<PrometheusRawMetricsProvider> metricsProviders;
 
     private ExecutorService executor = null;
 
     public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
-                                    boolean shouldExportProducerMetrics) {
+                                    boolean shouldExportProducerMetrics, boolean splitTopicAndPartitionLabel) {
         this.pulsar = pulsar;
         this.shouldExportTopicMetrics = includeTopicMetrics;
         this.shouldExportConsumerMetrics = includeConsumerMetrics;
         this.shouldExportProducerMetrics = shouldExportProducerMetrics;
         this.metricsServletTimeoutMs = pulsar.getConfiguration().getMetricsServletTimeoutMs();
+        this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
     }
 
     @Override
@@ -73,7 +75,8 @@ public class PrometheusMetricsServlet extends HttpServlet {
                 res.setStatus(HttpStatus.OK_200);
                 res.setContentType("text/plain");
                 PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
-                        shouldExportProducerMetrics, res.getOutputStream(), metricsProviders);
+                        shouldExportProducerMetrics, splitTopicAndPartitionLabel, res.getOutputStream(),
+                        metricsProviders);
                 context.complete();
 
             } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 61e0175..bfa427e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
+import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -104,222 +105,263 @@ class TopicStats {
     }
 
     static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                                TopicStats stats, Optional<CompactorMXBean> compactorMXBean) {
-        metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
-        metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
-        metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
-
-        metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn);
-        metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut);
-        metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn);
-        metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut);
-        metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize);
-
-        metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
+                                TopicStats stats, Optional<CompactorMXBean> compactorMXBean,
+                                boolean splitTopicAndPartitionIndexLabel) {
+        metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount,
+                splitTopicAndPartitionIndexLabel);
+
+        metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
+                splitTopicAndPartitionIndexLabel);
+
+        metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
+                splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
-                stats.managedLedgerStats.storageLogicalSize);
-        metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog);
+                stats.managedLedgerStats.storageLogicalSize, splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog,
+                splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
-                stats.managedLedgerStats.backlogSize);
+                stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
-                .offloadedStorageUsed);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit);
+                .offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
+                splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time",
-                stats.backlogQuotaLimitTime);
+                stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
 
         long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
-        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9],
+                splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
 
         long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5",
-                ledgerWriteLatencyBuckets[0]);
+                ledgerWriteLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1",
-                ledgerWriteLatencyBuckets[1]);
+                ledgerWriteLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5",
-                ledgerWriteLatencyBuckets[2]);
+                ledgerWriteLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10",
-                ledgerWriteLatencyBuckets[3]);
+                ledgerWriteLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20",
-                ledgerWriteLatencyBuckets[4]);
+                ledgerWriteLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50",
-                ledgerWriteLatencyBuckets[5]);
+                ledgerWriteLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100",
-                ledgerWriteLatencyBuckets[6]);
+                ledgerWriteLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200",
-                ledgerWriteLatencyBuckets[7]);
+                ledgerWriteLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000",
-                ledgerWriteLatencyBuckets[8]);
+                ledgerWriteLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow",
-                ledgerWriteLatencyBuckets[9]);
+                ledgerWriteLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count",
-                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
+                splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum",
-                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
+                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
+                splitTopicAndPartitionIndexLabel);
 
         long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
-        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8],
+                splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
-                stats.managedLedgerStats.entrySizeBuckets.getCount());
+                stats.managedLedgerStats.entrySizeBuckets.getCount(), splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
-                stats.managedLedgerStats.entrySizeBuckets.getSum());
+                stats.managedLedgerStats.entrySizeBuckets.getSum(), splitTopicAndPartitionIndexLabel);
 
         stats.producerStats.forEach((p, producerStats) -> {
             metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in",
-                    producerStats.msgRateIn);
+                    producerStats.msgRateIn, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in",
-                    producerStats.msgThroughputIn);
+                    producerStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size",
-                    producerStats.averageMsgSize);
+                    producerStats.averageMsgSize, splitTopicAndPartitionIndexLabel);
         });
 
         stats.subscriptionStats.forEach((n, subsStats) -> {
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
-                    subsStats.msgBacklog);
+                    subsStats.msgBacklog, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
-                    subsStats.msgBacklogNoDelayed);
+                    subsStats.msgBacklogNoDelayed, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
-                    subsStats.msgDelayed);
+                    subsStats.msgDelayed, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
-                    subsStats.msgRateRedeliver);
+                    subsStats.msgRateRedeliver, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
-                    subsStats.unackedMessages);
+                    subsStats.unackedMessages, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
-                    subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+                    subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
-                    subsStats.msgRateOut);
+                    subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
-                    subsStats.msgThroughputOut);
+                    subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
-                    subsStats.bytesOutCounter);
+                    subsStats.bytesOutCounter, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
-                    subsStats.msgOutCounter);
+                    subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
-                    subsStats.lastExpireTimestamp);
+                    subsStats.lastExpireTimestamp, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
-                subsStats.lastAckedTimestamp);
+                subsStats.lastAckedTimestamp, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
-                subsStats.lastConsumedFlowTimestamp);
+                subsStats.lastConsumedFlowTimestamp, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
-                subsStats.lastConsumedTimestamp);
+                subsStats.lastConsumedTimestamp, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
-                subsStats.lastMarkDeleteAdvancedTimestamp);
+                subsStats.lastMarkDeleteAdvancedTimestamp, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
-                    subsStats.msgRateExpired);
+                    subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
-                    subsStats.totalMsgExpired);
+                    subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
             subsStats.consumerStat.forEach((c, consumerStats) -> {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver);
+                        "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
+                        splitTopicAndPartitionIndexLabel);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_unacked_messages", consumerStats.unackedMessages);
+                        "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
+                        splitTopicAndPartitionIndexLabel);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
                         "pulsar_consumer_blocked_on_unacked_messages",
-                        consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+                        consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
+                        splitTopicAndPartitionIndexLabel);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut);
+                        "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
+                        splitTopicAndPartitionIndexLabel);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut);
+                        "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
+                        splitTopicAndPartitionIndexLabel);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_consumer_available_permits", consumerStats.availablePermits);
+                        "pulsar_consumer_available_permits", consumerStats.availablePermits,
+                        splitTopicAndPartitionIndexLabel);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_out_bytes_total", consumerStats.bytesOutCounter);
+                        "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
+                        splitTopicAndPartitionIndexLabel);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
-                        "pulsar_out_messages_total", consumerStats.msgOutCounter);
+                        "pulsar_out_messages_total", consumerStats.msgOutCounter,
+                        splitTopicAndPartitionIndexLabel);
             });
         });
 
         if (!stats.replicationStats.isEmpty()) {
             stats.replicationStats.forEach((remoteCluster, replStats) -> {
                 metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster,
-                        replStats.msgRateIn);
+                        replStats.msgRateIn, splitTopicAndPartitionIndexLabel);
                 metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster,
-                        replStats.msgRateOut);
+                        replStats.msgRateOut, splitTopicAndPartitionIndexLabel);
                 metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in",
-                        remoteCluster,
-                        replStats.msgThroughputIn);
+                        remoteCluster, replStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
                 metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out",
-                        remoteCluster,
-                        replStats.msgThroughputOut);
+                        remoteCluster, replStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
                 metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster,
-                        replStats.replicationBacklog);
+                        replStats.replicationBacklog, splitTopicAndPartitionIndexLabel);
                 metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count",
-                        remoteCluster, replStats.connectedCount);
+                        remoteCluster, replStats.connectedCount, splitTopicAndPartitionIndexLabel);
                 metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired",
-                        remoteCluster, replStats.msgRateExpired);
+                        remoteCluster, replStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
                 metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds",
-                        remoteCluster, replStats.replicationDelayInSeconds);
+                        remoteCluster, replStats.replicationDelayInSeconds, splitTopicAndPartitionIndexLabel);
             });
         }
 
-        metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter);
-        metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter);
+        metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter,
+                splitTopicAndPartitionIndexLabel);
 
         // Compaction
         boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
                 .map(__ -> true).orElse(false);
         if (hasCompaction) {
             metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
-                    stats.compactionRemovedEventCount);
+                    stats.compactionRemovedEventCount, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
-                    stats.compactionSucceedCount);
+                    stats.compactionSucceedCount, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
-                    stats.compactionFailedCount);
+                    stats.compactionFailedCount, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
-                    stats.compactionDurationTimeInMills);
+                    stats.compactionDurationTimeInMills, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
-                    stats.compactionReadThroughput);
+                    stats.compactionReadThroughput, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
-                    stats.compactionWriteThroughput);
+                    stats.compactionWriteThroughput, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
-                    stats.compactionCompactedEntriesCount);
+                    stats.compactionCompactedEntriesCount, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
-                    stats.compactionCompactedEntriesSize);
+                    stats.compactionCompactedEntriesSize, splitTopicAndPartitionIndexLabel);
             long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
-                    compactionLatencyBuckets[0]);
+                    compactionLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
-                    compactionLatencyBuckets[1]);
+                    compactionLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
-                    compactionLatencyBuckets[2]);
+                    compactionLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
-                    compactionLatencyBuckets[3]);
+                    compactionLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
-                    compactionLatencyBuckets[4]);
+                    compactionLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
-                    compactionLatencyBuckets[5]);
+                    compactionLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
-                    compactionLatencyBuckets[6]);
+                    compactionLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
-                    compactionLatencyBuckets[7]);
+                    compactionLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
-                    compactionLatencyBuckets[8]);
+                    compactionLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
-                    compactionLatencyBuckets[9]);
+                    compactionLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
-                    stats.compactionLatencyBuckets.getSum());
+                    stats.compactionLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
-                    stats.compactionLatencyBuckets.getCount());
+                    stats.compactionLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
         }
     }
 
@@ -333,64 +375,92 @@ class TopicStats {
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                               String name, double value) {
+            String name, double value, boolean splitTopicAndPartitionIndexLabel) {
         metricType(stream, name);
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
-                .write("\",topic=\"").write(topic).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel).write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                               String subscription, String name, long value) {
+           String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) {
         metricType(stream, name);
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
-                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",subscription=\"").write(subscription).write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                               String producerName, long produceId, String name, double value) {
+            String producerName, long produceId, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
         metricType(stream, name);
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
-                .write("\",topic=\"").write(topic).write("\",producer_name=\"").write(producerName)
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",producer_name=\"").write(producerName)
                 .write("\",producer_id=\"").write(produceId).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+        stream.write(value);
+        appendEndings(stream);
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                               String subscription, String name, double value) {
+            String subscription, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
         metricType(stream, name);
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
-                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",subscription=\"").write(subscription).write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                               String subscription, String consumerName, long consumerId, String name, long value) {
+            String subscription, String consumerName, long consumerId, String name, long value,
+            boolean splitTopicAndPartitionIndexLabel) {
         metricType(stream, name);
-        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
-                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",subscription=\"").write(subscription)
                 .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
                 .write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+        stream.write(value);
+        appendEndings(stream);
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                               String subscription, String consumerName, long consumerId, String name, double value) {
+            String subscription, String consumerName, long consumerId, String name, double value,
+            boolean splitTopicAndPartitionIndexLabel) {
         metricType(stream, name);
-        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
-                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",subscription=\"").write(subscription)
                 .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
                 .write(consumerId).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+        stream.write(value);
+        appendEndings(stream);
     }
 
     private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
-                                                String topic,
-            String name, String remoteCluster, double value) {
+            String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) {
         metricType(stream, name);
+        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+                .write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
+        stream.write(value);
+        appendEndings(stream);
+    }
+
+    private static SimpleTextOutputStream appendRequiredLabels(SimpleTextOutputStream stream, String cluster,
+            String namespace, String topic, String name, boolean splitTopicAndPartitionIndexLabel) {
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
-        stream.write("\",topic=\"").write(topic).write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
-        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+        if (splitTopicAndPartitionIndexLabel) {
+            int index = topic.indexOf(PARTITIONED_TOPIC_SUFFIX);
+            if (index > 0) {
+                stream.write("\",topic=\"").write(topic.substring(0, index)).write("\",partition=\"")
+                        .write(topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length()));
+            } else {
+                stream.write("\",topic=\"").write(topic).write("\",partition=\"").write("-1");
+            }
+        } else {
+            stream.write("\",topic=\"").write(topic);
+        }
+        return stream;
+    }
+
+    private static void appendEndings(SimpleTextOutputStream stream) {
+        stream.write(' ').write(System.currentTimeMillis()).write('\n');
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 3de3a04..9f09893 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1222,6 +1222,48 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         pulsarClient.close();
     }
 
+    @Test
+    public void testSplitTopicAndPartitionLabel() throws Exception {
+        String ns1 = "prop/ns-abc1";
+        String ns2 = "prop/ns-abc2";
+        admin.namespaces().createNamespace(ns1);
+        admin.namespaces().createNamespace(ns2);
+        String baseTopic1 = "persistent://" + ns1 + "/testMetricsTopicCount";
+        String baseTopic2 = "persistent://" + ns2 + "/testMetricsTopicCount";
+        for (int i = 0; i < 6; i++) {
+            admin.topics().createNonPartitionedTopic(baseTopic1 + UUID.randomUUID());
+        }
+        for (int i = 0; i < 3; i++) {
+            admin.topics().createPartitionedTopic(baseTopic2 + UUID.randomUUID(), 3);
+        }
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topicsPattern("persistent://" + ns1 + "/.*")
+                .subscriptionName("sub")
+                .subscribe();
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topicsPattern("persistent://" + ns2 + "/.*")
+                .subscriptionName("sub")
+                .subscribe();
+        @Cleanup
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, true,  statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        Collection<Metric> metric = metrics.get("pulsar_consumers_count");
+        assertTrue(metric.size() >= 15);
+        metric.forEach(item -> {
+            if (ns1.equals(item.tags.get("namespace"))) {
+                assertEquals(item.tags.get("partition"), "-1");
+            }
+            if (ns2.equals(item.tags.get("namespace"))) {
+                System.out.println(item);
+                assertTrue(Integer.parseInt(item.tags.get("partition")) >= 0);
+            }
+        });
+        consumer1.close();
+        consumer2.close();
+    }
+
     private void compareCompactionStateCount(List<Metric> cm, double count) {
         assertEquals(cm.size(), 1);
         assertEquals(cm.get(0).tags.get("cluster"), "test");