You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/24 03:44:20 UTC
[pulsar] branch branch-2.9 updated: Revert "[fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#17618)"
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 3564f423c48 Revert "[fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#17618)"
3564f423c48 is described below
commit 3564f423c4893f6f097d8a9c7e9e3d99687ca776
Author: penghui <pe...@apache.org>
AuthorDate: Sat Sep 24 11:43:08 2022 +0800
Revert "[fix][broker][functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865) (#17618)"
This reverts commit bc69cfbe9d3cf06fc6e407c284eed45f095b41be.
---
.../stats/prometheus/AggregatedNamespaceStats.java | 2 +-
.../stats/prometheus/NamespaceStatsAggregator.java | 354 ++++++------
.../stats/prometheus/PrometheusMetricStreams.java | 75 ---
.../prometheus/PrometheusMetricsGenerator.java | 60 +--
.../pulsar/broker/stats/prometheus/TopicStats.java | 598 +++++++++++----------
.../stats/prometheus/TransactionAggregator.java | 321 ++++++-----
.../metrics/PrometheusTextFormatUtil.java | 32 ++
.../pulsar/broker/stats/PrometheusMetricsTest.java | 58 --
.../prometheus/PrometheusMetricStreamsTest.java | 85 ---
.../pulsar/common/util/SimpleTextOutputStream.java | 13 +-
.../instance/stats/PrometheusTextFormat.java | 5 -
.../functions/worker/WorkerStatsManager.java | 5 -
12 files changed, 730 insertions(+), 878 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 1980af91b7b..5610dbab218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -96,7 +96,7 @@ public class AggregatedNamespaceStats {
stats.replicationStats.forEach((n, as) -> {
AggregatedReplicationStats replStats =
- replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats());
+ replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats());
replStats.msgRateIn += as.msgRateIn;
replStats.msgRateOut += as.msgRateOut;
replStats.msgThroughputIn += as.msgThroughputIn;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 29915f071c0..16e438e2a2e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -19,11 +19,8 @@
package org.apache.pulsar.broker.stats.prometheus;
import io.netty.util.concurrent.FastThreadLocal;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
-import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -35,6 +32,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
@@ -42,75 +40,72 @@ import org.apache.pulsar.compaction.CompactorMXBean;
@Slf4j
public class NamespaceStatsAggregator {
- private static final FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
+ private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
new FastThreadLocal<AggregatedNamespaceStats>() {
@Override
- protected AggregatedNamespaceStats initialValue() {
+ protected AggregatedNamespaceStats initialValue() throws Exception {
return new AggregatedNamespaceStats();
}
};
- private static final FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
+ private static FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
@Override
- protected TopicStats initialValue() {
+ protected TopicStats initialValue() throws Exception {
return new TopicStats();
}
};
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
- PrometheusMetricStreams stream) {
+ boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
+ TopicStats.resetTypes();
TopicStats topicStats = localTopicStats.get();
- Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
- LongAdder topicsCount = new LongAdder();
- Map<String, Long> localNamespaceTopicCount = new HashMap<>();
printDefaultBrokerStats(stream, cluster);
+ Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
+ LongAdder topicsCount = new LongAdder();
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();
topicsCount.reset();
- bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> {
- getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
- pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
- pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
- compactorMXBean
- );
-
- if (includeTopicMetrics) {
- topicsCount.add(1);
- TopicStats.printTopicStats(stream, topicStats, compactorMXBean, cluster, namespace, name,
- splitTopicAndPartitionIndexLabel);
- } else {
- namespaceStats.updateStats(topicStats);
- }
- }));
+ bundlesMap.forEach((bundle, topicsMap) -> {
+ topicsMap.forEach((name, topic) -> {
+ getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
+ pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
+ pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
+ compactorMXBean
+ );
+
+ if (includeTopicMetrics) {
+ topicsCount.add(1);
+ TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean,
+ splitTopicAndPartitionIndexLabel);
+ } else {
+ namespaceStats.updateStats(topicStats);
+ }
+ });
+ });
if (!includeTopicMetrics) {
- // Only include namespace level stats if we don't have the per-topic, otherwise we're going to
- // report the same data twice, and it will make the aggregation difficult
- printNamespaceStats(stream, namespaceStats, cluster, namespace);
+ // Only include namespace level stats if we don't have the per-topic, otherwise we're going to report
+ // the same data twice, and it will make the aggregation difficult
+ printNamespaceStats(stream, cluster, namespace, namespaceStats);
} else {
- localNamespaceTopicCount.put(namespace, topicsCount.sum());
+ printTopicsCountStats(stream, cluster, namespace, topicsCount);
}
});
-
- if (includeTopicMetrics) {
- printTopicsCountStats(stream, localNamespaceTopicCount, cluster);
- }
}
private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
Compactor compactor = pulsar.getNullableCompactor();
- return Optional.ofNullable(compactor).map(Compactor::getStats);
+ return Optional.ofNullable(compactor).map(c -> c.getStats());
}
private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean getPreciseBacklog,
- boolean subscriptionBacklogSize, Optional<CompactorMXBean> compactorMXBean) {
+ boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
+ Optional<CompactorMXBean> compactorMXBean) {
stats.reset();
if (topic instanceof PersistentTopic) {
@@ -272,176 +267,161 @@ public class NamespaceStatsAggregator {
});
}
- private static void printDefaultBrokerStats(PrometheusMetricStreams stream, String cluster) {
+ private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
// Print metrics with 0 values. This is necessary to have the available brokers being
// reported in the brokers dashboard even if they don't have any topic or traffic
- writeMetric(stream, "pulsar_topics_count", 0, cluster);
- writeMetric(stream, "pulsar_subscriptions_count", 0, cluster);
- writeMetric(stream, "pulsar_producers_count", 0, cluster);
- writeMetric(stream, "pulsar_consumers_count", 0, cluster);
- writeMetric(stream, "pulsar_rate_in", 0, cluster);
- writeMetric(stream, "pulsar_rate_out", 0, cluster);
- writeMetric(stream, "pulsar_throughput_in", 0, cluster);
- writeMetric(stream, "pulsar_throughput_out", 0, cluster);
- writeMetric(stream, "pulsar_storage_size", 0, cluster);
- writeMetric(stream, "pulsar_storage_logical_size", 0, cluster);
- writeMetric(stream, "pulsar_storage_write_rate", 0, cluster);
- writeMetric(stream, "pulsar_storage_read_rate", 0, cluster);
- writeMetric(stream, "pulsar_msg_backlog", 0, cluster);
+ metric(stream, cluster, "pulsar_topics_count", 0);
+ metric(stream, cluster, "pulsar_subscriptions_count", 0);
+ metric(stream, cluster, "pulsar_producers_count", 0);
+ metric(stream, cluster, "pulsar_consumers_count", 0);
+ metric(stream, cluster, "pulsar_rate_in", 0);
+ metric(stream, cluster, "pulsar_rate_out", 0);
+ metric(stream, cluster, "pulsar_throughput_in", 0);
+ metric(stream, cluster, "pulsar_throughput_out", 0);
+ metric(stream, cluster, "pulsar_storage_size", 0);
+ metric(stream, cluster, "pulsar_storage_logical_size", 0);
+ metric(stream, cluster, "pulsar_storage_write_rate", 0);
+ metric(stream, cluster, "pulsar_storage_read_rate", 0);
+ metric(stream, cluster, "pulsar_msg_backlog", 0);
}
- private static void printTopicsCountStats(PrometheusMetricStreams stream, Map<String, Long> namespaceTopicsCount,
- String cluster) {
- namespaceTopicsCount.forEach(
- (ns, topicCount) -> writeMetric(stream, "pulsar_topics_count", topicCount, cluster, ns)
- );
+ private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster, String namespace,
+ LongAdder topicsCount) {
+ metric(stream, cluster, namespace, "pulsar_topics_count", topicsCount.sum());
}
- private static void printNamespaceStats(PrometheusMetricStreams stream, AggregatedNamespaceStats stats,
- String cluster, String namespace) {
- writeMetric(stream, "pulsar_topics_count", stats.topicsCount, cluster, namespace);
- writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount, cluster,
- namespace);
- writeMetric(stream, "pulsar_producers_count", stats.producersCount, cluster, namespace);
- writeMetric(stream, "pulsar_consumers_count", stats.consumersCount, cluster, namespace);
-
- writeMetric(stream, "pulsar_rate_in", stats.rateIn, cluster, namespace);
- writeMetric(stream, "pulsar_rate_out", stats.rateOut, cluster, namespace);
- writeMetric(stream, "pulsar_throughput_in", stats.throughputIn, cluster, namespace);
- writeMetric(stream, "pulsar_throughput_out", stats.throughputOut, cluster, namespace);
- writeMetric(stream, "pulsar_consumer_msg_ack_rate", stats.messageAckRate, cluster, namespace);
-
- writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace);
- writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace);
- writeMetric(stream, "pulsar_out_bytes_total", stats.bytesOutCounter, cluster, namespace);
- writeMetric(stream, "pulsar_out_messages_total", stats.msgOutCounter, cluster, namespace);
-
- writeMetric(stream, "pulsar_storage_size", stats.managedLedgerStats.storageSize, cluster,
- namespace);
- writeMetric(stream, "pulsar_storage_logical_size",
- stats.managedLedgerStats.storageLogicalSize, cluster, namespace);
- writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize, cluster,
- namespace);
- writeMetric(stream, "pulsar_storage_offloaded_size",
- stats.managedLedgerStats.offloadedStorageUsed, cluster, namespace);
-
- writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate,
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
- cluster, namespace);
-
- writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, cluster, namespace);
-
- writePulsarMsgBacklog(stream, stats.msgBacklog, cluster, namespace);
+ private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
+ AggregatedNamespaceStats stats) {
+ metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
+ metric(stream, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
+ metric(stream, cluster, namespace, "pulsar_producers_count", stats.producersCount);
+ metric(stream, cluster, namespace, "pulsar_consumers_count", stats.consumersCount);
+
+ metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn);
+ metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
+ metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
+ metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
+ metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
+
+ metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
+ metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
+ metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
+ metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
+
+ metric(stream, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
+ metric(stream, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
+ metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize);
+ metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
+ stats.managedLedgerStats.offloadedStorageUsed);
+
+ metric(stream, cluster, namespace, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate);
+ metric(stream, cluster, namespace, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate);
+
+ metric(stream, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed);
+
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);
stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
- writeMetric(stream, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_le_1", latencyBuckets[1], cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_le_5", latencyBuckets[2], cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_le_10", latencyBuckets[3], cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_le_20", latencyBuckets[4], cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_le_50", latencyBuckets[5], cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_le_100", latencyBuckets[6], cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_le_200", latencyBuckets[7], cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_overflow", latencyBuckets[9], cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_count",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), cluster, namespace);
- writeMetric(stream, "pulsar_storage_write_latency_sum",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_count",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
+ metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
- long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWriteLatencyBuckets[0],
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", ledgerWriteLatencyBuckets[1],
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5", ledgerWriteLatencyBuckets[2],
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", ledgerWriteLatencyBuckets[3],
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", ledgerWriteLatencyBuckets[4],
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", ledgerWriteLatencyBuckets[5],
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", ledgerWriteLatencyBuckets[6],
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", ledgerWriteLatencyBuckets[7],
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", ledgerWriteLatencyBuckets[8],
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", ledgerWriteLatencyBuckets[9],
- cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
- stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(), cluster, namespace);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
- stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(), cluster, namespace);
+ long[] ledgerWritelatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_overflow",
+ ledgerWritelatencyBuckets[9]);
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_count",
+ stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
+ metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_sum",
+ stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
stats.managedLedgerStats.entrySizeBuckets.refresh();
long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
- writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace);
- writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace);
- writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace);
- writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace);
- writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace);
- writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace);
- writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace);
- writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace);
- writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace);
- writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(),
- cluster, namespace);
- writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(),
- cluster, namespace);
-
- writeReplicationStat(stream, "pulsar_replication_rate_in", stats,
- replStats -> replStats.msgRateIn, cluster, namespace);
- writeReplicationStat(stream, "pulsar_replication_rate_out", stats,
- replStats -> replStats.msgRateOut, cluster, namespace);
- writeReplicationStat(stream, "pulsar_replication_throughput_in", stats,
- replStats -> replStats.msgThroughputIn, cluster, namespace);
- writeReplicationStat(stream, "pulsar_replication_throughput_out", stats,
- replStats -> replStats.msgThroughputOut, cluster, namespace);
- writeReplicationStat(stream, "pulsar_replication_backlog", stats,
- replStats -> replStats.replicationBacklog, cluster, namespace);
- writeReplicationStat(stream, "pulsar_replication_connected_count", stats,
- replStats -> replStats.connectedCount, cluster, namespace);
- writeReplicationStat(stream, "pulsar_replication_rate_expired", stats,
- replStats -> replStats.msgRateExpired, cluster, namespace);
- writeReplicationStat(stream, "pulsar_replication_delay_in_seconds", stats,
- replStats -> replStats.replicationDelayInSeconds, cluster, namespace);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
+ metric(stream, cluster, namespace, "pulsar_entry_size_count",
+ stats.managedLedgerStats.entrySizeBuckets.getCount());
+ metric(stream, cluster, namespace, "pulsar_entry_size_sum",
+ stats.managedLedgerStats.entrySizeBuckets.getSum());
+
+ if (!stats.replicationStats.isEmpty()) {
+ stats.replicationStats.forEach((remoteCluster, replStats) -> {
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_in", remoteCluster,
+ replStats.msgRateIn);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_out", remoteCluster,
+ replStats.msgRateOut);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_in", remoteCluster,
+ replStats.msgThroughputIn);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_out", remoteCluster,
+ replStats.msgThroughputOut);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_backlog", remoteCluster,
+ replStats.replicationBacklog);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_connected_count", remoteCluster,
+ replStats.connectedCount);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_expired", remoteCluster,
+ replStats.msgRateExpired);
+ metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_delay_in_seconds",
+ remoteCluster, replStats.replicationDelayInSeconds);
+ });
+ }
}
- private static void writePulsarMsgBacklog(PrometheusMetricStreams stream, Number value,
- String cluster, String namespace) {
- stream.writeSample("pulsar_msg_backlog", value, "cluster", cluster, "namespace", namespace,
- "remote_cluster",
- "local");
+ private static void metric(SimpleTextOutputStream stream, String cluster, String name,
+ long value) {
+ TopicStats.metricType(stream, name);
+ stream.write(name)
+ .write("{cluster=\"").write(cluster).write("\"} ")
+ .write(value).write(' ').write(System.currentTimeMillis())
+ .write('\n');
}
- private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value,
- String cluster) {
- stream.writeSample(metricName, value, "cluster", cluster);
+ private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
+ long value) {
+ TopicStats.metricType(stream, name);
+ stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
+ stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}
- private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
- String namespace) {
- stream.writeSample(metricName, value, "cluster", cluster, "namespace", namespace);
+ private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
+ double value) {
+ TopicStats.metricType(stream, name);
+ stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
+ stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}
- private static void writeReplicationStat(PrometheusMetricStreams stream, String metricName,
- AggregatedNamespaceStats namespaceStats,
- Function<AggregatedReplicationStats, Number> sampleValueFunction,
- String cluster, String namespace) {
- if (!namespaceStats.replicationStats.isEmpty()) {
- namespaceStats.replicationStats.forEach((remoteCluster, replStats) ->
- stream.writeSample(metricName, sampleValueFunction.apply(replStats),
- "cluster", cluster,
- "namespace", namespace,
- "remote_cluster", remoteCluster)
- );
- }
+ private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
+ String name, String remoteCluster, double value) {
+ TopicStats.metricType(stream, name);
+ stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
+ stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
+ stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}
-
-
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
deleted file mode 100644
index 6b6b972c175..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreams.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.stats.prometheus;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
-
-/**
- * Helper class to ensure that metrics of the same name are grouped together under the same TYPE header when written.
- * Those are the requirements of the
- * <a href="https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md#grouping-and-sorting">Prometheus Exposition Format</a>.
- */
-public class PrometheusMetricStreams {
- private final Map<String, SimpleTextOutputStream> metricStreamMap = new HashMap<>();
-
- /**
- * Write the given metric and sample value to the stream. Will write #TYPE header if metric not seen before.
- * @param metricName name of the metric.
- * @param value value of the sample
- * @param labelsAndValuesArray varargs of label and label value
- */
- void writeSample(String metricName, Number value, String... labelsAndValuesArray) {
- SimpleTextOutputStream stream = initGaugeType(metricName);
- stream.write(metricName).write('{');
- for (int i = 0; i < labelsAndValuesArray.length; i += 2) {
- stream.write(labelsAndValuesArray[i]).write("=\"").write(labelsAndValuesArray[i + 1]).write('\"');
- if (i + 2 != labelsAndValuesArray.length) {
- stream.write(',');
- }
- }
- stream.write("} ").write(value).write(' ').write(System.currentTimeMillis()).write('\n');
- }
-
- /**
- * Flush all the stored metrics to the supplied stream.
- * @param stream the stream to write to.
- */
- void flushAllToStream(SimpleTextOutputStream stream) {
- metricStreamMap.values().forEach(s -> stream.write(s.getBuffer()));
- }
-
- /**
- * Release all the streams to clean up resources.
- */
- void releaseAll() {
- metricStreamMap.values().forEach(s -> s.getBuffer().release());
- metricStreamMap.clear();
- }
-
- private SimpleTextOutputStream initGaugeType(String metricName) {
- return metricStreamMap.computeIfAbsent(metricName, s -> {
- SimpleTextOutputStream stream = new SimpleTextOutputStream(PulsarByteBufAllocator.DEFAULT.directBuffer());
- stream.write("# TYPE ").write(metricName).write(" gauge\n");
- return stream;
- });
- }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index a993d1edf3a..cd6afd1535d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -52,8 +52,7 @@ import org.apache.pulsar.common.util.SimpleTextOutputStream;
/**
* Generate metrics aggregated at the namespace level and optionally at a topic level and formats them out
* in a text format suitable to be consumed by Prometheus.
- * Format specification can be found at <a
- * href="https://prometheus.io/docs/instrumenting/exposition_formats/">Exposition Formats</a>
+ * Format specification can be found at {@link https://prometheus.io/docs/instrumenting/exposition_formats/}
*/
public class PrometheusMetricsGenerator {
@@ -87,44 +86,38 @@ public class PrometheusMetricsGenerator {
}
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, OutputStream out) throws IOException {
+ boolean includeProducerMetrics, OutputStream out) throws IOException {
generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics, false, out, null);
}
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
- OutputStream out) throws IOException {
+ boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
+ OutputStream out) throws IOException {
generate(pulsar, includeTopicMetrics, includeConsumerMetrics, includeProducerMetrics,
splitTopicAndPartitionIndexLabel, out, null);
}
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
- boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel,
- OutputStream out,
- List<PrometheusRawMetricsProvider> metricsProviders)
- throws IOException {
+ boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, OutputStream out,
+ List<PrometheusRawMetricsProvider> metricsProviders)
+ throws IOException {
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
- boolean exceptionHappens = false;
- //Used in namespace/topic and transaction aggregators as share metric names
- PrometheusMetricStreams metricStreams = new PrometheusMetricStreams();
try {
SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());
NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics,
- includeProducerMetrics, splitTopicAndPartitionIndexLabel, metricStreams);
+ includeProducerMetrics, splitTopicAndPartitionIndexLabel, stream);
if (pulsar.getWorkerServiceOpt().isPresent()) {
pulsar.getWorkerService().generateFunctionsStats(stream);
}
if (pulsar.getConfiguration().isTransactionCoordinatorEnabled()) {
- TransactionAggregator.generate(pulsar, metricStreams, includeTopicMetrics);
+ TransactionAggregator.generate(pulsar, stream, includeTopicMetrics);
}
- metricStreams.flushAllToStream(stream);
-
generateBrokerBasicMetrics(pulsar, stream);
generateManagedLedgerBookieClientMetrics(pulsar, stream);
@@ -136,12 +129,7 @@ public class PrometheusMetricsGenerator {
}
out.write(buf.array(), buf.arrayOffset(), buf.readableBytes());
} finally {
- //release all the metrics buffers
- metricStreams.releaseAll();
- //if exception happens, release buffer
- if (exceptionHappens) {
- buf.release();
- }
+ buf.release();
}
}
@@ -154,17 +142,17 @@ public class PrometheusMetricsGenerator {
if (pulsar.getConfiguration().isExposeManagedLedgerMetricsInPrometheus()) {
// generate managedLedger metrics
parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(),
- clusterName, Collector.Type.GAUGE, stream);
+ clusterName, Collector.Type.GAUGE, stream);
}
if (pulsar.getConfiguration().isExposeManagedCursorMetricsInPrometheus()) {
// generate managedCursor metrics
parseMetricsToPrometheusMetrics(new ManagedCursorMetrics(pulsar).generate(),
- clusterName, Collector.Type.GAUGE, stream);
+ clusterName, Collector.Type.GAUGE, stream);
}
parseMetricsToPrometheusMetrics(Collections.singletonList(pulsar.getBrokerService()
- .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
+ .getPulsarStats().getBrokerOperabilityMetrics().generateConnectionMetrics()),
clusterName, Collector.Type.GAUGE, stream);
// generate loadBalance metrics
@@ -279,17 +267,17 @@ public class PrometheusMetricsGenerator {
static String getTypeStr(Collector.Type type) {
switch (type) {
- case COUNTER:
- return "counter";
- case GAUGE:
- return "gauge";
- case SUMMARY:
- return "summary";
- case HISTOGRAM:
- return "histogram";
- case UNTYPED:
- default:
- return "untyped";
+ case COUNTER:
+ return "counter";
+ case GAUGE:
+ return "gauge";
+ case SUMMARY :
+ return "summary";
+ case HISTOGRAM:
+ return "histogram";
+ case UNTYPED:
+ default:
+ return "untyped";
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index e91521aff55..e6e5883847d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -23,12 +23,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactionRecord;
import org.apache.pulsar.compaction.CompactorMXBean;
class TopicStats {
+
int subscriptionsCount;
int producersCount;
int consumersCount;
@@ -43,6 +43,7 @@ class TopicStats {
double averageMsgSize;
public long msgBacklog;
+
long publishRateLimitedTimes;
long backlogQuotaLimit;
@@ -54,6 +55,9 @@ class TopicStats {
Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
Map<String, AggregatedProducerStats> producerStats = new HashMap<>();
+ // Used for tracking duplicate TYPE definitions
+ static Map<String, String> metricWithTypeDefinition = new HashMap<>();
+
// For compaction
long compactionRemovedEventCount;
long compactionSucceedCount;
@@ -99,340 +103,378 @@ class TopicStats {
compactionLatencyBuckets.reset();
}
- public static void printTopicStats(PrometheusMetricStreams stream, TopicStats stats,
- Optional<CompactorMXBean> compactorMXBean, String cluster, String namespace,
- String topic, boolean splitTopicAndPartitionIndexLabel) {
- writeMetric(stream, "pulsar_subscriptions_count", stats.subscriptionsCount,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_producers_count", stats.producersCount,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_consumers_count", stats.consumersCount,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-
- writeMetric(stream, "pulsar_rate_in", stats.rateIn,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_rate_out", stats.rateOut,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_throughput_in", stats.throughputIn,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_throughput_out", stats.throughputOut,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_average_msg_size", stats.averageMsgSize,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-
- writeMetric(stream, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_logical_size",
- stats.managedLedgerStats.storageLogicalSize, cluster, namespace, topic,
- splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_msg_backlog", stats.msgBacklog,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_offloaded_size", stats.managedLedgerStats
- .offloadedStorageUsed, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_backlog_quota_limit_time", stats.backlogQuotaLimitTime,
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ static void resetTypes() {
+ metricWithTypeDefinition.clear();
+ }
+
+ static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+ TopicStats stats, Optional<CompactorMXBean> compactorMXBean,
+ boolean splitTopicAndPartitionIndexLabel) {
+ metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount,
+ splitTopicAndPartitionIndexLabel);
+
+ metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
+ splitTopicAndPartitionIndexLabel);
+
+ metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
+ stats.managedLedgerStats.storageLogicalSize, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_rate",
+ stats.managedLedgerStats.storageWriteRate, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
+ stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
+ .offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time",
+ stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
- writeMetric(stream, "pulsar_storage_write_latency_le_0_5",
- latencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_le_1",
- latencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_le_5",
- latencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_le_10",
- latencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_le_20",
- latencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_le_50",
- latencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_le_100",
- latencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_le_200",
- latencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_le_1000",
- latencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_overflow",
- latencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_count",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(),
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_write_latency_sum",
- stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1],
splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9],
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
+ stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5",
- ledgerWriteLatencyBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1",
- ledgerWriteLatencyBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5",
- ledgerWriteLatencyBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10",
- ledgerWriteLatencyBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20",
- ledgerWriteLatencyBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50",
- ledgerWriteLatencyBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100",
- ledgerWriteLatencyBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200",
- ledgerWriteLatencyBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000",
- ledgerWriteLatencyBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow",
- ledgerWriteLatencyBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5",
+ ledgerWriteLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1",
+ ledgerWriteLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5",
+ ledgerWriteLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10",
+ ledgerWriteLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20",
+ ledgerWriteLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50",
+ ledgerWriteLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100",
+ ledgerWriteLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200",
+ ledgerWriteLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000",
+ ledgerWriteLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow",
+ ledgerWriteLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ splitTopicAndPartitionIndexLabel);
long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
- writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0],
splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1],
splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6],
splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8],
splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_entry_size_count", stats.managedLedgerStats.entrySizeBuckets.getCount(),
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_entry_size_sum", stats.managedLedgerStats.entrySizeBuckets.getSum(),
- cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
+ stats.managedLedgerStats.entrySizeBuckets.getCount(), splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
+ stats.managedLedgerStats.entrySizeBuckets.getSum(), splitTopicAndPartitionIndexLabel);
stats.producerStats.forEach((p, producerStats) -> {
- writeProducerMetric(stream, "pulsar_producer_msg_rate_in", producerStats.msgRateIn,
- cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
- writeProducerMetric(stream, "pulsar_producer_msg_throughput_in", producerStats.msgThroughputIn,
- cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
- writeProducerMetric(stream, "pulsar_producer_msg_average_Size", producerStats.averageMsgSize,
- cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in",
+ producerStats.msgRateIn, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in",
+ producerStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size",
+ producerStats.averageMsgSize, splitTopicAndPartitionIndexLabel);
});
- stats.subscriptionStats.forEach((sub, subsStats) -> {
- writeSubscriptionMetric(stream, "pulsar_subscription_back_log", subsStats.msgBacklog,
- cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_back_log_no_delayed",
- subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
- subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver",
- subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages",
- subsStats.unackedMessages, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_blocked_on_unacked_messages",
- subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, cluster, namespace, topic, sub,
- splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_out",
- subsStats.msgRateOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_msg_ack_rate",
- subsStats.messageAckRate, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_msg_throughput_out",
- subsStats.msgThroughputOut, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_out_bytes_total",
- subsStats.bytesOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_out_messages_total",
- subsStats.msgOutCounter, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_last_expire_timestamp",
- subsStats.lastExpireTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_last_acked_timestamp",
- subsStats.lastAckedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_flow_timestamp",
- subsStats.lastConsumedFlowTimestamp, cluster, namespace, topic, sub,
- splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_last_consumed_timestamp",
- subsStats.lastConsumedTimestamp, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_last_mark_delete_advanced_timestamp",
- subsStats.lastMarkDeleteAdvancedTimestamp, cluster, namespace, topic, sub,
- splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_expired",
- subsStats.msgRateExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
- writeSubscriptionMetric(stream, "pulsar_subscription_total_msg_expired",
- subsStats.totalMsgExpired, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
-
+ stats.subscriptionStats.forEach((n, subsStats) -> {
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
+ subsStats.msgBacklog, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
+ subsStats.msgBacklogNoDelayed, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
+ subsStats.msgDelayed, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
+ subsStats.msgRateRedeliver, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
+ subsStats.unackedMessages, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
+ subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
+ subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate",
+ subsStats.messageAckRate, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
+ subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
+ subsStats.bytesOutCounter, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
+ subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
+ subsStats.lastExpireTimestamp, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
+ subsStats.lastAckedTimestamp, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
+ subsStats.lastConsumedFlowTimestamp, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
+ subsStats.lastConsumedTimestamp, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
+ subsStats.lastMarkDeleteAdvancedTimestamp, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
+ subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
+ subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
subsStats.consumerStat.forEach((c, consumerStats) -> {
- writeConsumerMetric(stream, "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
- cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
- writeConsumerMetric(stream, "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
- cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
- writeConsumerMetric(stream, "pulsar_consumer_blocked_on_unacked_messages",
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+ "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+ "pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+ "pulsar_consumer_blocked_on_unacked_messages",
consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
- cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
- writeConsumerMetric(stream, "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
- cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
-
- writeConsumerMetric(stream, "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
- cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
-
- writeConsumerMetric(stream, "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
- cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
- writeConsumerMetric(stream, "pulsar_consumer_available_permits", consumerStats.availablePermits,
- cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
- writeConsumerMetric(stream, "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
- cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
- writeConsumerMetric(stream, "pulsar_out_messages_total", consumerStats.msgOutCounter,
- cluster, namespace, topic, sub, c, splitTopicAndPartitionIndexLabel);
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+ "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+ "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+ "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+ "pulsar_consumer_available_permits", consumerStats.availablePermits,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+ "pulsar_out_bytes_total", consumerStats.bytesOutCounter,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+ "pulsar_out_messages_total", consumerStats.msgOutCounter,
+ splitTopicAndPartitionIndexLabel);
});
});
if (!stats.replicationStats.isEmpty()) {
stats.replicationStats.forEach((remoteCluster, replStats) -> {
- writeMetric(stream, "pulsar_replication_rate_in", replStats.msgRateIn,
- cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_replication_rate_out", replStats.msgRateOut,
- cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_replication_throughput_in", replStats.msgThroughputIn,
- cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_replication_throughput_out", replStats.msgThroughputOut,
- cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_replication_backlog", replStats.replicationBacklog,
- cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_replication_connected_count", replStats.connectedCount,
- cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_replication_rate_expired", replStats.msgRateExpired,
- cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_replication_delay_in_seconds", replStats.replicationDelayInSeconds,
- cluster, namespace, topic, remoteCluster, splitTopicAndPartitionIndexLabel);
+ metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster,
+ replStats.msgRateIn, splitTopicAndPartitionIndexLabel);
+ metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster,
+ replStats.msgRateOut, splitTopicAndPartitionIndexLabel);
+ metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in",
+ remoteCluster, replStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
+ metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out",
+ remoteCluster, replStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
+ metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster,
+ replStats.replicationBacklog, splitTopicAndPartitionIndexLabel);
+ metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count",
+ remoteCluster, replStats.connectedCount, splitTopicAndPartitionIndexLabel);
+ metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired",
+ remoteCluster, replStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
+ metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds",
+ remoteCluster, replStats.replicationDelayInSeconds, splitTopicAndPartitionIndexLabel);
});
}
- writeMetric(stream, "pulsar_in_bytes_total", stats.bytesInCounter, cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter,
splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_in_messages_total", stats.msgInCounter, cluster, namespace, topic,
+ metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter,
splitTopicAndPartitionIndexLabel);
// Compaction
boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
- .isPresent();
+ .map(__ -> true).orElse(false);
if (hasCompaction) {
- writeMetric(stream, "pulsar_compaction_removed_event_count",
- stats.compactionRemovedEventCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_succeed_count",
- stats.compactionSucceedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_failed_count",
- stats.compactionFailedCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_duration_time_in_mills",
- stats.compactionDurationTimeInMills, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_read_throughput",
- stats.compactionReadThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_write_throughput",
- stats.compactionWriteThroughput, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_compacted_entries_count",
- stats.compactionCompactedEntriesCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_compacted_entries_size",
- stats.compactionCompactedEntriesSize, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
-
- long[] compactionBuckets = stats.compactionLatencyBuckets.getBuckets();
- writeMetric(stream, "pulsar_compaction_latency_le_0_5",
- compactionBuckets[0], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_le_1",
- compactionBuckets[1], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_le_5",
- compactionBuckets[2], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_le_10",
- compactionBuckets[3], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_le_20",
- compactionBuckets[4], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_le_50",
- compactionBuckets[5], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_le_100",
- compactionBuckets[6], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_le_200",
- compactionBuckets[7], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_le_1000",
- compactionBuckets[8], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_overflow",
- compactionBuckets[9], cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_sum",
- stats.compactionLatencyBuckets.getSum(), cluster, namespace, topic,
- splitTopicAndPartitionIndexLabel);
- writeMetric(stream, "pulsar_compaction_latency_count",
- stats.compactionLatencyBuckets.getCount(), cluster, namespace, topic,
- splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
+ stats.compactionRemovedEventCount, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
+ stats.compactionSucceedCount, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
+ stats.compactionFailedCount, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
+ stats.compactionDurationTimeInMills, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
+ stats.compactionReadThroughput, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
+ stats.compactionWriteThroughput, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
+ stats.compactionCompactedEntriesCount, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
+ stats.compactionCompactedEntriesSize, splitTopicAndPartitionIndexLabel);
+ long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
+ compactionLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
+ compactionLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
+ compactionLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
+ compactionLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
+ compactionLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
+ compactionLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
+ compactionLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
+ compactionLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
+ compactionLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
+ compactionLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
+ stats.compactionLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
+ stats.compactionLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
}
}
- private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
- String namespace, String topic, boolean splitTopicAndPartitionIndexLabel) {
- writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel);
+    /**
+     * Writes the {@code # TYPE <name> gauge} header for a metric the first time its name is seen.
+     *
+     * <p>Names are remembered in {@code metricWithTypeDefinition}; a name that is already recorded
+     * there produces no output.
+     *
+     * @param stream output stream receiving the metrics text
+     * @param name   metric name to declare as a gauge
+     */
+    static void metricType(SimpleTextOutputStream stream, String name) {
+        if (metricWithTypeDefinition.containsKey(name)) {
+            // The TYPE line for this metric name has already been written.
+            return;
+        }
+        metricWithTypeDefinition.put(name, "gauge");
+        stream.write("# TYPE ").write(name).write(" gauge\n");
+    }
+
+    /**
+     * Writes one topic-level sample carrying a floating-point value.
+     *
+     * <p>Emits the TYPE header if this metric name has not been declared yet, then the metric name
+     * with its cluster/namespace/topic (and optional partition) labels, the value, and the
+     * line ending produced by {@code appendEndings}.
+     */
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String name, double value, boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        SimpleTextOutputStream out =
+                appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+        // Close the label set, then append the sample value.
+        out.write("\"} ").write(value);
+        appendEndings(stream);
     }
- private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
- String namespace, String topic, String remoteCluster,
- boolean splitTopicAndPartitionIndexLabel) {
- writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
- "remote_cluster", remoteCluster);
+    /**
+     * Writes one subscription-level sample carrying an integer value.
+     *
+     * <p>Same layout as the topic-level sample, with an extra {@code subscription} label
+     * appended after the required labels.
+     */
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        SimpleTextOutputStream out =
+                appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+        out.write("\",subscription=\"").write(subscription)
+                .write("\"} ").write(value);
+        appendEndings(stream);
     }
- private static void writeProducerMetric(PrometheusMetricStreams stream, String metricName, Number value,
- String cluster, String namespace, String topic, String producer,
- long producerId, boolean splitTopicAndPartitionIndexLabel) {
- writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
- "producer_name", producer, "producer_id", String.valueOf(producerId));
+    /**
+     * Writes one producer-level sample carrying a floating-point value.
+     *
+     * <p>Adds {@code producer_name} and {@code producer_id} labels after the required
+     * cluster/namespace/topic labels.
+     */
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String producerName, long produceId, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        SimpleTextOutputStream out =
+                appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+        out.write("\",producer_name=\"").write(producerName)
+                .write("\",producer_id=\"").write(produceId)
+                .write("\"} ").write(value);
+        appendEndings(stream);
     }
+    /**
+     * Writes one subscription-level sample carrying a floating-point value.
+     *
+     * <p>Same layout as the topic-level sample, with an extra {@code subscription} label
+     * appended after the required labels.
+     */
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String subscription, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        SimpleTextOutputStream out =
+                appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+        out.write("\",subscription=\"").write(subscription)
+                .write("\"} ").write(value);
+        appendEndings(stream);
+    }
+
+    /**
+     * Writes one consumer-level sample carrying an integer value.
+     *
+     * <p>Adds {@code subscription}, {@code consumer_name} and {@code consumer_id} labels after
+     * the required cluster/namespace/topic labels.
+     */
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String subscription, String consumerName, long consumerId, String name, long value,
+            boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        SimpleTextOutputStream out =
+                appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+        out.write("\",subscription=\"").write(subscription)
+                .write("\",consumer_name=\"").write(consumerName)
+                .write("\",consumer_id=\"").write(consumerId)
+                .write("\"} ").write(value);
+        appendEndings(stream);
+    }
- private static void writeSubscriptionMetric(PrometheusMetricStreams stream, String metricName, Number value,
- String cluster, String namespace, String topic, String subscription,
- boolean splitTopicAndPartitionIndexLabel) {
- writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
- "subscription", subscription);
+    /**
+     * Writes one consumer-level sample carrying a floating-point value.
+     *
+     * <p>Adds {@code subscription}, {@code consumer_name} and {@code consumer_id} labels after
+     * the required cluster/namespace/topic labels.
+     */
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+            String subscription, String consumerName, long consumerId, String name, double value,
+            boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        SimpleTextOutputStream out =
+                appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+        out.write("\",subscription=\"").write(subscription)
+                .write("\",consumer_name=\"").write(consumerName)
+                .write("\",consumer_id=\"").write(consumerId)
+                .write("\"} ").write(value);
+        appendEndings(stream);
     }
- private static void writeConsumerMetric(PrometheusMetricStreams stream, String metricName, Number value,
- String cluster, String namespace, String topic, String subscription,
- Consumer consumer, boolean splitTopicAndPartitionIndexLabel) {
- writeTopicMetric(stream, metricName, value, cluster, namespace, topic, splitTopicAndPartitionIndexLabel,
- "subscription", subscription, "consumer_name", consumer.consumerName(),
- "consumer_id", String.valueOf(consumer.consumerId()));
+    /**
+     * Writes one replication sample carrying a floating-point value.
+     *
+     * <p>Adds a {@code remote_cluster} label after the required cluster/namespace/topic labels.
+     */
+    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
+            String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) {
+        metricType(stream, name);
+        SimpleTextOutputStream out =
+                appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+        out.write("\",remote_cluster=\"").write(remoteCluster)
+                .write("\"} ").write(value);
+        appendEndings(stream);
     }
- static void writeTopicMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
- String namespace, String topic, boolean splitTopicAndPartitionIndexLabel,
- String... extraLabelsAndValues) {
- String[] labelsAndValues = new String[splitTopicAndPartitionIndexLabel ? 8 : 6];
- labelsAndValues[0] = "cluster";
- labelsAndValues[1] = cluster;
- labelsAndValues[2] = "namespace";
- labelsAndValues[3] = namespace;
- labelsAndValues[4] = "topic";
+ private static SimpleTextOutputStream appendRequiredLabels(SimpleTextOutputStream stream, String cluster,
+ String namespace, String topic, String name, boolean splitTopicAndPartitionIndexLabel) {
+ stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
if (splitTopicAndPartitionIndexLabel) {
int index = topic.indexOf(PARTITIONED_TOPIC_SUFFIX);
if (index > 0) {
- labelsAndValues[5] = topic.substring(0, index);
- labelsAndValues[6] = "partition";
- labelsAndValues[7] = topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length());
+ stream.write("\",topic=\"").write(topic.substring(0, index)).write("\",partition=\"")
+ .write(topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length()));
} else {
- labelsAndValues[5] = topic;
- labelsAndValues[6] = "partition";
- labelsAndValues[7] = "-1";
+ stream.write("\",topic=\"").write(topic).write("\",partition=\"").write("-1");
}
} else {
- labelsAndValues[5] = topic;
+ stream.write("\",topic=\"").write(topic);
}
- String[] labels = ArrayUtils.addAll(labelsAndValues, extraLabelsAndValues);
- stream.writeSample(metricName, value, labels);
+ return stream;
+ }
+
+    /**
+     * Terminates a sample line: a space, the current wall-clock time in milliseconds
+     * (used as the sample timestamp), and a newline.
+     */
+    private static void appendEndings(SimpleTextOutputStream stream) {
+        stream.write(' ')
+                .write(System.currentTimeMillis())
+                .write('\n');
     }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index 8c58b516333..e6ac1535f43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.stats.prometheus;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashMap;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
