You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/01 09:50:49 UTC

[GitHub] [pulsar] marksilcox commented on a diff in pull request #15558: [fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865)

marksilcox commented on code in PR #15558:
URL: https://github.com/apache/pulsar/pull/15558#discussion_r911809565


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java:
##########
@@ -305,155 +332,224 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
                 });
     }
 
-    private static void buildDefaultBrokerStats(Map<String, Collector.MetricFamilySamples> metrics, String cluster) {
-        // Print metrics with 0 values. This is necessary to have the available brokers being
-        // reported in the brokers dashboard even if they don't have any topic or traffic
-        metric(metrics, cluster, "pulsar_topics_count", 0);
-        metric(metrics, cluster, "pulsar_subscriptions_count", 0);
-        metric(metrics, cluster, "pulsar_producers_count", 0);
-        metric(metrics, cluster, "pulsar_consumers_count", 0);
-        metric(metrics, cluster, "pulsar_rate_in", 0);
-        metric(metrics, cluster, "pulsar_rate_out", 0);
-        metric(metrics, cluster, "pulsar_throughput_in", 0);
-        metric(metrics, cluster, "pulsar_throughput_out", 0);
-        metric(metrics, cluster, "pulsar_storage_size", 0);
-        metric(metrics, cluster, "pulsar_storage_logical_size", 0);
-        metric(metrics, cluster, "pulsar_storage_write_rate", 0);
-        metric(metrics, cluster, "pulsar_storage_read_rate", 0);
-        metric(metrics, cluster, "pulsar_msg_backlog", 0);
+    private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster,
+                                              Map<String, Long> namespaceTopicsCount) {
+        stream.write("# TYPE ").write("pulsar_topics_count").write(" gauge\n");
+        stream.write("pulsar_topics_count")
+                .write("{cluster=\"").write(cluster).write("\"} ")
+                .write(0).write(' ').write(System.currentTimeMillis())
+                .write('\n');
+        namespaceTopicsCount.forEach((ns, topicCount) -> stream.write("pulsar_topics_count")
+                .write("{cluster=\"").write(cluster)
+                .write("\",namespace=\"").write(ns)
+                .write("\"} ")
+                .write(topicCount).write(' ').write(System.currentTimeMillis())
+                .write('\n')
+        );
     }
 
