You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/20 08:38:13 UTC
[pulsar] branch branch-2.9 updated: [fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#17618)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new bc69cfbe9d3 [fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#17618)
bc69cfbe9d3 is described below
commit bc69cfbe9d3cf06fc6e407c284eed45f095b41be
Author: Mark Silcox <63...@users.noreply.github.com>
AuthorDate: Tue Sep 20 09:38:06 2022 +0100
[fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#17618)
---
.../stats/prometheus/AggregatedNamespaceStats.java | 2 +-
.../stats/prometheus/NamespaceStatsAggregator.java | 354 ++++++------
.../stats/prometheus/PrometheusMetricStreams.java | 75 +++
.../prometheus/PrometheusMetricsGenerator.java | 60 ++-
.../pulsar/broker/stats/prometheus/TopicStats.java | 598 ++++++++++-----------
.../stats/prometheus/TransactionAggregator.java | 321 +++++------
.../metrics/PrometheusTextFormatUtil.java | 32 --
.../pulsar/broker/stats/PrometheusMetricsTest.java | 58 ++
.../prometheus/PrometheusMetricStreamsTest.java | 85 +++
.../pulsar/common/util/SimpleTextOutputStream.java | 13 +-
.../instance/stats/PrometheusTextFormat.java | 5 +
.../functions/worker/WorkerStatsManager.java | 5 +
12 files changed, 878 insertions(+), 730 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 5610dbab218..1980af91b7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -96,7 +96,7 @@ public class AggregatedNamespaceStats {
stats.replicationStats.forEach((n, as) -> {
AggregatedReplicationStats replStats =
- replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats());
+ replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats());
replStats.msgRateIn += as.msgRateIn;
replStats.msgRateOut += as.msgRateOut;
replStats.msgThroughputIn += as.msgThroughputIn;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 16e438e2a2e..29915f071c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -19,8 +19,11 @@
package org.apache.pulsar.broker.stats.prometheus;
import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -32,7 +35,6 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
@@ -40,72 +42,75 @@ import org.apache.pulsar.compaction.CompactorMXBean;
@Slf4j
public class NamespaceStatsAggregator {
- private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
+ private static final FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
new FastThreadLocal<AggregatedNamespaceStats>() {
@Override
- protected AggregatedNamespaceStats initialValue() throws Exception {
+ protected AggregatedNamespaceStats initialValue() {
return new AggregatedNamespaceStats();
}
};
- private static FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
+ private static final FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
@Override
- protected TopicStats initialValue() throws Exception {
+ protected TopicStats initialValue() {
return new TopicStats();
}
};
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) {
+ boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+ PrometheusMetricStreams stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
- TopicStats.resetTypes();
TopicStats topicStats = localTopicStats.get();
+ Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
+ LongAdder topicsCount = new LongAdder();
+ Map<String, Long> localNamespaceTopicCount = new HashMap<>();
printDefaultBrokerStats(stream, cluster);
- Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
- LongAdder topicsCount = new LongAdder();
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();
topicsCount.reset();
- bundlesMap.forEach((bundle, topicsMap) -> {
- topicsMap.forEach((name, topic) -> {
- getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
- pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
- pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
- compactorMXBean
- );
-
- if (includeTopicMetrics) {
- topicsCount.add(1);
- TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean,
- splitTopicAndPartitionIndexLabel);
- } else {
- namespaceStats.updateStats(topicStats);
- }
- });
- });
+ bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> {
+ getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
+ pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
+ pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
+ compactorMXBean
+ );
+
+ if (includeTopicMetrics) {
+ topicsCount.add(1);
+ TopicStats.printTopicStats(stream, topicStats, compactorMXBean, cluster, namespace, name,
+ splitTopicAndPartitionIndexLabel);
+ } else {
+ namespaceStats.updateStats(topicStats);
+ }
+ }));
if (!includeTopicMetrics) {
- // Only include namespace level stats if we don't have the per-topic, otherwise we're going to report
- // the same data twice, and it will make the aggregation difficult
- printNamespaceStats(stream, cluster, namespace, namespaceStats);
+ // Only include namespace level stats if we don't have the per-topic, otherwise we're going to
+ // report the same data twice, and it will make the aggregation difficult
+ printNamespaceStats(stream, namespaceStats, cluster, namespace);
} else {
- printTopicsCountStats(stream, cluster, namespace, topicsCount);
+ localNamespaceTopicCount.put(namespace, topicsCount.sum());
}
});
+
+ if (includeTopicMetrics) {
+ printTopicsCountStats(stream, localNamespaceTopicCount, cluster);
+ }
}
private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
Compactor compactor = pulsar.getNullableCompactor();
- return Optional.ofNullable(compactor).map(c -> c.getStats());
+ return Optional.ofNullable(compactor).map(Compactor::getStats);
}
private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
- Optional<CompactorMXBean> compactorMXBean) {
+ boolean includeProducerMetrics, boolean getPreciseBacklog,
+ boolean subscriptionBacklogSize, Optional<CompactorMXBean> compactorMXBean) {
stats.reset();
if (topic instanceof PersistentTopic) {
@@ -267,161 +272,176 @@ public class NamespaceStatsAggregator {
});
}
- private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
+ private static void printDefaultBrokerStats(PrometheusMetricStreams stream, String cluster) {
// Print metrics with 0 values. This is necessary to have the available brokers being
// reported in the brokers dashboard even if they don't have any topic or traffic
- metric(stream, cluster, "pulsar_topics_count", 0);
- metric(stream, cluster, "pulsar_subscriptions_count", 0);
- metric(stream, cluster, "pulsar_producers_count", 0);
- metric(stream, cluster, "pulsar_consumers_count", 0);
- metric(stream, cluster, "pulsar_rate_in", 0);
- metric(stream, cluster, "pulsar_rate_out", 0);
- metric(stream, cluster, "pulsar_throughput_in", 0);
- metric(stream, cluster, "pulsar_throughput_out", 0);
- metric(stream, cluster, "pulsar_storage_size", 0);
- metric(stream, cluster, "pulsar_storage_logical_size", 0);
- metric(stream, cluster, "pulsar_storage_write_rate", 0);
- metric(stream, cluster, "pulsar_storage_read_rate", 0);
- metric(stream, cluster, "pulsar_msg_backlog", 0);
+ writeMetric(stream, "pulsar_topics_count", 0, cluster);
+ writeMetric(stream, "pulsar_subscriptions_count", 0, cluster);
+ writeMetric(stream, "pulsar_producers_count", 0, cluster);
+ writeMetric(stream, "pulsar_consumers_count", 0, cluster);
+ writeMetric(stream, "pulsar_rate_in", 0, cluster);
+ writeMetric(stream, "pulsar_rate_out", 0, cluster);
+ writeMetric(stream, "pulsar_throughput_in", 0, cluster);
+ writeMetric(stream, "pulsar_throughput_out", 0, cluster);
+ writeMetric(stream, "pulsar_storage_size", 0, cluster);
+ writeMetric(stream, "pulsar_storage_logical_size", 0, cluster);
+ writeMetric(stream, "pulsar_storage_write_rate", 0, cluster);
+ writeMetric(stream, "pulsar_storage_read_rate", 0, cluster);
+ writeMetric(stream, "pulsar_msg_backlog", 0, cluster);
}
- private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster, String namespace,
- LongAdder topicsCount) {
- metric(stream, cluster, namespace, "pulsar_topics_count", topicsCount.sum());
+ private static void printTopicsCountStats(PrometheusMetricStreams stream, Map<String, Long> namespaceTopicsCount,
+ String cluster) {
+ namespaceTopicsCount.forEach(
+ (ns, topicCount) -> writeMetric(stream, "pulsar_topics_count", topicCount, cluster, ns)
+ );
}
- private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
- AggregatedNamespaceStats stats) {
- metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
- metric(stream, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
- metric(stream, cluster, namespace, "pulsar_producers_count", stats.producersCount);
- metric(stream, cluster, namespace, "pulsar_consumers_count", stats.consumersCount);
-
- metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn);
- metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
- metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
- metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
- metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
-
- metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
- metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
- metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
- metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
-
- metric(stream, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
- metric(stream, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
- metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize);
- metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
- stats.managedLedgerStats.offloadedStorageUsed);
-
- metric(stream, cluster, namespace, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate);
- metric(stream, cluster, namespace, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate);
-
- metric(stream, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed);
-
- metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);
+ private static void printNamespaceStats(PrometheusMetricStreams stream, AggregatedNamespaceStats stats,
+ String cluster, String namespace) {
+ writeMetric(stream, "pulsar_topics_count", stats.topicsCount, cluster, namespace);
+ writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount, cluster,
+ namespace);
+ writeMetric(stream, "pulsar_producers_count", stats.producersCount, cluster, namespace);
+ writeMetric(stream, "pulsar_consumers_count", stats.consumersCount, cluster, namespace);
+
+ writeMetric(stream, "pulsar_rate_in", stats.rateIn, cluster, namespace);
+ writeMetric(stream, "pulsar_rate_out", stats.rateOut, cluster, namespace);
+ writeMetric(stream, "pulsar_throughput_in", stats.throughputIn, cluster, namespace);
+ writeMetric(stream, "pulsar_throughput_out", stats.throughputOut, cluster, namespace);
+ writeMetric(stream, "pulsar_consumer_msg_ack_rate", stats.messageAckRate, cluster, namespace);
+
+ writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace);
+ writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace);
+ writeMetric(stream, "pulsar_out_bytes_total", stats.bytesOutCounter, cluster, namespace);
+ writeMetric(stream, "pulsar_out_messages_total", stats.msgOutCounter, cluster, namespace);
+
+ writeMetric(stream, "pulsar_storage_size", stats.managedLedgerStats.storageSize, cluster,
+ namespace);
+ writeMetric(stream, "pulsar_storage_logical_size",
+ stats.managedLedgerStats.storageLogicalSize, cluster, namespace);
+ writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize, cluster,
+ namespace);
+ writeMetric(stream, "pulsar_storage_offloaded_size",
+ stats.managedLedgerStats.offloadedStorageUsed, cluster, namespace);
+
+ writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate,
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
+ cluster, namespace);
+
+ writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, cluster, namespace);
+
+ writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_count",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
- metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
+ writeMetric(stream, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1", latencyBuckets[1], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_5", latencyBuckets[2], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_10", latencyBuckets[3], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_20", latencyBuckets[4], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_50", latencyBuckets[5], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_100", latencyBuckets[6], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_200", latencyBuckets[7], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_overflow", latencyBuckets[9], cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_count",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), cluster, namespace);
+ writeMetric(stream, "pulsar_storage_write_latency_sum",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace);
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
- long[] ledgerWritelatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_overflow",
- ledgerWritelatencyBuckets[9]);
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_count",
- stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
- metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_sum",
- stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
+ long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWriteLatencyBuckets[0],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", ledgerWriteLatencyBuckets[1],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5", ledgerWriteLatencyBuckets[2],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", ledgerWriteLatencyBuckets[3],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", ledgerWriteLatencyBuckets[4],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", ledgerWriteLatencyBuckets[5],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", ledgerWriteLatencyBuckets[6],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", ledgerWriteLatencyBuckets[7],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", ledgerWriteLatencyBuckets[8],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", ledgerWriteLatencyBuckets[9],
+ cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
+ stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(), cluster, namespace);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
+ stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(), cluster, namespace);
stats.managedLedgerStats.entrySizeBuckets.refresh();
long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
- metric(stream, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
- metric(stream, cluster, namespace, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
- metric(stream, cluster, namespace, "pulsar_entry_size_count",
- stats.managedLedgerStats.entrySizeBuckets.getCount());
- metric(stream, cluster, namespace, "pulsar_entry_size_sum",
- stats.managedLedgerStats.entrySizeBuckets.getSum());
-
- if (!stats.replicationStats.isEmpty()) {
- stats.replicationStats.forEach((remoteCluster, replStats) -> {
- metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_in", remoteCluster,
- replStats.msgRateIn);
- metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_out", remoteCluster,
- replStats.msgRateOut);
- metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_in", remoteCluster,
- replStats.msgThroughputIn);
- metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_out", remoteCluster,
- replStats.msgThroughputOut);
- metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_backlog", remoteCluster,
- replStats.replicationBacklog);
- metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_connected_count", remoteCluster,
- replStats.connectedCount);
- metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_expired", remoteCluster,
- replStats.msgRateExpired);
- metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_delay_in_seconds",
- remoteCluster, replStats.replicationDelayInSeconds);
- });
- }
+ writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(),
+ cluster, namespace);
+ writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(),
+ cluster, namespace);
+
+ writeReplicationStat(stream, "pulsar_replication_rate_in", stats,
+ replStats -> replStats.msgRateIn, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_rate_out", stats,
+ replStats -> replStats.msgRateOut, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_throughput_in", stats,
+ replStats -> replStats.msgThroughputIn, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_throughput_out", stats,
+ replStats -> replStats.msgThroughputOut, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_backlog", stats,
+ replStats -> replStats.replicationBacklog, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_connected_count", stats,
+ replStats -> replStats.connectedCount, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_rate_expired", stats,
+ replStats -> replStats.msgRateExpired, cluster, namespace);
+ writeReplicationStat(stream, "pulsar_replication_delay_in_seconds", stats,
+ replStats -> replStats.replicationDelayInSeconds, cluster, namespace);
}
- private static void metric(SimpleTextOutputStream stream, String cluster, String name,
- long value) {
- TopicStats.metricType(stream, name);
- stream.write(name)
- .write("{cluster=\"").write(cluster).write("\"} ")
- .write(value).write(' ').write(System.currentTimeMillis())
- .write('\n');
+ private static void writePulsarMsgBacklog(PrometheusMetricStreams stream, Number value,
+ String cluster, String namespace) {
+ stream.writeSample("pulsar_msg_backlog", value, "cluster", cluster, "namespace", namespace,
+ "remote_cluster",
+ "local");
}
- private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
- long value) {
- TopicStats.metricType(stream, name);
- stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value,
+ String cluster) {
+ stream.writeSample(metricName, value, "cluster", cluster);
}
- private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
- double value) {
- TopicStats.metricType(stream, name);
- stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
+ String namespace) {
+ stream.writeSample(metricName, value, "cluster", cluster, "namespace", namespace);
}
- private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
- String name, String remoteCluster, double value) {
- TopicStats.metricType(stream, name);
- stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
- stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ private static void writeReplicationStat(PrometheusMetricStreams stream, String metricName,
+ AggregatedNamespaceStats namespaceStats,
+ Function<AggregatedReplicationStats, Number> sampleValueFunction,
+ String cluster, String namespace) {
+ if (!namespaceStats.replicationStats.isEmpty()) {
+ namespaceStats.replicationStats.forEach((remoteCluster, replStats) ->
+ stream.writeSample(metricName, sampleValueFunction.apply(replStats),
+ "cluster", cluster,
+ "namespace", namespace,
+ "remote_cluster", remoteCluster)
+ );
+ }
}
+
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
new file mode 100644
index 00000000000..6b6b972c175
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * Helper class to ensure that metrics of the same name are grouped together under the same TYPE header when written.
+ * Those are the requirements of the
+ * <a href="https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#grouping-and-sorting">Prometheus Exposition Format</a>.
+ */
+public class PrometheusMetricStreams {
+ private final Map<String, SimpleTextOutputStream> metricStreamMap = new HashMap<>();
+
+ /**
+ * Write the given metric and sample value to the stream. Will write #TYPE header if metric not seen before.
+ * @param metricName name of the metric.
+ * @param value value of the sample
+ * @param labelsAndValuesArray varargs of label and label value
+ */
+ void writeSample(String metricName, Number value, String... labelsAndValuesArray) {
+ SimpleTextOutputStream stream = initGaugeType(metricName);
+ stream.write(metricName).write('{');
+ for (int i = 0; i < labelsAndValuesArray.length; i += 2) {
+ stream.write(labelsAndValuesArray[i]).write("=\"").write(labelsAndValuesArray[i + 1]).write('\"');
+ if (i + 2 != labelsAndValuesArray.length) {
+ stream.write(',');
+ }
+ }
+ stream.write("} ").write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ }
+
+ /**
+ * Flush all the stored metrics to the supplied stream.
+ * @param stream the stream to write to.
+ */
+ void flushAllToStream(SimpleTextOutputStream stream) {
+ metricStreamMap.values().forEach(s -> stream.write(s.getBuffer()));
+ }
+
+ /**
+ * Release all the streams to clean up resources.
+ */
+ void releaseAll() {
+ metricStreamMap.values().forEach(s -> s.getBuffer().release());
+ metricStreamMap.clear();
+ }
+
+ private SimpleTextOutputStream initGaugeType(String metricName) {
+ return metricStreamMap.computeIfAbsent(metricName, s -> {
+ SimpleTextOutputStream stream = new SimpleTextOutputStream(PulsarByteBufAllocator.DEFAULT.directBuffer());
+ stream.write("# TYPE ").write(metricName).write(" gauge\n");
+ return stream;
+ });
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index cd6afd1535d..a993d1edf3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -52,7 +52,8 @@ import org.apache.pulsar.common.util.SimpleTextOutputStream;
/**
* Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out
* in a text format suitable to be consumed by Prometheus.
- * Format specification can be found at {@link https://prometheus.io/docs/instrumenting/exposition_formats/}
+ * Format specification can be found at <a
+ * href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats</a>
*/
public class PrometheusMetricsGenerator {
@@ -86,38 +87,44 @@ public class PrometheusMetricsGenerator {
}
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, OutputStream out) throws IOException {
+ boolean includeProducerMetrics, OutputStream out) throws IOException {
generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out, null);
}
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
- OutputStream out) throws IOException {
+ boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+ OutputStream out) throws IOException {
generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics,
splitTopicAndPartitionIndexLabel, out, null);
}
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, OutputStream out,
- List<PrometheusRawMetricsProvider> metricsProviders)
- throws IOException {
+ boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+ OutputStream out,
+ List<PrometheusRawMetricsProvider> metricsProviders)
+ throws IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
+ boolean exceptionHappens = false;
+ //Used in namespace/topic and transaction aggregators as share metric names
+ PrometheusMetricStreams metricStreams = new PrometheusMetricStreams();
try {
SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());
NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics,
- includeProducerMetrics, splitTopicAndPartitionIndexLabel, stream);
+ includeProducerMetrics, splitTopicAndPartitionIndexLabel, metricStreams);
if (pulsar.getWorkerServiceOpt().isPresent()) {
pulsar.getWorkerService().generateFunctionsStats(stream);
}
if (pulsar.getConfiguration().isTransactionCoordinatorEnabled()) {
- TransactionAggregator.generate(pulsar, stream, includeTopicMetrics);
+ TransactionAggregator.generate(pulsar, metricStreams, includeTopicMetrics);
}
+ metricStreams.flushAllToStream(stream);
+
generateBrokerBasicMetrics(pulsar, stream);
generateManagedLedgerBookieClientMetrics(pulsar, stream);
@@ -129,7 +136,12 @@ public class PrometheusMetricsGenerator {
}
out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
} finally {
- buf.release();
+ //release all the metrics buffers
+ metricStreams.releaseAll();
+ //if exception happens, release buffer
+ if (exceptionHappens) {
+ buf.release();
+ }
}
}
@@ -142,17 +154,17 @@ public class PrometheusMetricsGenerator {
if (pulsar.getConfiguration().isExposeManagedLedgerMetricsInPrometheus()) {
// generate managedLedger metrics
parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(),
- clusterName, Collector.Type.GAUGE, stream);
+ clusterName, Collector.Type.GAUGE, stream);
}
if (pulsar.getConfiguration().isExposeManagedCursorMetricsInPrometheus()) {
// generate managedCursor metrics
parseMetricsToPrometheusMetrics(new ManagedCursorMetrics(pulsar).generate(),
- clusterName, Collector.Type.GAUGE, stream);
+ clusterName, Collector.Type.GAUGE, stream);
}
parseMetricsToPrometheusMetrics(Collections.singletonList(pulsar.getBrokerService()
- .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
+ .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
clusterName, Collector.Type.GAUGE, stream);
// generate loadBalance metrics
@@ -267,17 +279,17 @@ public class PrometheusMetricsGenerator {
static String getTypeStr(Collector.Type type) {
switch (type) {
- case COUNTER:
- return "counter";
- case GAUGE:
- return "gauge";
- case SUMMARY :
- return "summary";
- case HISTOGRAM:
- return "histogram";
- case UNTYPED:
- default:
- return "untyped";
+ case COUNTER:
+ return "counter";
+ case GAUGE:
+ return "gauge";
+ case SUMMARY:
+ return "summary";
+ case HISTOGRAM:
+ return "histogram";
+ case UNTYPED:
+ default:
+ return "untyped";
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index e6e5883847d..e91521aff55 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -23,12 +23,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.compaction.CompactionRecord;
import org.apache.pulsar.compaction.CompactorMXBean;
class TopicStats {
-
int subscriptionsCount;
int producersCount;
int consumersCount;
@@ -43,7 +43,6 @@ class TopicStats {
double averageMsgSize;
public long msgBacklog;
-
long publishRateLimitedTimes;
long backlogQuotaLimit;
@@ -55,9 +54,6 @@ class TopicStats {
Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
Map<String, AggregatedProducerStats> producerStats = new HashMap<>();
- // Used for tracking duplicate TYPE definitions
- static Map<String, String> metricWithTypeDefinition = new HashMap<>();
-
// For compaction
long compactionRemovedEventCount;
long compactionSucceedCount;
@@ -103,378 +99,340 @@ class TopicStats {
compactionLatencyBuckets.reset();
}
- static void resetTypes() {
- metricWithTypeDefinition.clear();
- }
-
- static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- TopicStats stats, Optional<CompactorMXBean> compactorMXBean,
- boolean splitTopicAndPartitionIndexLabel) {
- metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount,
- splitTopicAndPartitionIndexLabel);
-
- metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
- splitTopicAndPartitionIndexLabel);
-
- metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
- stats.managedLedgerStats.storageLogicalSize, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_rate",
- stats.managedLedgerStats.storageWriteRate, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
- stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
- .offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time",
- stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
+ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats stats,
+ Optional<CompactorMXBean> compactorMXBean, String cluster, String namespace,
+ String topic, boolean splitTopicAndPartitionIndexLabel) {
+ writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_producers_count", stats.producersCount,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_consumers_count", stats.consumersCount,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+
+ writeMetric(stream, "pulsar_rate_in", stats.rateIn,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_rate_out", stats.rateOut,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_throughput_in", stats.throughputIn,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_throughput_out", stats.throughputOut,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_average_msg_size", stats.averageMsgSize,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+
+ writeMetric(stream, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_logical_size",
+ stats.managedLedgerStats.storageLogicalSize, cluster, namespace, topic,
+ splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_msg_backlog", stats.msgBacklog,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_offloaded_size", stats.managedLedgerStats
+ .offloadedStorageUsed, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_backlog_quota_limit_time", stats.backlogQuotaLimitTime,
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1],
+ writeMetric(stream, "pulsar_storage_write_latency_le_0_5",
+ latencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1",
+ latencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_5",
+ latencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_10",
+ latencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_20",
+ latencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_50",
+ latencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_100",
+ latencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_200",
+ latencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1000",
+ latencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_overflow",
+ latencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_count",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(),
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_write_latency_sum",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9],
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5",
- ledgerWriteLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1",
- ledgerWriteLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5",
- ledgerWriteLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10",
- ledgerWriteLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20",
- ledgerWriteLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50",
- ledgerWriteLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100",
- ledgerWriteLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200",
- ledgerWriteLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000",
- ledgerWriteLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow",
- ledgerWriteLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count",
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5",
+ ledgerWriteLatencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1",
+ ledgerWriteLatencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5",
+ ledgerWriteLatencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10",
+ ledgerWriteLatencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20",
+ ledgerWriteLatencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50",
+ ledgerWriteLatencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100",
+ ledgerWriteLatencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200",
+ ledgerWriteLatencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000",
+ ledgerWriteLatencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow",
+ ledgerWriteLatencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum",
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
- splitTopicAndPartitionIndexLabel);
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0],
+ writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1],
+ writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
+ writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
+ writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
+ writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
+ writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6],
+ writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
+ writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8],
+ writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
- stats.managedLedgerStats.entrySizeBuckets.getCount(), splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
- stats.managedLedgerStats.entrySizeBuckets.getSum(), splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(),
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(),
+ cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
stats.producerStats.forEach((p, producerStats) -> {
- metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in",
- producerStats.msgRateIn, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in",
- producerStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size",
- producerStats.averageMsgSize, splitTopicAndPartitionIndexLabel);
+ writeProducerMetric(stream, "pulsar_producer_msg_rate_in", producerStats.msgRateIn,
+ cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
+ writeProducerMetric(stream, "pulsar_producer_msg_throughput_in", producerStats.msgThroughputIn,
+ cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
+ writeProducerMetric(stream, "pulsar_producer_msg_average_Size", producerStats.averageMsgSize,
+ cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
});
- stats.subscriptionStats.forEach((n, subsStats) -> {
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
- subsStats.msgBacklog, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
- subsStats.msgBacklogNoDelayed, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
- subsStats.msgDelayed, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
- subsStats.msgRateRedeliver, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
- subsStats.unackedMessages, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
- subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
- subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate",
- subsStats.messageAckRate, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
- subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
- subsStats.bytesOutCounter, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
- subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
- subsStats.lastExpireTimestamp, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
- subsStats.lastAckedTimestamp, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
- subsStats.lastConsumedFlowTimestamp, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
- subsStats.lastConsumedTimestamp, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
- subsStats.lastMarkDeleteAdvancedTimestamp, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
- subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
- subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
+ stats.subscriptionStats.forEach((sub, subsStats) -> {
+ writeSubscriptionMetric(stream, "pulsar_subscription_back_log", subsStats.msgBacklog,
+ cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_back_log_no_delayed",
+ subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
+ subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver",
+ subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages",
+ subsStats.unackedMessages, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_blocked_on_unacked_messages",
+ subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, cluster, namespace, topic, sub,
+ splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_out",
+ subsStats.msgRateOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_msg_ack_rate",
+ subsStats.messageAckRate, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_msg_throughput_out",
+ subsStats.msgThroughputOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_out_bytes_total",
+ subsStats.bytesOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_out_messages_total",
+ subsStats.msgOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_last_expire_timestamp",
+ subsStats.lastExpireTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_last_acked_timestamp",
+ subsStats.lastAckedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_flow_timestamp",
+ subsStats.lastConsumedFlowTimestamp, cluster, namespace, topic, sub,
+ splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_timestamp",
+ subsStats.lastConsumedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_last_mark_delete_advanced_timestamp",
+ subsStats.lastMarkDeleteAdvancedTimestamp, cluster, namespace, topic, sub,
+ splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_expired",
+ subsStats.msgRateExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+ writeSubscriptionMetric(stream, "pulsar_subscription_total_msg_expired",
+ subsStats.totalMsgExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
+
subsStats.consumerStat.forEach((c, consumerStats) -> {
- metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_blocked_on_unacked_messages",
+ writeConsumerMetric(stream, "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
+ cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream, "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
+ cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream, "pulsar_consumer_blocked_on_unacked_messages",
consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_consumer_available_permits", consumerStats.availablePermits,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
- splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
- "pulsar_out_messages_total", consumerStats.msgOutCounter,
- splitTopicAndPartitionIndexLabel);
+ cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream, "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
+ cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+
+ writeConsumerMetric(stream, "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
+ cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+
+ writeConsumerMetric(stream, "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
+ cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream, "pulsar_consumer_available_permits", consumerStats.availablePermits,
+ cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream, "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
+ cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+ writeConsumerMetric(stream, "pulsar_out_messages_total", consumerStats.msgOutCounter,
+ cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
});
});
if (!stats.replicationStats.isEmpty()) {
stats.replicationStats.forEach((remoteCluster, replStats) -> {
- metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster,
- replStats.msgRateIn, splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster,
- replStats.msgRateOut, splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in",
- remoteCluster, replStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out",
- remoteCluster, replStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster,
- replStats.replicationBacklog, splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count",
- remoteCluster, replStats.connectedCount, splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired",
- remoteCluster, replStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
- metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds",
- remoteCluster, replStats.replicationDelayInSeconds, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_rate_in", replStats.msgRateIn,
+ cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_rate_out", replStats.msgRateOut,
+ cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_throughput_in", replStats.msgThroughputIn,
+ cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_throughput_out", replStats.msgThroughputOut,
+ cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_backlog", replStats.replicationBacklog,
+ cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_connected_count", replStats.connectedCount,
+ cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_rate_expired", replStats.msgRateExpired,
+ cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_replication_delay_in_seconds", replStats.replicationDelayInSeconds,
+ cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
});
}
- metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter,
+ writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter,
+ writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel);
// Compaction
boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
- .map(__ -> true).orElse(false);
+ .isPresent();
if (hasCompaction) {
- metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
- stats.compactionRemovedEventCount, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
- stats.compactionSucceedCount, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
- stats.compactionFailedCount, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
- stats.compactionDurationTimeInMills, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
- stats.compactionReadThroughput, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
- stats.compactionWriteThroughput, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
- stats.compactionCompactedEntriesCount, splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
- stats.compactionCompactedEntriesSize, splitTopicAndPartitionIndexLabel);
- long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
- compactionLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
- compactionLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
- compactionLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
- compactionLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
- compactionLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
- compactionLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
- compactionLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
- compactionLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
- compactionLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
- compactionLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
- stats.compactionLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
- metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
- stats.compactionLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_removed_event_count",
+ stats.compactionRemovedEventCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_succeed_count",
+ stats.compactionSucceedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_failed_count",
+ stats.compactionFailedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_duration_time_in_mills",
+ stats.compactionDurationTimeInMills, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_read_throughput",
+ stats.compactionReadThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_write_throughput",
+ stats.compactionWriteThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_compacted_entries_count",
+ stats.compactionCompactedEntriesCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_compacted_entries_size",
+ stats.compactionCompactedEntriesSize, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+
+ long[] compactionBuckets = stats.compactionLatencyBuckets.getBuckets();
+ writeMetric(stream, "pulsar_compaction_latency_le_0_5",
+ compactionBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_1",
+ compactionBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_5",
+ compactionBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_10",
+ compactionBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_20",
+ compactionBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_50",
+ compactionBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_100",
+ compactionBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_200",
+ compactionBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_le_1000",
+ compactionBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_overflow",
+ compactionBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_sum",
+ stats.compactionLatencyBuckets.getSum(), cluster, namespace, topic,
+ splitTopicAndPartitionIndexLabel);
+ writeMetric(stream, "pulsar_compaction_latency_count",
+ stats.compactionLatencyBuckets.getCount(), cluster, namespace, topic,
+ splitTopicAndPartitionIndexLabel);
}
}
- static void metricType(SimpleTextOutputStream stream, String name) {
-
- if (!metricWithTypeDefinition.containsKey(name)) {
- metricWithTypeDefinition.put(name, "gauge");
- stream.write("# TYPE ").write(name).write(" gauge\n");
- }
-
- }
-
- private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String name, double value, boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel).write("\"} ");
- stream.write(value);
- appendEndings(stream);
+ private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
+ String namespace, String topic, boolean splitTopicAndPartitionIndexLabel) {
+ writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
}
- private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
- .write("\",subscription=\"").write(subscription).write("\"} ");
- stream.write(value);
- appendEndings(stream);
+ private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
+ String namespace, String topic, String remoteCluster,
+ boolean splitTopicAndPartitionIndexLabel) {
+ writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
+ "remote_cluster", remoteCluster);
}
- private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String producerName, long produceId, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
- .write("\",producer_name=\"").write(producerName)
- .write("\",producer_id=\"").write(produceId).write("\"} ");
- stream.write(value);
- appendEndings(stream);
+ private static void writeProducerMetric(PrometheusMetricStreams stream, String metricName, Number value,
+ String cluster, String namespace, String topic, String producer,
+ long producerId, boolean splitTopicAndPartitionIndexLabel) {
+ writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
+ "producer_name", producer, "producer_id", String.valueOf(producerId));
}
- private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String subscription, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
- .write("\",subscription=\"").write(subscription).write("\"} ");
- stream.write(value);
- appendEndings(stream);
- }
-
- private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String subscription, String consumerName, long consumerId, String name, long value,
- boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
- .write("\",subscription=\"").write(subscription)
- .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
- .write("\"} ");
- stream.write(value);
- appendEndings(stream);
- }
- private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
- String subscription, String consumerName, long consumerId, String name, double value,
- boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
- .write("\",subscription=\"").write(subscription)
- .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
- .write(consumerId).write("\"} ");
- stream.write(value);
- appendEndings(stream);
+ private static void writeSubscriptionMetric(PrometheusMetricStreams stream, String metricName, Number value,
+ String cluster, String namespace, String topic, String subscription,
+ boolean splitTopicAndPartitionIndexLabel) {
+ writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
+ "subscription", subscription);
}
- private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
- String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) {
- metricType(stream, name);
- appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
- .write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
- stream.write(value);
- appendEndings(stream);
+ private static void writeConsumerMetric(PrometheusMetricStreams stream, String metricName, Number value,
+ String cluster, String namespace, String topic, String subscription,
+ Consumer consumer, boolean splitTopicAndPartitionIndexLabel) {
+ writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
+ "subscription", subscription, "consumer_name", consumer.consumerName(),
+ "consumer_id", String.valueOf(consumer.consumerId()));
}
- private static SimpleTextOutputStream appendRequiredLabels(SimpleTextOutputStream stream, String cluster,
- String namespace, String topic, String name, boolean splitTopicAndPartitionIndexLabel) {
- stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
+ static void writeTopicMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
+ String namespace, String topic, boolean splitTopicAndPartitionIndexLabel,
+ String... extraLabelsAndValues) {
+ String[] labelsAndValues = new String[splitTopicAndPartitionIndexLabel ? 8 : 6];
+ labelsAndValues[0] = "cluster";
+ labelsAndValues[1] = cluster;
+ labelsAndValues[2] = "namespace";
+ labelsAndValues[3] = namespace;
+ labelsAndValues[4] = "topic";
if (splitTopicAndPartitionIndexLabel) {
int index = topic.indexOf(PARTITIONED_TOPIC_SUFFIX);
if (index > 0) {
- stream.write("\",topic=\"").write(topic.substring(0, index)).write("\",partition=\"")
- .write(topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length()));
+ labelsAndValues[5] = topic.substring(0, index);
+ labelsAndValues[6] = "partition";
+ labelsAndValues[7] = topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length());
} else {
- stream.write("\",topic=\"").write(topic).write("\",partition=\"").write("-1");
+ labelsAndValues[5] = topic;
+ labelsAndValues[6] = "partition";
+ labelsAndValues[7] = "-1";
}
} else {
- stream.write("\",topic=\"").write(topic);
+ labelsAndValues[5] = topic;
}
- return stream;
- }
-
- private static void appendEndings(SimpleTextOutputStream stream) {
- stream.write(' ').write(System.currentTimeMillis()).write('\n');
+ String[] labels = ArrayUtils.addAll(labelsAndValues, extraLabelsAndValues);
+ stream.writeSample(metricName, value, labels);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index e6ac1535f43..8c58b516333 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.broker.stats.prometheus;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import io.netty.util.concurrent.FastThreadLocal;
-import java.util.HashMap;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
@@ -30,7 +28,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreStats;
@@ -38,21 +35,10 @@ import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreSt
@Slf4j
public class TransactionAggregator {
- /**
- * Used for tracking duplicate TYPE definitions.
- */
- private static final FastThreadLocal<Map<String, String>> threadLocalMetricWithTypeDefinition =
- new FastThreadLocal() {
- @Override
- protected Map<String, String> initialValue() {
- return new HashMap<>();
- }
- };
-
private static final FastThreadLocal<AggregatedTransactionCoordinatorStats> localTransactionCoordinatorStats =
new FastThreadLocal<AggregatedTransactionCoordinatorStats>() {
@Override
- protected AggregatedTransactionCoordinatorStats initialValue() throws Exception {
+ protected AggregatedTransactionCoordinatorStats initialValue() {
return new AggregatedTransactionCoordinatorStats();
}
};
@@ -60,21 +46,18 @@ public class TransactionAggregator {
private static final FastThreadLocal<ManagedLedgerStats> localManageLedgerStats =
new FastThreadLocal<ManagedLedgerStats>() {
@Override
- protected ManagedLedgerStats initialValue() throws Exception {
+ protected ManagedLedgerStats initialValue() {
return new ManagedLedgerStats();
}
};
- public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, boolean includeTopicMetrics) {
+ public static void generate(PulsarService pulsar, PrometheusMetricStreams stream, boolean includeTopicMetrics) {
String cluster = pulsar.getConfiguration().getClusterName();
- Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
- metricWithTypeDefinition.clear();
if (includeTopicMetrics) {
- pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
- bundlesMap.forEach((bundle, topicsMap) -> {
- topicsMap.forEach((name, topic) -> {
+ pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) ->
+ bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> {
if (topic instanceof PersistentTopic) {
topic.getSubscriptions().values().forEach(subscription -> {
try {
@@ -82,9 +65,8 @@ public class TransactionAggregator {
if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
&& subscription instanceof PersistentSubscription
&& ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) {
- ManagedLedger managedLedger =
- ((PersistentSubscription) subscription)
- .getPendingAckManageLedger().get();
+ ManagedLedger managedLedger = ((PersistentSubscription) subscription)
+ .getPendingAckManageLedger().get();
generateManageLedgerStats(managedLedger,
stream, cluster, namespace, name, subscription.getName());
}
@@ -93,9 +75,7 @@ public class TransactionAggregator {
}
});
}
- });
- });
- });
+ })));
}
AggregatedTransactionCoordinatorStats transactionCoordinatorStats = localTransactionCoordinatorStats.get();
@@ -124,18 +104,18 @@ public class TransactionAggregator {
localManageLedgerStats.get().reset();
if (transactionMetadataStore instanceof MLTransactionMetadataStore) {
- ManagedLedger managedLedger =
- ((MLTransactionMetadataStore) transactionMetadataStore).getManagedLedger();
+ ManagedLedger managedLedger =
+ ((MLTransactionMetadataStore) transactionMetadataStore).getManagedLedger();
generateManageLedgerStats(managedLedger,
stream, cluster, NamespaceName.SYSTEM_NAMESPACE.toString(),
MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + transactionCoordinatorID.getId(),
MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME);
}
- });
+ });
}
- private static void generateManageLedgerStats(ManagedLedger managedLedger, SimpleTextOutputStream stream,
+ private static void generateManageLedgerStats(ManagedLedger managedLedger, PrometheusMetricStreams stream,
String cluster, String namespace, String topic, String subscription) {
ManagedLedgerStats managedLedgerStats = localManageLedgerStats.get();
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) managedLedger.getStats();
@@ -157,174 +137,149 @@ public class TransactionAggregator {
managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
- printManageLedgerStats(stream, cluster, namespace, topic,
- subscription, managedLedgerStats);
- }
-
- private static void metricType(SimpleTextOutputStream stream, String name) {
- Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
- if (!metricWithTypeDefinition.containsKey(name)) {
- metricWithTypeDefinition.put(name, "gauge");
- stream.write("# TYPE ").write(name).write(" gauge\n");
- }
-
- }
-
- private static void metric(SimpleTextOutputStream stream, String cluster, String name,
- double value, long coordinatorId) {
- metricType(stream, name);
- stream.write(name)
- .write("{cluster=\"").write(cluster)
- .write("\",coordinator_id=\"").write(coordinatorId).write("\"} ")
- .write(value).write(' ').write(System.currentTimeMillis())
- .write('\n');
+ printManageLedgerStats(stream, cluster, namespace, topic, subscription, managedLedgerStats);
}
- private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace,
- String topic, String subscription, String name, long value) {
- stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
- .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
- }
-
- private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace,
- String topic, String subscription, String name, double value) {
- stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
- .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
- }
-
- private static void printManageLedgerStats(SimpleTextOutputStream stream, String cluster, String namespace,
+ private static void printManageLedgerStats(PrometheusMetricStreams stream, String cluster, String namespace,
String topic, String subscription, ManagedLedgerStats stats) {
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_size", stats.storageSize);
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_logical_size", stats.storageLogicalSize);
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_backlog_size", stats.backlogSize);
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
+ writeMetric(stream, "pulsar_storage_size", stats.storageSize, cluster, namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_logical_size", stats.storageLogicalSize, cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_backlog_size", stats.backlogSize, cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed, cluster, namespace, topic,
+ subscription);
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_rate", stats.storageWriteRate);
- metrics(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_read_rate", stats.storageReadRate);
+ writeMetric(stream, "pulsar_storage_write_rate", stats.storageWriteRate, cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_read_rate", stats.storageReadRate, cluster, namespace, topic,
+ subscription);
stats.storageWriteLatencyBuckets.refresh();
long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
- metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_count",
- stats.storageWriteLatencyBuckets.getCount());
- metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_sum",
- stats.storageWriteLatencyBuckets.getSum());
+ writeMetric(stream, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1", latencyBuckets[1], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_5", latencyBuckets[2], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_10", latencyBuckets[3], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_20", latencyBuckets[4], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_50", latencyBuckets[5], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_100", latencyBuckets[6], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_200", latencyBuckets[7], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_overflow", latencyBuckets[9], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_count", stats.storageWriteLatencyBuckets.getCount(),
+ cluster, namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_write_latency_sum", stats.storageWriteLatencyBuckets.getSum(), cluster,
+ namespace, topic, subscription);
stats.storageLedgerWriteLatencyBuckets.refresh();
- long[] ledgerWritelatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
- metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_overflow",
- ledgerWritelatencyBuckets[9]);
- metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_count",
- stats.storageLedgerWriteLatencyBuckets.getCount());
- metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_sum",
- stats.storageLedgerWriteLatencyBuckets.getSum());
+ long[] ledgerWriteLatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWriteLatencyBuckets[0], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", ledgerWriteLatencyBuckets[1], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5", ledgerWriteLatencyBuckets[2], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", ledgerWriteLatencyBuckets[3], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", ledgerWriteLatencyBuckets[4], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", ledgerWriteLatencyBuckets[5], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", ledgerWriteLatencyBuckets[6], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", ledgerWriteLatencyBuckets[7], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", ledgerWriteLatencyBuckets[8], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", ledgerWriteLatencyBuckets[9], cluster,
+ namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
+ stats.storageLedgerWriteLatencyBuckets.getCount(), cluster, namespace, topic, subscription);
+ writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
+ stats.storageLedgerWriteLatencyBuckets.getSum(), cluster, namespace, topic, subscription);
stats.entrySizeBuckets.refresh();
long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
- metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
- metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
- metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
- metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
- metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
- metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
- metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
- metric(stream, cluster, namespace, topic, subscription,
- "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
- }
-
- private static void metric(SimpleTextOutputStream stream, String cluster,
- String namespace, String topic, String subscription,
- String name, long value) {
- stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
- .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
- stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+ writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic,
+ subscription);
+ writeMetric(stream, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount(), cluster, namespace,
+ topic, subscription);
+ writeMetric(stream, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum(), cluster, namespace, topic,
+ subscription);
}
- static void printTransactionCoordinatorStats(SimpleTextOutputStream stream, String cluster,
+ static void printTransactionCoordinatorStats(PrometheusMetricStreams stream, String cluster,
AggregatedTransactionCoordinatorStats stats,
long coordinatorId) {
- metric(stream, cluster, "pulsar_txn_active_count",
- stats.actives, coordinatorId);
- metric(stream, cluster, "pulsar_txn_committed_count",
- stats.committedCount, coordinatorId);
- metric(stream, cluster, "pulsar_txn_aborted_count",
- stats.abortedCount, coordinatorId);
- metric(stream, cluster, "pulsar_txn_created_count",
- stats.createdCount, coordinatorId);
- metric(stream, cluster, "pulsar_txn_timeout_count",
- stats.timeoutCount, coordinatorId);
- metric(stream, cluster, "pulsar_txn_append_log_count",
- stats.appendLogCount, coordinatorId);
+ writeMetric(stream, "pulsar_txn_active_count", stats.actives, cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_committed_count", stats.committedCount, cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_aborted_count", stats.abortedCount, cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_created_count", stats.createdCount, cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_timeout_count", stats.timeoutCount, cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_append_log_count", stats.appendLogCount, cluster,
+ coordinatorId);
long[] latencyBuckets = stats.executionLatency;
- metric(stream, cluster, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_15000", latencyBuckets[7], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_300000",
- latencyBuckets[10], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_1500000",
- latencyBuckets[11], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_3000000",
- latencyBuckets[12], coordinatorId);
- metric(stream, cluster, "pulsar_txn_execution_latency_le_overflow",
- latencyBuckets[13], coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_15000", latencyBuckets[7], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], cluster, coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_300000", latencyBuckets[10], cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_1500000", latencyBuckets[11], cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_3000000", latencyBuckets[12], cluster,
+ coordinatorId);
+ writeMetric(stream, "pulsar_txn_execution_latency_le_overflow", latencyBuckets[13], cluster,
+ coordinatorId);
+ }
+
+ private static void writeMetric(PrometheusMetricStreams stream, String metricName, double value, String cluster,
+ long coordinatorId) {
+ stream.writeSample(metricName, value, "cluster", cluster, "coordinator_id", String.valueOf(coordinatorId));
+ }
+
+ private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
+ String namespace, String topic, String subscription) {
+ stream.writeSample(metricName, value, "cluster", cluster, "namespace", namespace, "topic", topic,
+ "subscription", subscription);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
index 7550096c2b5..8f704b11e76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
@@ -18,13 +18,8 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;
-import io.prometheus.client.Collector;
-import io.prometheus.client.Collector.MetricFamilySamples;
-import io.prometheus.client.Collector.MetricFamilySamples.Sample;
-import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.io.Writer;
-import java.util.Enumeration;
import org.apache.bookkeeper.stats.Counter;
/**
@@ -140,31 +135,4 @@ public class PrometheusTextFormatUtil {
.append(success.toString()).append("\"} ")
.append(Double.toString(opStat.getSum(success))).append('\n');
}
-
- public static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry)
- throws IOException {
- Enumeration<MetricFamilySamples> metricFamilySamples = registry.metricFamilySamples();
- while (metricFamilySamples.hasMoreElements()) {
- MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
-
- for (int i = 0; i < metricFamily.samples.size(); i++) {
- Sample sample = metricFamily.samples.get(i);
- w.write(sample.name);
- w.write('{');
- for (int j = 0; j < sample.labelNames.size(); j++) {
- if (j != 0) {
- w.write(", ");
- }
- w.write(sample.labelNames.get(j));
- w.write("=\"");
- w.write(sample.labelValues.get(j));
- w.write('"');
- }
-
- w.write("} ");
- w.write(Collector.doubleToGoString(sample.value));
- w.write('\n');
- }
- }
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index f28412ea751..be10c8dc65e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -50,6 +50,7 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
@@ -1404,6 +1405,63 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertEquals(cm.get(0).value, count);
}
+ @Test
+ public void testMetricsGroupedByTypeDefinitions() throws Exception {
+ Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+ Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ p1.send(message.getBytes());
+ p2.send(message.getBytes());
+ }
+
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
+ String metricsStr = statsOut.toString();
+
+ Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
+ Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
+
+ AtomicReference<String> currentMetric = new AtomicReference<>();
+ Splitter.on("\n").split(metricsStr).forEach(line -> {
+ if (line.isEmpty()) {
+ return;
+ }
+ if (line.startsWith("#")) {
+ // Get the current type definition
+ Matcher typeMatcher = typePattern.matcher(line);
+ checkArgument(typeMatcher.matches());
+ String metricName = typeMatcher.group(1);
+ currentMetric.set(metricName);
+ } else {
+ Matcher metricMatcher = metricNamePattern.matcher(line);
+ checkArgument(metricMatcher.matches());
+ String metricName = metricMatcher.group(1);
+
+ if (metricName.endsWith("_bucket")) {
+ metricName = metricName.substring(0, metricName.indexOf("_bucket"));
+ } else if (metricName.endsWith("_count") && !currentMetric.get().endsWith("_count")) {
+ metricName = metricName.substring(0, metricName.indexOf("_count"));
+ } else if (metricName.endsWith("_sum") && !currentMetric.get().endsWith("_sum")) {
+ metricName = metricName.substring(0, metricName.indexOf("_sum"));
+ } else if (metricName.endsWith("_total") && !currentMetric.get().endsWith("_total")) {
+ metricName = metricName.substring(0, metricName.indexOf("_total"));
+ } else if (metricName.endsWith("_created") && !currentMetric.get().endsWith("_created")) {
+ metricName = metricName.substring(0, metricName.indexOf("_created"));
+ }
+
+ if (!metricName.equals(currentMetric.get())) {
+ System.out.println(metricsStr);
+ fail("Metric not grouped under its type definition: " + line);
+ }
+
+ }
+ });
+
+ p1.close();
+ p2.close();
+ }
+
/**
* Hacky parsing of Prometheus text format. Should be good enough for unit tests
*/
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
new file mode 100644
index 00000000000..15c29a0dc66
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class PrometheusMetricStreamsTest {
+
+ private PrometheusMetricStreams underTest;
+
+ @BeforeMethod(alwaysRun = true)
+ protected void setup() throws Exception {
+ underTest = new PrometheusMetricStreams();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ underTest.releaseAll();
+ }
+
+ @Test
+ public void canWriteSampleWithoutLabels() {
+ underTest.writeSample("my-metric", 123);
+
+ String actual = writeToString();
+
+ assertTrue(actual.startsWith("# TYPE my-metric gauge"), "Gauge type line missing");
+ assertTrue(actual.contains("my-metric{} 123"), "Metric line missing");
+ }
+
+ @Test
+ public void canWriteSampleWithLabels() {
+ underTest.writeSample("my-other-metric", 123, "cluster", "local");
+ underTest.writeSample("my-other-metric", 456, "cluster", "local", "namespace", "my-ns");
+
+ String actual = writeToString();
+
+ assertTrue(actual.startsWith("# TYPE my-other-metric gauge"), "Gauge type line missing");
+ assertTrue(actual.contains("my-other-metric{cluster=\"local\"} 123"), "Cluster metric line missing");
+ assertTrue(actual.contains("my-other-metric{cluster=\"local\",namespace=\"my-ns\"} 456"),
+ "Cluster and Namespace metric line missing");
+ }
+
+ private String writeToString() {
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer();
+ try {
+ SimpleTextOutputStream stream = new SimpleTextOutputStream(buffer);
+ underTest.flushAllToStream(stream);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ int readIndex = buffer.readerIndex();
+ int readableBytes = buffer.readableBytes();
+ for (int i = 0; i < readableBytes; i++) {
+ out.write(buffer.getByte(readIndex + i));
+ }
+ return out.toString();
+ } finally {
+ buffer.release();
+ }
+ }
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
index 9fc4b347c85..dd78b4cfe58 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
@@ -22,12 +22,11 @@ import io.netty.buffer.ByteBuf;
/**
* Format strings and numbers into a ByteBuf without any memory allocation.
- *
*/
public class SimpleTextOutputStream {
private final ByteBuf buffer;
- private static final char[] hexChars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
- 'f' };
+ private static final char[] hexChars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
+ 'f'};
public SimpleTextOutputStream(ByteBuf buffer) {
this.buffer = buffer;
@@ -131,4 +130,12 @@ public class SimpleTextOutputStream {
write(r);
return this;
}
+
+ public void write(ByteBuf byteBuf) {
+ buffer.writeBytes(byteBuf);
+ }
+
+ public ByteBuf getBuffer() {
+ return buffer;
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
index f7a205c7db0..f5aa273f656 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
@@ -37,6 +37,11 @@ public class PrometheusTextFormat {
*/
while (mfs.hasMoreElements()) {
Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
+ writer.write("# TYPE ");
+ writer.write(metricFamilySamples.name);
+ writer.write(' ');
+ writer.write(metricFamilySamples.type.name().toLowerCase());
+ writer.write('\n');
for (Collector.MetricFamilySamples.Sample sample : metricFamilySamples.samples) {
writer.write(sample.name);
if (sample.labelNames.size() > 0) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index c8b411cbf57..2ad407b2e5e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -328,6 +328,11 @@ public class WorkerStatsManager {
}
private void writeMetric(String metricName, long value, StringWriter stream) {
+ stream.write("# TYPE ");
+ stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
+ stream.write(metricName);
+ stream.write(" gauge \n");
+
stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
stream.write(metricName);
stream.write("{");