@@ -28,6 +30,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreStats;
@@ -35,10 +38,21 @@ import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreSt
@Slf4j
public class TransactionAggregator {
+    /**
+     * Used for tracking duplicate TYPE definitions.
+     *
+     * <p>Holds, per thread, the metric names whose {@code # TYPE} line has already been written,
+     * mapped to the declared type. The explicit type arguments on the anonymous subclass avoid a
+     * raw-type instantiation and match the other thread-locals declared in this class.
+     */
+    private static final FastThreadLocal<Map<String, String>> threadLocalMetricWithTypeDefinition =
+            new FastThreadLocal<Map<String, String>>() {
+                @Override
+                protected Map<String, String> initialValue() {
+                    return new HashMap<>();
+                }
+            };
+
private static final FastThreadLocal<AggregatedTransactionCoordinatorStats> localTransactionCoordinatorStats =
new FastThreadLocal<AggregatedTransactionCoordinatorStats>() {
@Override
- protected AggregatedTransactionCoordinatorStats initialValue() {
+ protected AggregatedTransactionCoordinatorStats initialValue() throws Exception {
return new AggregatedTransactionCoordinatorStats();
}
};
@@ -46,18 +60,21 @@ public class TransactionAggregator {
private static final FastThreadLocal<ManagedLedgerStats> localManageLedgerStats =
new FastThreadLocal<ManagedLedgerStats>() {
@Override
- protected ManagedLedgerStats initialValue() {
+ protected ManagedLedgerStats initialValue() throws Exception {
return new ManagedLedgerStats();
}
};
- public static void generate(PulsarService pulsar, PrometheusMetricStreams stream, boolean includeTopicMetrics) {
+ public static void generate(PulsarService pulsar, SimpleTextOutputStream stream, boolean includeTopicMetrics) {
String cluster = pulsar.getConfiguration().getClusterName();
+ Map<String, String> metricWithTypeDefinition = threadLocalMetricWithTypeDefinition.get();
+ metricWithTypeDefinition.clear();
if (includeTopicMetrics) {
+ pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
- pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) ->
- bundlesMap.forEach((bundle, topicsMap) -> topicsMap.forEach((name, topic) -> {
+ bundlesMap.forEach((bundle, topicsMap) -> {
+ topicsMap.forEach((name, topic) -> {
if (topic instanceof PersistentTopic) {
topic.getSubscriptions().values().forEach(subscription -> {
try {
@@ -65,8 +82,9 @@ public class TransactionAggregator {
if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
&& subscription instanceof PersistentSubscription
&& ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) {
- ManagedLedger managedLedger = ((PersistentSubscription) subscription)
- .getPendingAckManageLedger().get();
+ ManagedLedger managedLedger =
+ ((PersistentSubscription) subscription)
+ .getPendingAckManageLedger().get();
generateManageLedgerStats(managedLedger,
stream, cluster, namespace, name, subscription.getName());
}
@@ -75,7 +93,9 @@ public class TransactionAggregator {
}
});
}
- })));
+ });
+ });
+ });
}
AggregatedTransactionCoordinatorStats transactionCoordinatorStats = localTransactionCoordinatorStats.get();
@@ -104,18 +124,18 @@ public class TransactionAggregator {
localManageLedgerStats.get().reset();
if (transactionMetadataStore instanceof MLTransactionMetadataStore) {
- ManagedLedger managedLedger =
- ((MLTransactionMetadataStore) transactionMetadataStore).getManagedLedger();
+ ManagedLedger managedLedger =
+ ((MLTransactionMetadataStore) transactionMetadataStore).getManagedLedger();
generateManageLedgerStats(managedLedger,
stream, cluster, NamespaceName.SYSTEM_NAMESPACE.toString(),
MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + transactionCoordinatorID.getId(),
MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME);
}
- });
+ });
}
- private static void generateManageLedgerStats(ManagedLedger managedLedger, PrometheusMetricStreams stream,
+ private static void generateManageLedgerStats(ManagedLedger managedLedger, SimpleTextOutputStream stream,
String cluster, String namespace, String topic, String subscription) {
ManagedLedgerStats managedLedgerStats = localManageLedgerStats.get();
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) managedLedger.getStats();
@@ -137,149 +157,174 @@ public class TransactionAggregator {
managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
- printManageLedgerStats(stream, cluster, namespace, topic, subscription, managedLedgerStats);
+ printManageLedgerStats(stream, cluster, namespace, topic,
+ subscription, managedLedgerStats);
+ }
+
+    /**
+     * Writes the {@code # TYPE <name> gauge} header unless the current thread's
+     * type-definition map already contains the metric name.
+     *
+     * @param stream output stream receiving the metrics text
+     * @param name   metric name to declare as a gauge
+     */
+    private static void metricType(SimpleTextOutputStream stream, String name) {
+        Map<String, String> declaredTypes = threadLocalMetricWithTypeDefinition.get();
+        // putIfAbsent returns null only when the name was not recorded before this call.
+        if (declaredTypes.putIfAbsent(name, "gauge") == null) {
+            stream.write("# TYPE ").write(name).write(" gauge\n");
+        }
+    }
+
+    /**
+     * Writes one transaction-coordinator sample.
+     *
+     * <p>Emits the TYPE header if the name has not been declared yet, then the metric name with
+     * {@code cluster} and {@code coordinator_id} labels, the value, the current time in
+     * milliseconds, and a newline.
+     */
+    private static void metric(SimpleTextOutputStream stream, String cluster, String name,
+            double value, long coordinatorId) {
+        metricType(stream, name);
+        // Metric name and label set.
+        stream.write(name).write("{cluster=\"").write(cluster)
+                .write("\",coordinator_id=\"").write(coordinatorId).write("\"} ");
+        // Sample value followed by the timestamp.
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
- private static void printManageLedgerStats(PrometheusMetricStreams stream, String cluster, String namespace,
+    /**
+     * Writes one managed-ledger sample carrying an integer value.
+     *
+     * <p>The line consists of the metric name, the {@code cluster}, {@code namespace},
+     * {@code topic} and {@code subscription} labels, the value, the current time in milliseconds,
+     * and a newline. No TYPE header is written by this method.
+     *
+     * <p>Labels are separated by a bare comma (no space before {@code namespace}) so the output
+     * uses the same label layout as the other sample writers in this class.
+     */
+    private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace,
+            String topic, String subscription, String name, long value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
+                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
+
+    /**
+     * Writes one managed-ledger sample carrying a floating-point value.
+     *
+     * <p>Identical in layout to the integer overload: metric name, {@code cluster},
+     * {@code namespace}, {@code topic} and {@code subscription} labels separated by bare commas,
+     * the value, the current time in milliseconds, and a newline. No TYPE header is written.
+     */
+    private static void metrics(SimpleTextOutputStream stream, String cluster, String namespace,
+            String topic, String subscription, String name, double value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
+                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
+
+ private static void printManageLedgerStats(SimpleTextOutputStream stream, String cluster, String namespace,
String topic, String subscription, ManagedLedgerStats stats) {
- writeMetric(stream, "pulsar_storage_size", stats.storageSize, cluster, namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_logical_size", stats.storageLogicalSize, cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_backlog_size", stats.backlogSize, cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed, cluster, namespace, topic,
- subscription);
+ metrics(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_size", stats.storageSize);
+ metrics(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_logical_size", stats.storageLogicalSize);
+ metrics(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_backlog_size", stats.backlogSize);
+ metrics(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
- writeMetric(stream, "pulsar_storage_write_rate", stats.storageWriteRate, cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_read_rate", stats.storageReadRate, cluster, namespace, topic,
- subscription);
+ metrics(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_rate", stats.storageWriteRate);
+ metrics(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_read_rate", stats.storageReadRate);
stats.storageWriteLatencyBuckets.refresh();
long[] latencyBuckets = stats.storageWriteLatencyBuckets.getBuckets();
- writeMetric(stream, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_write_latency_le_1", latencyBuckets[1], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_write_latency_le_5", latencyBuckets[2], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_write_latency_le_10", latencyBuckets[3], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_write_latency_le_20", latencyBuckets[4], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_write_latency_le_50", latencyBuckets[5], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_write_latency_le_100", latencyBuckets[6], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_write_latency_le_200", latencyBuckets[7], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_write_latency_le_1000", latencyBuckets[8], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_write_latency_overflow", latencyBuckets[9], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_storage_write_latency_count", stats.storageWriteLatencyBuckets.getCount(),
- cluster, namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_write_latency_sum", stats.storageWriteLatencyBuckets.getSum(), cluster,
- namespace, topic, subscription);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_count",
+ stats.storageWriteLatencyBuckets.getCount());
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_write_latency_sum",
+ stats.storageWriteLatencyBuckets.getSum());
stats.storageLedgerWriteLatencyBuckets.refresh();
- long[] ledgerWriteLatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWriteLatencyBuckets[0], cluster,
- namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1", ledgerWriteLatencyBuckets[1], cluster,
- namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_5", ledgerWriteLatencyBuckets[2], cluster,
- namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_10", ledgerWriteLatencyBuckets[3], cluster,
- namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_20", ledgerWriteLatencyBuckets[4], cluster,
- namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_50", ledgerWriteLatencyBuckets[5], cluster,
- namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_100", ledgerWriteLatencyBuckets[6], cluster,
- namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_200", ledgerWriteLatencyBuckets[7], cluster,
- namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_le_1000", ledgerWriteLatencyBuckets[8], cluster,
- namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_overflow", ledgerWriteLatencyBuckets[9], cluster,
- namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_count",
- stats.storageLedgerWriteLatencyBuckets.getCount(), cluster, namespace, topic, subscription);
- writeMetric(stream, "pulsar_storage_ledger_write_latency_sum",
- stats.storageLedgerWriteLatencyBuckets.getSum(), cluster, namespace, topic, subscription);
+ long[] ledgerWritelatencyBuckets = stats.storageLedgerWriteLatencyBuckets.getBuckets();
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_overflow",
+ ledgerWritelatencyBuckets[9]);
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_count",
+ stats.storageLedgerWriteLatencyBuckets.getCount());
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_storage_ledger_write_latency_sum",
+ stats.storageLedgerWriteLatencyBuckets.getSum());
stats.entrySizeBuckets.refresh();
long[] entrySizeBuckets = stats.entrySizeBuckets.getBuckets();
- writeMetric(stream, "pulsar_entry_size_le_128", entrySizeBuckets[0], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_entry_size_le_512", entrySizeBuckets[1], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_entry_size_le_overflow", entrySizeBuckets[8], cluster, namespace, topic,
- subscription);
- writeMetric(stream, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount(), cluster, namespace,
- topic, subscription);
- writeMetric(stream, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum(), cluster, namespace, topic,
- subscription);
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
+ metric(stream, cluster, namespace, topic, subscription, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
+ metric(stream, cluster, namespace, topic, subscription,
+ "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
+ }
+
+ private static void metric(SimpleTextOutputStream stream, String cluster,
+ String namespace, String topic, String subscription,
+ String name, long value) {
+ stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
+ .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
+ stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}
- static void printTransactionCoordinatorStats(PrometheusMetricStreams stream, String cluster,
+ static void printTransactionCoordinatorStats(SimpleTextOutputStream stream, String cluster,
AggregatedTransactionCoordinatorStats stats,
long coordinatorId) {
- writeMetric(stream, "pulsar_txn_active_count", stats.actives, cluster,
- coordinatorId);
- writeMetric(stream, "pulsar_txn_committed_count", stats.committedCount, cluster,
- coordinatorId);
- writeMetric(stream, "pulsar_txn_aborted_count", stats.abortedCount, cluster,
- coordinatorId);
- writeMetric(stream, "pulsar_txn_created_count", stats.createdCount, cluster,
- coordinatorId);
- writeMetric(stream, "pulsar_txn_timeout_count", stats.timeoutCount, cluster,
- coordinatorId);
- writeMetric(stream, "pulsar_txn_append_log_count", stats.appendLogCount, cluster,
- coordinatorId);
+ metric(stream, cluster, "pulsar_txn_active_count",
+ stats.actives, coordinatorId);
+ metric(stream, cluster, "pulsar_txn_committed_count",
+ stats.committedCount, coordinatorId);
+ metric(stream, cluster, "pulsar_txn_aborted_count",
+ stats.abortedCount, coordinatorId);
+ metric(stream, cluster, "pulsar_txn_created_count",
+ stats.createdCount, coordinatorId);
+ metric(stream, cluster, "pulsar_txn_timeout_count",
+ stats.timeoutCount, coordinatorId);
+ metric(stream, cluster, "pulsar_txn_append_log_count",
+ stats.appendLogCount, coordinatorId);
long[] latencyBuckets = stats.executionLatency;
- writeMetric(stream, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], cluster, coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], cluster, coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], cluster, coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], cluster, coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], cluster, coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], cluster, coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], cluster, coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_15000", latencyBuckets[7], cluster, coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], cluster, coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], cluster, coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_300000", latencyBuckets[10], cluster,
- coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_1500000", latencyBuckets[11], cluster,
- coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_3000000", latencyBuckets[12], cluster,
- coordinatorId);
- writeMetric(stream, "pulsar_txn_execution_latency_le_overflow", latencyBuckets[13], cluster,
- coordinatorId);
- }
-
- private static void writeMetric(PrometheusMetricStreams stream, String metricName, double value, String cluster,
- long coordinatorId) {
- stream.writeSample(metricName, value, "cluster", cluster, "coordinator_id", String.valueOf(coordinatorId));
- }
-
- private static void writeMetric(PrometheusMetricStreams stream, String metricName, Number value, String cluster,
- String namespace, String topic, String subscription) {
- stream.writeSample(metricName, value, "cluster", cluster, "namespace", namespace, "topic", topic,
- "subscription", subscription);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_10", latencyBuckets[0], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_20", latencyBuckets[1], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_50", latencyBuckets[2], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_100", latencyBuckets[3], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_500", latencyBuckets[4], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_1000", latencyBuckets[5], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_5000", latencyBuckets[6], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_15000", latencyBuckets[7], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_30000", latencyBuckets[8], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_60000", latencyBuckets[9], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_300000",
+ latencyBuckets[10], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_1500000",
+ latencyBuckets[11], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_3000000",
+ latencyBuckets[12], coordinatorId);
+ metric(stream, cluster, "pulsar_txn_execution_latency_le_overflow",
+ latencyBuckets[13], coordinatorId);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
index 8f704b11e76..7550096c2b5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java
@@ -18,8 +18,13 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.io.Writer;
+import java.util.Enumeration;
import org.apache.bookkeeper.stats.Counter;
/**
@@ -135,4 +140,31 @@ public class PrometheusTextFormatUtil {
.append(success.toString()).append("\"} ")
.append(Double.toString(opStat.getSum(success))).append('\n');
}
+
+ public static void writeMetricsCollectedByPrometheusClient(Writer w, CollectorRegistry registry)
+ throws IOException {
+ Enumeration<MetricFamilySamples> metricFamilySamples = registry.metricFamilySamples();
+ while (metricFamilySamples.hasMoreElements()) {
+ MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
+
+ for (int i = 0; i < metricFamily.samples.size(); i++) {
+ Sample sample = metricFamily.samples.get(i);
+ w.write(sample.name);
+ w.write('{');
+ for (int j = 0; j < sample.labelNames.size(); j++) {
+ if (j != 0) {
+ w.write(", ");
+ }
+ w.write(sample.labelNames.get(j));
+ w.write("=\"");
+ w.write(sample.labelValues.get(j));
+ w.write('"');
+ }
+
+ w.write("} ");
+ w.write(Collector.doubleToGoString(sample.value));
+ w.write('\n');
+ }
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index be10c8dc65e..f28412ea751 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -50,7 +50,6 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
@@ -1405,63 +1404,6 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertEquals(cm.get(0).value, count);
}
- @Test
- public void testMetricsGroupedByTypeDefinitions() throws Exception {
- Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
- Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
- for (int i = 0; i < 10; i++) {
- String message = "my-message-" + i;
- p1.send(message.getBytes());
- p2.send(message.getBytes());
- }
-
- ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
- PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
- String metricsStr = statsOut.toString();
-
- Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
- Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
-
- AtomicReference<String> currentMetric = new AtomicReference<>();
- Splitter.on("\n").split(metricsStr).forEach(line -> {
- if (line.isEmpty()) {
- return;
- }
- if (line.startsWith("#")) {
- // Get the current type definition
- Matcher typeMatcher = typePattern.matcher(line);
- checkArgument(typeMatcher.matches());
- String metricName = typeMatcher.group(1);
- currentMetric.set(metricName);
- } else {
- Matcher metricMatcher = metricNamePattern.matcher(line);
- checkArgument(metricMatcher.matches());
- String metricName = metricMatcher.group(1);
-
- if (metricName.endsWith("_bucket")) {
- metricName = metricName.substring(0, metricName.indexOf("_bucket"));
- } else if (metricName.endsWith("_count") && !currentMetric.get().endsWith("_count")) {
- metricName = metricName.substring(0, metricName.indexOf("_count"));
- } else if (metricName.endsWith("_sum") && !currentMetric.get().endsWith("_sum")) {
- metricName = metricName.substring(0, metricName.indexOf("_sum"));
- } else if (metricName.endsWith("_total") && !currentMetric.get().endsWith("_total")) {
- metricName = metricName.substring(0, metricName.indexOf("_total"));
- } else if (metricName.endsWith("_created") && !currentMetric.get().endsWith("_created")) {
- metricName = metricName.substring(0, metricName.indexOf("_created"));
- }
-
- if (!metricName.equals(currentMetric.get())) {
- System.out.println(metricsStr);
- fail("Metric not grouped under its type definition: " + line);
- }
-
- }
- });
-
- p1.close();
- p2.close();
- }
-
/**
* Hacky parsing of Prometheus text format. Should be good enough for unit tests
*/
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
deleted file mode 100644
index 15c29a0dc66..00000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricStreamsTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.stats.prometheus;
-
-import static org.testng.Assert.assertTrue;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.StandardCharsets;
-import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker")
-public class PrometheusMetricStreamsTest {
-
- private PrometheusMetricStreams underTest;
-
- @BeforeMethod(alwaysRun = true)
- protected void setup() throws Exception {
- underTest = new PrometheusMetricStreams();
- }
-
- @AfterMethod(alwaysRun = true)
- protected void cleanup() throws Exception {
- underTest.releaseAll();
- }
-
- @Test
- public void canWriteSampleWithoutLabels() {
- underTest.writeSample("my-metric", 123);
-
- String actual = writeToString();
-
- assertTrue(actual.startsWith("# TYPE my-metric gauge"), "Gauge type line missing");
- assertTrue(actual.contains("my-metric{} 123"), "Metric line missing");
- }
-
- @Test
- public void canWriteSampleWithLabels() {
- underTest.writeSample("my-other-metric", 123, "cluster", "local");
- underTest.writeSample("my-other-metric", 456, "cluster", "local", "namespace", "my-ns");
-
- String actual = writeToString();
-
- assertTrue(actual.startsWith("# TYPE my-other-metric gauge"), "Gauge type line missing");
- assertTrue(actual.contains("my-other-metric{cluster=\"local\"} 123"), "Cluster metric line missing");
- assertTrue(actual.contains("my-other-metric{cluster=\"local\",namespace=\"my-ns\"} 456"),
- "Cluster and Namespace metric line missing");
- }
-
- private String writeToString() {
- ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer();
- try {
- SimpleTextOutputStream stream = new SimpleTextOutputStream(buffer);
- underTest.flushAllToStream(stream);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- int readIndex = buffer.readerIndex();
- int readableBytes = buffer.readableBytes();
- for (int i = 0; i < readableBytes; i++) {
- out.write(buffer.getByte(readIndex + i));
- }
- return out.toString();
- } finally {
- buffer.release();
- }
- }
-}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
index dd78b4cfe58..9fc4b347c85 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SimpleTextOutputStream.java
@@ -22,11 +22,12 @@ import io.netty.buffer.ByteBuf;
/**
* Format strings and numbers into a ByteBuf without any memory allocation.
+ *
*/
public class SimpleTextOutputStream {
private final ByteBuf buffer;
- private static final char[] hexChars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
- 'f'};
+ private static final char[] hexChars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
+ 'f' };
public SimpleTextOutputStream(ByteBuf buffer) {
this.buffer = buffer;
@@ -130,12 +131,4 @@ public class SimpleTextOutputStream {
write(r);
return this;
}
-
- public void write(ByteBuf byteBuf) {
- buffer.writeBytes(byteBuf);
- }
-
- public ByteBuf getBuffer() {
- return buffer;
- }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
index f5aa273f656..f7a205c7db0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PrometheusTextFormat.java
@@ -37,11 +37,6 @@ public class PrometheusTextFormat {
*/
while (mfs.hasMoreElements()) {
Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
- writer.write("# TYPE ");
- writer.write(metricFamilySamples.name);
- writer.write(' ');
- writer.write(metricFamilySamples.type.name().toLowerCase());
- writer.write('\n');
for (Collector.MetricFamilySamples.Sample sample : metricFamilySamples.samples) {
writer.write(sample.name);
if (sample.labelNames.size() > 0) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
index 2ad407b2e5e..c8b411cbf57 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerStatsManager.java
@@ -328,11 +328,6 @@ public class WorkerStatsManager {
}
private void writeMetric(String metricName, long value, StringWriter stream) {
- stream.write("# TYPE ");
- stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
- stream.write(metricName);
- stream.write(" gauge \n");
-
stream.write(PULSAR_FUNCTION_WORKER_METRICS_PREFIX);
stream.write(metricName);
stream.write("{");