You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/01 09:50:49 UTC
[GitHub] [pulsar] marksilcox commented on a diff in pull request #15558: [fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865)
marksilcox commented on code in PR #15558:
URL: https://github.com/apache/pulsar/pull/15558#discussion_r911809565
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java:
##########
@@ -305,155 +332,224 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
});
}
- private static void buildDefaultBrokerStats(Map<String, Collector.MetricFamilySamples> metrics, String cluster) {
- // Print metrics with 0 values. This is necessary to have the available brokers being
- // reported in the brokers dashboard even if they don't have any topic or traffic
- metric(metrics, cluster, "pulsar_topics_count", 0);
- metric(metrics, cluster, "pulsar_subscriptions_count", 0);
- metric(metrics, cluster, "pulsar_producers_count", 0);
- metric(metrics, cluster, "pulsar_consumers_count", 0);
- metric(metrics, cluster, "pulsar_rate_in", 0);
- metric(metrics, cluster, "pulsar_rate_out", 0);
- metric(metrics, cluster, "pulsar_throughput_in", 0);
- metric(metrics, cluster, "pulsar_throughput_out", 0);
- metric(metrics, cluster, "pulsar_storage_size", 0);
- metric(metrics, cluster, "pulsar_storage_logical_size", 0);
- metric(metrics, cluster, "pulsar_storage_write_rate", 0);
- metric(metrics, cluster, "pulsar_storage_read_rate", 0);
- metric(metrics, cluster, "pulsar_msg_backlog", 0);
+ private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster,
+ Map<String, Long> namespaceTopicsCount) {
+ stream.write("# TYPE ").write("pulsar_topics_count").write(" gauge\n");
+ stream.write("pulsar_topics_count")
+ .write("{cluster=\"").write(cluster).write("\"} ")
+ .write(0).write(' ').write(System.currentTimeMillis())
+ .write('\n');
+ namespaceTopicsCount.forEach((ns, topicCount) -> stream.write("pulsar_topics_count")
+ .write("{cluster=\"").write(cluster)
+ .write("\",namespace=\"").write(ns)
+ .write("\"} ")
+ .write(topicCount).write(' ').write(System.currentTimeMillis())
+ .write('\n')
+ );
}
- private static void printTopicsCountStats(Map<String, Collector.MetricFamilySamples> metrics, String cluster,
- String namespace,
- LongAdder topicsCount) {
- metric(metrics, cluster, namespace, "pulsar_topics_count", topicsCount.sum());
+ private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster,
+ List<AggregatedNamespaceStats> stats) {
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_topics_count", stats, s -> s.topicsCount);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_subscriptions_count", stats, s -> s.subscriptionsCount);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_producers_count", stats, s -> s.producersCount);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_consumers_count", stats, s -> s.consumersCount);
+
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_rate_in", stats, s -> s.rateIn);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_rate_out", stats, s -> s.rateOut);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_throughput_in", stats, s -> s.throughputIn);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_throughput_out", stats, s -> s.throughputOut);
+ writeMetric(stream, cluster, "pulsar_consumer_msg_ack_rate", stats, s -> s.messageAckRate);
+
+ writeMetric(stream, cluster, "pulsar_in_bytes_total", stats, s -> s.bytesInCounter);
+ writeMetric(stream, cluster, "pulsar_in_messages_total", stats, s -> s.msgInCounter);
+ writeMetric(stream, cluster, "pulsar_out_bytes_total", stats, s -> s.bytesOutCounter);
+ writeMetric(stream, cluster, "pulsar_out_messages_total", stats, s -> s.msgOutCounter);
+
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_storage_size", stats,
+ s -> s.managedLedgerStats.storageSize);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_storage_logical_size", stats,
+ s -> s.managedLedgerStats.storageLogicalSize);
+ writeMetric(stream, cluster, "pulsar_storage_backlog_size", stats, s -> s.managedLedgerStats.backlogSize);
+ writeMetric(stream, cluster, "pulsar_storage_offloaded_size",
+ stats, s -> s.managedLedgerStats.offloadedStorageUsed);
+
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_storage_write_rate", stats,
+ s -> s.managedLedgerStats.storageWriteRate);
+ writeMetricWithBrokerDefault(stream, cluster, "pulsar_storage_read_rate", stats,
+ s -> s.managedLedgerStats.storageReadRate);
+
+ writeMetric(stream, cluster, "pulsar_subscription_delayed", stats, s -> s.msgDelayed);
+
+ writeMsgBacklog(stream, cluster, stats, s -> s.msgBacklog);
+
+ stats.forEach(s -> s.managedLedgerStats.storageWriteLatencyBuckets.refresh());
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_0_5", stats,
+ s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[0]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_1", stats,
+ s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[1]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_5", stats,
+ s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[2]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_10", stats,
+ s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[3]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_20", stats,
+ s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[4]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_50", stats,
+ s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[5]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_100", stats,
+ s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[6]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_200", stats,
+ s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[7]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_le_1000", stats,
+ s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[8]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_overflow", stats,
+ s -> s.managedLedgerStats.storageWriteLatencyBuckets.getBuckets()[9]);
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_count",
+ stats, s -> s.managedLedgerStats.storageWriteLatencyBuckets.getCount());
+ writeMetric(stream, cluster, "pulsar_storage_write_latency_sum",
+ stats, s -> s.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+
+ stats.forEach(s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh());
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_0_5", stats,
+ s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[0]);
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_1", stats,
+ s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[1]);
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_5", stats,
+ s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[2]);
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_10", stats,
+ s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[3]);
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_20", stats,
+ s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[4]);
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_50", stats,
+ s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[5]);
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_100", stats,
+ s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[6]);
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_200", stats,
+ s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[7]);
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_le_1000",
+ stats, s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[8]);
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_overflow",
+ stats, s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets()[9]);
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_count",
+ stats, s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
+ writeMetric(stream, cluster, "pulsar_storage_ledger_write_latency_sum",
+ stats, s -> s.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
+
+ stats.forEach(s -> s.managedLedgerStats.entrySizeBuckets.refresh());
+ writeMetric(stream, cluster, "pulsar_entry_size_le_128", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[0]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_512", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[1]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_1_kb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[2]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_2_kb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[3]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_4_kb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[4]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_16_kb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[5]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_100_kb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[6]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_1_mb", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[7]);
+ writeMetric(stream, cluster, "pulsar_entry_size_le_overflow", stats,
+ s -> s.managedLedgerStats.entrySizeBuckets.getBuckets()[8]);
+ writeMetric(stream, cluster, "pulsar_entry_size_count",
+ stats, s -> s.managedLedgerStats.entrySizeBuckets.getCount());
+ writeMetric(stream, cluster, "pulsar_entry_size_sum",
+ stats, s -> s.managedLedgerStats.entrySizeBuckets.getSum());
+
+ writeReplicationStat(stream, cluster, "pulsar_replication_rate_in", stats,
+ replStats -> replStats.msgRateIn);
+ writeReplicationStat(stream, cluster, "pulsar_replication_rate_out", stats,
+ replStats -> replStats.msgRateOut);
+ writeReplicationStat(stream, cluster, "pulsar_replication_throughput_in", stats,
+ replStats -> replStats.msgThroughputIn);
+ writeReplicationStat(stream, cluster, "pulsar_replication_throughput_out", stats,
+ replStats -> replStats.msgThroughputOut);
+ writeReplicationStat(stream, cluster, "pulsar_replication_backlog", stats,
+ replStats -> replStats.replicationBacklog);
+ writeReplicationStat(stream, cluster, "pulsar_replication_connected_count", stats,
+ replStats -> replStats.connectedCount);
+ writeReplicationStat(stream, cluster, "pulsar_replication_rate_expired", stats,
+ replStats -> replStats.msgRateExpired);
+ writeReplicationStat(stream, cluster, "pulsar_replication_delay_in_seconds", stats,
+ replStats -> replStats.replicationDelayInSeconds);
}
- private static void printNamespaceStats(Map<String, Collector.MetricFamilySamples> metrics, String cluster,
- String namespace,
- AggregatedNamespaceStats stats) {
- metric(metrics, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
- metric(metrics, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
- metric(metrics, cluster, namespace, "pulsar_producers_count", stats.producersCount);
- metric(metrics, cluster, namespace, "pulsar_consumers_count", stats.consumersCount);
-
- metric(metrics, cluster, namespace, "pulsar_rate_in", stats.rateIn);
- metric(metrics, cluster, namespace, "pulsar_rate_out", stats.rateOut);
- metric(metrics, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
- metric(metrics, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
- metric(metrics, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
-
- metric(metrics, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
- metric(metrics, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
- metric(metrics, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
- metric(metrics, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
-
- metric(metrics, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
- metric(metrics, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
- metric(metrics, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize);
- metric(metrics, cluster, namespace, "pulsar_storage_offloaded_size",
- stats.managedLedgerStats.offloadedStorageUsed);
-
- metric(metrics, cluster, namespace, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate);
- metric(metrics, cluster, namespace, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate);
-
- metric(metrics, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed);
-
- metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);
-
- stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
- long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_count",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
- metric(metrics, cluster, namespace, "pulsar_storage_write_latency_sum",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
-
- stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
- long[] ledgerWritelatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1000",
- ledgerWritelatencyBuckets[8]);
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_overflow",
- ledgerWritelatencyBuckets[9]);
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_count",
- stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
- metric(metrics, cluster, namespace, "pulsar_storage_ledger_write_latency_sum",
- stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
-
- stats.managedLedgerStats.entrySizeBuckets.refresh();
- long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
- metric(metrics, cluster, namespace, "pulsar_entry_size_count",
- stats.managedLedgerStats.entrySizeBuckets.getCount());
- metric(metrics, cluster, namespace, "pulsar_entry_size_sum",
- stats.managedLedgerStats.entrySizeBuckets.getSum());
-
- if (!stats.replicationStats.isEmpty()) {
- stats.replicationStats.forEach((remoteCluster, replStats) -> {
- metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_rate_in", remoteCluster,
- replStats.msgRateIn);
- metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_rate_out", remoteCluster,
- replStats.msgRateOut);
- metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_throughput_in", remoteCluster,
- replStats.msgThroughputIn);
- metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_throughput_out", remoteCluster,
- replStats.msgThroughputOut);
- metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_backlog", remoteCluster,
- replStats.replicationBacklog);
- metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_connected_count",
- remoteCluster,
- replStats.connectedCount);
- metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_rate_expired", remoteCluster,
- replStats.msgRateExpired);
- metricWithRemoteCluster(metrics, cluster, namespace, "pulsar_replication_delay_in_seconds",
- remoteCluster, replStats.replicationDelayInSeconds);
- });
- }
+ private static void writeMetricWithBrokerDefault(SimpleTextOutputStream stream, String cluster, String name,
+ List<AggregatedNamespaceStats> allNamespaceStats,
+ Function<AggregatedNamespaceStats, Number> namespaceFunction) {
+ stream.write("# TYPE ").write(name).write(" gauge\n");
+ stream.write(name)
+ .write("{cluster=\"").write(cluster).write("\"} ")
+ .write(0).write(' ').write(System.currentTimeMillis())
+ .write('\n');
+ writeNamespaceStats(stream, cluster, name, allNamespaceStats, namespaceFunction);
}
- private static void metric(Map<String, Collector.MetricFamilySamples> metrics, String cluster, String name,
- long value) {
- addMetric(metrics, Map.of("cluster", cluster), name, value);
+ private static void writeMetric(SimpleTextOutputStream stream, String cluster, String name,
+ List<AggregatedNamespaceStats> allNamespaceStats,
+ Function<AggregatedNamespaceStats, Number> namespaceFunction) {
+ stream.write("# TYPE ").write(name).write(" gauge\n");
+ writeNamespaceStats(stream, cluster, name, allNamespaceStats, namespaceFunction);
}
- private static void metric(Map<String, Collector.MetricFamilySamples> metrics, String cluster, String namespace,
- String name, long value) {
- addMetric(metrics, Map.of("cluster", cluster, "namespace", namespace), name, value);
+ private static void writeNamespaceStats(SimpleTextOutputStream stream, String cluster, String name,
+ List<AggregatedNamespaceStats> allNamespaceStats,
+ Function<AggregatedNamespaceStats, Number> namespaceFunction) {
+ allNamespaceStats.forEach(n -> {
+ Number value = namespaceFunction.apply(n);
+ if (value != null) {
+ stream.write(name)
+ .write("{cluster=\"").write(cluster)
+ .write("\",namespace=\"").write(n.name)
+ .write("\"} ")
+ .write(value).write(' ').write(System.currentTimeMillis())
+ .write('\n');
+ }
+ });
}
- private static void metric(Map<String, Collector.MetricFamilySamples> metrics, String cluster, String namespace,
- String name, double value) {
- addMetric(metrics, Map.of("cluster", cluster, "namespace", namespace), name, value);
+ private static void writeMsgBacklog(SimpleTextOutputStream stream, String cluster,
+ List<AggregatedNamespaceStats> allNamespaceStats,
+ Function<AggregatedNamespaceStats, Number> namespaceFunction) {
+ stream.write("# TYPE ").write("pulsar_msg_backlog").write(" gauge\n");
+ stream.write("pulsar_msg_backlog")
Review Comment:
This was originally written in `printDefaultBrokerStats` which was always called.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org