-    private static void printTopicsCountStats(Map<String, Collector.MetricFamilySamples> metrics, String cluster,
-                                              String namespace,
-                                              LongAdder topicsCount) {
-        metric(metrics, cluster, namespace, "pulsar_topics_count", topicsCount.sum());
+    private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster,
+                                            List<AggregatedNamespaceStats> stats) {
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_topics_count", stats, s -> s.topicsCount);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_subscriptions_count", stats, s -> s.subscriptionsCount);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_producers_count", stats, s -> s.producersCount);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_consumers_count", stats, s -> s.consumersCount);
+
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_rate_in", stats, s -> s.rateIn);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_rate_out", stats, s -> s.rateOut);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_throughput_in", stats, s -> s.throughputIn);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_throughput_out", stats, s -> s.throughputOut);
+        writeMetric(stream, cluster, "pulsar_consumer_msg_ack_rate", stats, s -> s.messageAckRate);
+
+        writeMetric(stream, cluster, "pulsar_in_bytes_total", stats, s -> s.bytesInCounter);
+        writeMetric(stream, cluster, "pulsar_in_messages_total", stats, s -> s.msgInCounter);
+        writeMetric(stream, cluster, "pulsar_out_bytes_total", stats, s -> s.bytesOutCounter);
+        writeMetric(stream, cluster, "pulsar_out_messages_total", stats, s -> s.msgOutCounter);
+
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_storage_size", stats,
+                s -> s.managedLedgerStats.storageSize);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_storage_logical_size", stats,
+                s -> s.managedLedgerStats.storageLogicalSize);
+        writeMetric(stream, cluster, "pulsar_storage_backlog_size", stats, s -> s.managedLedgerStats.backlogSize);
+        writeMetric(stream, cluster, "pulsar_storage_offloaded_size",
+                stats, s -> s.managedLedgerStats.offloadedStorageUsed);
+
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_storage_write_rate", stats,
+                s -> s.managedLedgerStats.storageWriteRate);
+        writeMetricWithBrokerDefault(stream, cluster, "pulsar_storage_read_rate", stats,
+                s -> s.managedLedgerStats.storageReadRate);
+
+        writeMetric(stream, cluster, "pulsar_subscription_delayed", stats, s -> s.msgDelayed);
+
+        writeMsgBacklog(stream, cluster, stats, s -> s.msgBacklog);
+
+        stats.forEach(s -> s.managedLedgerStats.storageWriteLatencyBuckets.refresh());
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_0_5", stats,
+                s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[0]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_1", stats,
+                s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[1]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_5", stats,
+                s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[2]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_10", stats,
+                s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[3]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_20", stats,
+                s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[4]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_50", stats,
+                s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[5]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_100", stats,
+                s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[6]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_200", stats,
+                s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[7]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_le_1000", stats,
+                s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[8]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_overflow", stats,
+                s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[9]);
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_count",
+                stats, s -> s.managedLedgerStats.storageWriteLatencyBuckets.getCount());
+        writeMetric(stream, cluster, "pulsar_storage_write_latency_sum",
+                stats, s -> s.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+
+        stats.forEach(s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh());
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_0_5", stats,
+                s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[0]);
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_1", stats,
+                s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[1]);
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_5", stats,
+                s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[2]);
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_10", stats,
+                s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[3]);
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_20", stats,
+                s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[4]);
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_50", stats,
+                s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[5]);
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_100", stats,
+                s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[6]);
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_200", stats,
+                s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[7]);
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_1000",
+                stats, s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[8]);
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_overflow",
+                stats, s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[9]);
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_count",
+                stats, s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
+        writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_sum",
+                stats, s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
+
+        stats.forEach(s -> s.managedLedgerStats.entrySizeBuckets.refresh());
+        writeMetric(stream, cluster, "pulsar_entry_size_le_128", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[0]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_512", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[1]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_1_kb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[2]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_2_kb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[3]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_4_kb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[4]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_16_kb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[5]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_100_kb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[6]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_1_mb", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[7]);
+        writeMetric(stream, cluster, "pulsar_entry_size_le_overflow", stats,
+                s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[8]);
+        writeMetric(stream, cluster, "pulsar_entry_size_count",
+                stats, s -> s.managedLedgerStats.entrySizeBuckets.getCount());
+        writeMetric(stream, cluster, "pulsar_entry_size_sum",
+                stats, s -> s.managedLedgerStats.entrySizeBuckets.getSum());
+
+        writeReplicationStat(stream, cluster, "pulsar_replication_rate_in", stats,
+                replStats -> replStats.msgRateIn);
+        writeReplicationStat(stream, cluster, "pulsar_replication_rate_out", stats,
+                replStats -> replStats.msgRateOut);
+        writeReplicationStat(stream, cluster, "pulsar_replication_throughput_in", stats,
+                replStats -> replStats.msgThroughputIn);
+        writeReplicationStat(stream, cluster, "pulsar_replication_throughput_out", stats,
+                replStats -> replStats.msgThroughputOut);
+        writeReplicationStat(stream, cluster, "pulsar_replication_backlog", stats,
+                replStats -> replStats.replicationBacklog);
+        writeReplicationStat(stream, cluster, "pulsar_replication_connected_count", stats,
+                replStats -> replStats.connectedCount);
+        writeReplicationStat(stream, cluster, "pulsar_replication_rate_expired", stats,
+                replStats -> replStats.msgRateExpired);
+        writeReplicationStat(stream, cluster, "pulsar_replication_delay_in_seconds", stats,
+                replStats -> replStats.replicationDelayInSeconds);
     }
 
