You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/10/06 05:12:48 UTC
[pulsar] 05/12: [Metrics] Add support for splitting topic and
partition label in Prometheus (#12225)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5fad80cd5e3bb19850738e05fe764f18516def80
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Sep 30 11:56:09 2021 +0800
[Metrics] Add support for splitting topic and partition label in Prometheus (#12225)
* [Metrics] Add support for splitting topic and partition label in Prometheus
Fix: #11432
Currently, we only expose the partition name in the topic label of the Prometheus metrics, which makes it difficult to
aggregate metrics for a partitioned topic.
Before this change, we can only get (topic=xxx-partition-0) in the metrics. After this change, we can get 2 labels (topic=xxx, partition=0).
By default, the broker exposes a single label for the topic. To enable the split, set `splitTopicAndPartitionLabelInPrometheus=true` in broker.conf.
New tests added.
* Fix checkstyle.
(cherry picked from commit 039079e850e7756c18929119ec215fff8ada643d)
---
conf/broker.conf | 9 +
conf/standalone.conf | 9 +
.../apache/pulsar/broker/ServiceConfiguration.java | 11 +
.../org/apache/pulsar/broker/PulsarService.java | 3 +-
.../stats/prometheus/NamespaceStatsAggregator.java | 5 +-
.../prometheus/PrometheusMetricsGenerator.java | 14 +-
.../stats/prometheus/PrometheusMetricsServlet.java | 7 +-
.../pulsar/broker/stats/prometheus/TopicStats.java | 352 ++++++++++++---------
.../pulsar/broker/stats/PrometheusMetricsTest.java | 42 +++
9 files changed, 303 insertions(+), 149 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 775f9aa..d81e86c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1172,6 +1172,15 @@ statsUpdateInitialDelayInSecs=60
# Default is false.
exposePreciseBacklogInPrometheus=false
+# Enable splitting topic and partition label in Prometheus.
+# If enabled, a topic name will split into 2 parts, one is topic name without partition index,
+# another one is partition index, e.g. (topic=xxx, partition=0).
+# If the topic is a non-partitioned topic, -1 will be used for the partition index.
+# If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)
+# Default is false.
+
+splitTopicAndPartitionLabelInPrometheus=false
+
### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 363ce3b..e13723f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -855,6 +855,15 @@ exposePublisherStats=true
# Default is false.
exposePreciseBacklogInPrometheus=false
+# Enable splitting topic and partition label in Prometheus.
+# If enabled, a topic name will split into 2 parts, one is topic name without partition index,
+# another one is partition index, e.g. (topic=xxx, partition=0).
+# If the topic is a non-partitioned topic, -1 will be used for the partition index.
+# If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)
+# Default is false.
+
+splitTopicAndPartitionLabelInPrometheus=false
+
### --- Deprecated config variables --- ###
# Deprecated. Use configurationStoreServers
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a223b58..c9b0c48 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1995,6 +1995,17 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean exposeSubscriptionBacklogSizeInPrometheus = false;
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "Enable splitting topic and partition label in Prometheus.\n" +
+ " If enabled, a topic name will split into 2 parts, one is topic name without partition index,\n" +
+ " another one is partition index, e.g. (topic=xxx, partition=0).\n" +
+ " If the topic is a non-partitioned topic, -1 will be used for the partition index.\n" +
+ " If disabled, one label to represent the topic and partition, e.g. (topic=xxx-partition-0)\n" +
+ " Default is false."
+ )
+ private boolean splitTopicAndPartitionLabelInPrometheus = false;
+
/**** --- Functions --- ****/
@FieldContext(
category = CATEGORY_FUNCTIONS,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f3eae1a..23f50b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -680,7 +680,8 @@ public class PulsarService implements AutoCloseable {
this.metricsServlet = new PrometheusMetricsServlet(
this, config.isExposeTopicLevelMetricsInPrometheus(),
config.isExposeConsumerLevelMetricsInPrometheus(),
- config.isExposeProducerLevelMetricsInPrometheus());
+ config.isExposeProducerLevelMetricsInPrometheus(),
+ config.isSplitTopicAndPartitionLabelInPrometheus());
if (pendingMetricsProviders != null) {
pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
this.pendingMetricsProviders = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a2661ed..9df9ad4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -57,7 +57,7 @@ public class NamespaceStatsAggregator {
};
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, SimpleTextOutputStream stream) {
+ boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats.resetTypes();
@@ -81,7 +81,8 @@ public class NamespaceStatsAggregator {
if (includeTopicMetrics) {
topicsCount.add(1);
- TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean);
+ TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean,
+ splitTopicAndPartitionIndexLabel);
} else {
namespaceStats.updateStats(topicStats);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 8f303a3..9d5e1c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -87,11 +87,19 @@ public class PrometheusMetricsGenerator {
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
boolean includeProducerMetrics, OutputStream out) throws IOException {
- generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, out, null);
+ generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out, null);
}
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, OutputStream out, List<PrometheusRawMetricsProvider> metricsProviders)
+ boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+ OutputStream out) throws IOException {
+ generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics,
+ splitTopicAndPartitionIndexLabel, out, null);
+ }
+
+ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
+ boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, OutputStream out,
+ List<PrometheusRawMetricsProvider> metricsProviders)
throws IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
try {
@@ -100,7 +108,7 @@ public class PrometheusMetricsGenerator {
generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());
NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics,
- includeProducerMetrics, stream);
+ includeProducerMetrics, splitTopicAndPartitionIndexLabel, stream);
if (pulsar.getWorkerServiceOpt().isPresent()) {
pulsar.getWorkerService().generateFunctionsStats(stream);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 026c26f..145f7a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -44,17 +44,19 @@ public class PrometheusMetricsServlet extends HttpServlet {
private final boolean shouldExportConsumerMetrics;
private final boolean shouldExportProducerMetrics;
private final long metricsServletTimeoutMs;
+ private final boolean splitTopicAndPartitionLabel;
private List<PrometheusRawMetricsProvider> metricsProviders;
private ExecutorService executor = null;
public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean shouldExportProducerMetrics) {
+ boolean shouldExportProducerMetrics, boolean splitTopicAndPartitionLabel) {
this.pulsar = pulsar;
this.shouldExportTopicMetrics = includeTopicMetrics;
this.shouldExportConsumerMetrics = includeConsumerMetrics;
this.shouldExportProducerMetrics = shouldExportProducerMetrics;
this.metricsServletTimeoutMs = pulsar.getConfiguration().getMetricsServletTimeoutMs();
+ this.splitTopicAndPartitionLabel = splitTopicAndPartitionLabel;
}
@Override
@@ -73,7 +75,8 @@ public class PrometheusMetricsServlet extends HttpServlet {
res.setStatus(HttpStatus.OK_200);
res.setContentType("text/plain");
PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics,
- shouldExportProducerMetrics, res.getOutputStream(), metricsProviders);
+ shouldExportProducerMetrics, splitTopicAndPartitionLabel, res.getOutputStream(),
+ metricsProviders);
context.complete();
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 61e0175..bfa427e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats.prometheus;
+import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -104,222 +105,263 @@ class TopicStats {
}
static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- TopicStats stats, Optional<CompactorMXBean> compactorMXBean) {
- metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
- metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
- metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
-
- metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn);
- metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut);
- metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn);
- metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut);
- metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize);
-
- metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
+ TopicStats stats, Optional<CompactorMXBean> compactorMXBean,
+ boolean splitTopicAndPartitionIndexLabel) {
+ metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount,
+ splitTopicAndPartitionIndexLabel);
+
+ metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
+ splitTopicAndPartitionIndexLabel);
+
+ metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
- stats.managedLedgerStats.storageLogicalSize);
- metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog);
+ stats.managedLedgerStats.storageLogicalSize, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
- stats.managedLedgerStats.backlogSize);
+ stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
- .offloadedStorageUsed);
- metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit);
+ .offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time",
- stats.backlogQuotaLimitTime);
+ stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9],
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5",
- ledgerWriteLatencyBuckets[0]);
+ ledgerWriteLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1",
- ledgerWriteLatencyBuckets[1]);
+ ledgerWriteLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5",
- ledgerWriteLatencyBuckets[2]);
+ ledgerWriteLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10",
- ledgerWriteLatencyBuckets[3]);
+ ledgerWriteLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20",
- ledgerWriteLatencyBuckets[4]);
+ ledgerWriteLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50",
- ledgerWriteLatencyBuckets[5]);
+ ledgerWriteLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100",
- ledgerWriteLatencyBuckets[6]);
+ ledgerWriteLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200",
- ledgerWriteLatencyBuckets[7]);
+ ledgerWriteLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000",
- ledgerWriteLatencyBuckets[8]);
+ ledgerWriteLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow",
- ledgerWriteLatencyBuckets[9]);
+ ledgerWriteLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count",
- stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
+ stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum",
- stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
+ stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
+ splitTopicAndPartitionIndexLabel);
long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8],
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
- stats.managedLedgerStats.entrySizeBuckets.getCount());
+ stats.managedLedgerStats.entrySizeBuckets.getCount(), splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
- stats.managedLedgerStats.entrySizeBuckets.getSum());
+ stats.managedLedgerStats.entrySizeBuckets.getSum(), splitTopicAndPartitionIndexLabel);
stats.producerStats.forEach((p, producerStats) -> {
metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in",
- producerStats.msgRateIn);
+ producerStats.msgRateIn, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in",
- producerStats.msgThroughputIn);
+ producerStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size",
- producerStats.averageMsgSize);
+ producerStats.averageMsgSize, splitTopicAndPartitionIndexLabel);
});
stats.subscriptionStats.forEach((n, subsStats) -> {
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
- subsStats.msgBacklog);
+ subsStats.msgBacklog, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
- subsStats.msgBacklogNoDelayed);
+ subsStats.msgBacklogNoDelayed, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
- subsStats.msgDelayed);
+ subsStats.msgDelayed, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
- subsStats.msgRateRedeliver);
+ subsStats.msgRateRedeliver, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
- subsStats.unackedMessages);
+ subsStats.unackedMessages, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
- subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+ subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
- subsStats.msgRateOut);
+ subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
- subsStats.msgThroughputOut);
+ subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
- subsStats.bytesOutCounter);
+ subsStats.bytesOutCounter, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
- subsStats.msgOutCounter);
+ subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
- subsStats.lastExpireTimestamp);
+ subsStats.lastExpireTimestamp, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
- subsStats.lastAckedTimestamp);
+ subsStats.lastAckedTimestamp, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
- subsStats.lastConsumedFlowTimestamp);
+ subsStats.lastConsumedFlowTimestamp, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
- subsStats.lastConsumedTimestamp);
+ subsStats.lastConsumedTimestamp, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
- subsStats.lastMarkDeleteAdvancedTimestamp);
+ subsStats.lastMarkDeleteAdvancedTimestamp, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
- subsStats.msgRateExpired);
+ subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
- subsStats.totalMsgExpired);
+ subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver);
+ "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_unacked_messages", consumerStats.unackedMessages);
+ "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_blocked_on_unacked_messages",
- consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+ consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut);
+ "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut);
+ "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_available_permits", consumerStats.availablePermits);
+ "pulsar_consumer_available_permits", consumerStats.availablePermits,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_out_bytes_total", consumerStats.bytesOutCounter);
+ "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
+ splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_out_messages_total", consumerStats.msgOutCounter);
+ "pulsar_out_messages_total", consumerStats.msgOutCounter,
+ splitTopicAndPartitionIndexLabel);
});
});
if (!stats.replicationStats.isEmpty()) {
stats.replicationStats.forEach((remoteCluster, replStats) -> {
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster,
- replStats.msgRateIn);
+ replStats.msgRateIn, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster,
- replStats.msgRateOut);
+ replStats.msgRateOut, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in",
- remoteCluster,
- replStats.msgThroughputIn);
+ remoteCluster, replStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out",
- remoteCluster,
- replStats.msgThroughputOut);
+ remoteCluster, replStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster,
- replStats.replicationBacklog);
+ replStats.replicationBacklog, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count",
- remoteCluster, replStats.connectedCount);
+ remoteCluster, replStats.connectedCount, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired",
- remoteCluster, replStats.msgRateExpired);
+ remoteCluster, replStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds",
- remoteCluster, replStats.replicationDelayInSeconds);
+ remoteCluster, replStats.replicationDelayInSeconds, splitTopicAndPartitionIndexLabel);
});
}
- metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter);
- metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter);
+ metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter,
+ splitTopicAndPartitionIndexLabel);
// Compaction
boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
.map(__ -> true).orElse(false);
if (hasCompaction) {
metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
- stats.compactionRemovedEventCount);
+ stats.compactionRemovedEventCount, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
- stats.compactionSucceedCount);
+ stats.compactionSucceedCount, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
- stats.compactionFailedCount);
+ stats.compactionFailedCount, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
- stats.compactionDurationTimeInMills);
+ stats.compactionDurationTimeInMills, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
- stats.compactionReadThroughput);
+ stats.compactionReadThroughput, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
- stats.compactionWriteThroughput);
+ stats.compactionWriteThroughput, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
- stats.compactionCompactedEntriesCount);
+ stats.compactionCompactedEntriesCount, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
- stats.compactionCompactedEntriesSize);
+ stats.compactionCompactedEntriesSize, splitTopicAndPartitionIndexLabel);
long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
- compactionLatencyBuckets[0]);
+ compactionLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
- compactionLatencyBuckets[1]);
+ compactionLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
- compactionLatencyBuckets[2]);
+ compactionLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
- compactionLatencyBuckets[3]);
+ compactionLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
- compactionLatencyBuckets[4]);
+ compactionLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
- compactionLatencyBuckets[5]);
+ compactionLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
- compactionLatencyBuckets[6]);
+ compactionLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
- compactionLatencyBuckets[7]);
+ compactionLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
- compactionLatencyBuckets[8]);
+ compactionLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
- compactionLatencyBuckets[9]);
+ compactionLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
- stats.compactionLatencyBuckets.getSum());
+ stats.compactionLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
- stats.compactionLatencyBuckets.getCount());
+ stats.compactionLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
}
}
@@ -333,64 +375,92 @@ class TopicStats {
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String name, double value) {
+ String name, double value, boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
- stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
- .write("\",topic=\"").write(topic).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel).write("\"} ");
+ stream.write(value);
+ appendEndings(stream);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String subscription, String name, long value) {
+ String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
- stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
- .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+ .write("\",subscription=\"").write(subscription).write("\"} ");
+ stream.write(value);
+ appendEndings(stream);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String producerName, long produceId, String name, double value) {
+ String producerName, long produceId, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
- stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
- .write("\",topic=\"").write(topic).write("\",producer_name=\"").write(producerName)
+ appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+ .write("\",producer_name=\"").write(producerName)
.write("\",producer_id=\"").write(produceId).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ stream.write(value);
+ appendEndings(stream);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String subscription, String name, double value) {
+ String subscription, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
- stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
- .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+ .write("\",subscription=\"").write(subscription).write("\"} ");
+ stream.write(value);
+ appendEndings(stream);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String subscription, String consumerName, long consumerId, String name, long value) {
+ String subscription, String consumerName, long consumerId, String name, long value,
+ boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
- stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
- .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
+ appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+ .write("\",subscription=\"").write(subscription)
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
.write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ stream.write(value);
+ appendEndings(stream);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String subscription, String consumerName, long consumerId, String name, double value) {
+ String subscription, String consumerName, long consumerId, String name, double value,
+ boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
- stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
- .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
+ appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+ .write("\",subscription=\"").write(subscription)
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
.write(consumerId).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ stream.write(value);
+ appendEndings(stream);
}
private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
- String topic,
- String name, String remoteCluster, double value) {
+ String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
+ appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
+ .write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
+ stream.write(value);
+ appendEndings(stream);
+ }
+
+ private static SimpleTextOutputStream appendRequiredLabels(SimpleTextOutputStream stream, String cluster,
+ String namespace, String topic, String name, boolean splitTopicAndPartitionIndexLabel) {
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
- stream.write("\",topic=\"").write(topic).write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ if (splitTopicAndPartitionIndexLabel) {
+ int index = topic.indexOf(PARTITIONED_TOPIC_SUFFIX);
+ if (index > 0) {
+ stream.write("\",topic=\"").write(topic.substring(0, index)).write("\",partition=\"")
+ .write(topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length()));
+ } else {
+ stream.write("\",topic=\"").write(topic).write("\",partition=\"").write("-1");
+ }
+ } else {
+ stream.write("\",topic=\"").write(topic);
+ }
+ return stream;
+ }
+
+ private static void appendEndings(SimpleTextOutputStream stream) {
+ stream.write(' ').write(System.currentTimeMillis()).write('\n');
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 3de3a04..9f09893 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -1222,6 +1222,48 @@ public class PrometheusMetricsTest extends BrokerTestBase {
pulsarClient.close();
}
+ @Test
+ public void testSplitTopicAndPartitionLabel() throws Exception {
+ String ns1 = "prop/ns-abc1";
+ String ns2 = "prop/ns-abc2";
+ admin.namespaces().createNamespace(ns1);
+ admin.namespaces().createNamespace(ns2);
+ String baseTopic1 = "persistent://" + ns1 + "/testMetricsTopicCount";
+ String baseTopic2 = "persistent://" + ns2 + "/testMetricsTopicCount";
+ for (int i = 0; i < 6; i++) {
+ admin.topics().createNonPartitionedTopic(baseTopic1 + UUID.randomUUID());
+ }
+ for (int i = 0; i < 3; i++) {
+ admin.topics().createPartitionedTopic(baseTopic2 + UUID.randomUUID(), 3);
+ }
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topicsPattern("persistent://" + ns1 + "/.*")
+ .subscriptionName("sub")
+ .subscribe();
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topicsPattern("persistent://" + ns2 + "/.*")
+ .subscriptionName("sub")
+ .subscribe();
+ @Cleanup
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, false, false, true, statsOut);
+ String metricsStr = statsOut.toString();
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+ Collection<Metric> metric = metrics.get("pulsar_consumers_count");
+ assertTrue(metric.size() >= 15);
+ metric.forEach(item -> {
+ if (ns1.equals(item.tags.get("namespace"))) {
+ assertEquals(item.tags.get("partition"), "-1");
+ }
+ if (ns2.equals(item.tags.get("namespace"))) {
+ System.out.println(item);
+ assertTrue(Integer.parseInt(item.tags.get("partition")) >= 0);
+ }
+ });
+ consumer1.close();
+ consumer2.close();
+ }
+
private void compareCompactionStateCount(List<Metric> cm, double count) {
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).tags.get("cluster"), "test");