-    private static void printNamespaceStats(Map<String, Collector.MetricFamilySamples> metrics, String cluster,
-                                            String namespace,
-                                            AggregatedNamespaceStats stats) {
-        metric(metrics, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
-        metric(metrics, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
-        metric(metrics, cluster, namespace, "pulsar_producers_count", stats.producersCount);
-        metric(metrics, cluster, namespace, "pulsar_consumers_count", stats.consumersCount);
-
-        metric(metrics, cluster, namespace, "pulsar_rate_in", stats.rateIn);
-        metric(metrics, cluster, namespace, "pulsar_rate_out", stats.rateOut);
-        metric(metrics, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
-        metric(metrics, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
-        metric(metrics, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
-
-        metric(metrics, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
-        metric(metrics, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
-        metric(metrics, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
-        metric(metrics, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
-
-        metric(metrics, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
-        metric(metrics, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
-        metric(metrics, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize);
-        metric(metrics, cluster, namespace, "pulsar_storage_offloaded_size",
-                stats.managedLedgerStats.offloadedStorageUsed);
-
-        metric(metrics, cluster, namespace, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate);
-        metric(metrics, cluster, namespace, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate);
-
-        metric(metrics, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed);
-
-        metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);
-
-        stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
-        long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_count",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
-        metric(metrics, cluster, namespace, "pulsar_storage_write_latency_sum",
-                stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
-
-        stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
-        long[] ledgerWritelatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1000",
-                ledgerWritelatencyBuckets[8]);
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_overflow",
-                ledgerWritelatencyBuckets[9]);
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_count",
-                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
-        metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_sum",
-                stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
-
-        stats.managedLedgerStats.entrySizeBuckets.refresh();
-        long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
-        metric(metrics, cluster, namespace, "pulsar_entry_size_count",
-                stats.managedLedgerStats.entrySizeBuckets.getCount());
-        metric(metrics, cluster, namespace, "pulsar_entry_size_sum",
-                stats.managedLedgerStats.entrySizeBuckets.getSum());
-
-        if (!stats.replicationStats.isEmpty()) {
-            stats.replicationStats.forEach((remoteCluster, replStats) -> {
-                metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_rate_in", remoteCluster,
-                        replStats.msgRateIn);
-                metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_rate_out", remoteCluster,
-                        replStats.msgRateOut);
-                metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_throughput_in", remoteCluster,
-                        replStats.msgThroughputIn);
-                metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_throughput_out", remoteCluster,
-                        replStats.msgThroughputOut);
-                metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_backlog", remoteCluster,
-                        replStats.replicationBacklog);
-                metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_connected_count",
-                        remoteCluster,
-                        replStats.connectedCount);
-                metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_rate_expired", remoteCluster,
-                        replStats.msgRateExpired);
-                metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_delay_in_seconds",
-                        remoteCluster, replStats.replicationDelayInSeconds);
-            });
-        }
+    private static void writeMetricWithBrokerDefault(SimpleTextOutputStream stream, String cluster, String name,
+                                                     List<AggregatedNamespaceStats> allNamespaceStats,
+                                                     Function<AggregatedNamespaceStats, Number> namespaceFunction) {
+        stream.write("# TYPE ").write(name).write(" gauge\n");
+        stream.write(name)
+                .write("{cluster=\"").write(cluster).write("\"} ")
+                .write(0).write(' ').write(System.currentTimeMillis())
+                .write('\n');
+        writeNamespaceStats(stream, cluster, name, allNamespaceStats, namespaceFunction);
     }
 
-    private static void metric(Map<String, Collector.MetricFamilySamples> metrics, String cluster, String name,
-                               long value) {
-        addMetric(metrics, Map.of("cluster", cluster), name, value);
+    private static void writeMetric(SimpleTextOutputStream stream, String cluster, String name,
+                                    List<AggregatedNamespaceStats> allNamespaceStats,
+                                    Function<AggregatedNamespaceStats, Number> namespaceFunction) {
+        stream.write("# TYPE ").write(name).write(" gauge\n");
+        writeNamespaceStats(stream, cluster, name, allNamespaceStats, namespaceFunction);
     }
 
-    private static void metric(Map<String, Collector.MetricFamilySamples> metrics, String cluster, String namespace,
-                               String name, long value) {
-        addMetric(metrics, Map.of("cluster", cluster, "namespace", namespace), name, value);
+    private static void writeNamespaceStats(SimpleTextOutputStream stream, String cluster, String name,
+                                            List<AggregatedNamespaceStats> allNamespaceStats,
+                                            Function<AggregatedNamespaceStats, Number> namespaceFunction) {
+        allNamespaceStats.forEach(n -> {
+            Number value = namespaceFunction.apply(n);
+            if (value != null) {
+                stream.write(name)
+                        .write("{cluster=\"").write(cluster)
+                        .write("\",namespace=\"").write(n.name)
+                        .write("\"} ")
+                        .write(value).write(' ').write(System.currentTimeMillis())
+                        .write('\n');
+            }
+        });
     }
 
-    private static void metric(Map<String, Collector.MetricFamilySamples> metrics, String cluster, String namespace,
-                               String name, double value) {
-        addMetric(metrics, Map.of("cluster", cluster, "namespace", namespace), name, value);
+    private static void writeMsgBacklog(SimpleTextOutputStream stream, String cluster,
+                                        List<AggregatedNamespaceStats> allNamespaceStats,
+                                        Function<AggregatedNamespaceStats, Number> namespaceFunction) {
+        stream.write("# TYPE ").write("pulsar_msg_backlog").write(" gauge\n");
+        stream.write("pulsar_msg_backlog")

Review Comment:
   This broker-level `pulsar_msg_backlog` line was originally written in `printDefaultBrokerStats` (shown as `buildDefaultBrokerStats` in the diff above), which was always called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org