You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 02:42:36 UTC
[pulsar] 02/04: non-persistent topic metrics (#13827)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 99cc4a96475552ce14be8338005a45bedda38b55
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Tue Feb 22 09:56:30 2022 +0800
non-persistent topic metrics (#13827)
### Motivation
Non-persistent topic doesn't have subscription metrics
### Modifications
Expose a new non-persistent subscription metric: `pulsar_subscription_msg_drop_rate`
(cherry picked from commit d548bc4ef3b785325b43c737f347ac702a840602)
---
.../prometheus/AggregatedSubscriptionStats.java | 2 +
.../stats/prometheus/NamespaceStatsAggregator.java | 81 +++++++++++++---------
.../pulsar/broker/stats/prometheus/TopicStats.java | 2 +
.../pulsar/broker/stats/PrometheusMetricsTest.java | 40 ++++++++++-
4 files changed, 93 insertions(+), 32 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index fb74daf419f..c829be28e59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -58,5 +58,7 @@ public class AggregatedSubscriptionStats {
long totalMsgExpired;
+ double msgDropRate;
+
public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 7e36c5612d0..a3a0fcda445 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -31,7 +31,10 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactedTopicContext;
@@ -109,6 +112,37 @@ public class NamespaceStatsAggregator {
return Optional.ofNullable(compactor).map(c -> c.getStats());
}
+ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subscriptionStats,
+ AggregatedSubscriptionStats subsStats) {
+ stats.subscriptionsCount++;
+ stats.msgBacklog += subscriptionStats.msgBacklog;
+ subsStats.msgBacklog = subscriptionStats.msgBacklog;
+ subsStats.msgDelayed = subscriptionStats.msgDelayed;
+ subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
+ subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
+ subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
+ subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
+ subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
+ subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
+ subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
+ subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
+ subscriptionStats.consumers.forEach(cStats -> {
+ stats.consumersCount++;
+ subsStats.unackedMessages += cStats.unackedMessages;
+ subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
+ subsStats.msgRateOut += cStats.msgRateOut;
+ subsStats.msgThroughputOut += cStats.msgThroughputOut;
+ subsStats.bytesOutCounter += cStats.bytesOutCounter;
+ subsStats.msgOutCounter += cStats.msgOutCounter;
+ if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
+ subsStats.blockedSubscriptionOnUnackedMsgs = true;
+ }
+ });
+ stats.rateOut += subsStats.msgRateOut;
+ stats.throughputOut += subsStats.msgThroughputOut;
+
+ }
+
private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
Optional<CompactorMXBean> compactorMXBean) {
@@ -141,7 +175,6 @@ public class NamespaceStatsAggregator {
stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
}
-
TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, false);
stats.msgInCounter = tStatus.msgInCounter;
stats.bytesInCounter = tStatus.bytesInCounter;
@@ -175,37 +208,23 @@ public class NamespaceStatsAggregator {
}
});
- tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
- stats.subscriptionsCount++;
- stats.msgBacklog += subscriptionStats.msgBacklog;
-
- AggregatedSubscriptionStats subsStats = stats.subscriptionStats
- .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
- subsStats.msgBacklog = subscriptionStats.msgBacklog;
- subsStats.msgDelayed = subscriptionStats.msgDelayed;
- subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
- subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
- subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
- subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
- subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
- subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
- subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
- subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
- subscriptionStats.consumers.forEach(cStats -> {
- stats.consumersCount++;
- subsStats.unackedMessages += cStats.unackedMessages;
- subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
- subsStats.msgRateOut += cStats.msgRateOut;
- subsStats.msgThroughputOut += cStats.msgThroughputOut;
- subsStats.bytesOutCounter += cStats.bytesOutCounter;
- subsStats.msgOutCounter += cStats.msgOutCounter;
- if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
- subsStats.blockedSubscriptionOnUnackedMsgs = true;
- }
+ if (topic instanceof PersistentTopic) {
+ tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
+ AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+ .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
+ aggregateTopicStats(stats, subscriptionStats, subsStats);
});
- stats.rateOut += subsStats.msgRateOut;
- stats.throughputOut += subsStats.msgThroughputOut;
- });
+ } else {
+ ((NonPersistentTopicStatsImpl) tStatus).getNonPersistentSubscriptions()
+ .forEach((subName, nonPersistentSubscriptionStats) -> {
+ NonPersistentSubscriptionStatsImpl subscriptionStats =
+ (NonPersistentSubscriptionStatsImpl) nonPersistentSubscriptionStats;
+ AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+ .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
+ aggregateTopicStats(stats, subscriptionStats, subsStats);
+ subsStats.msgDropRate += subscriptionStats.getMsgDropRate();
+ });
+ }
// Consumer stats can be a lot if a subscription has many consumers
if (includeConsumerMetrics) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index f0556e8da90..35dbdba274a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -266,6 +266,8 @@ class TopicStats {
subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate",
+ subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 3f64619a983..c4823c8b0e7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -455,7 +455,6 @@ public class PrometheusMetricsTest extends BrokerTestBase {
.subscribe();
final int messages = 10;
-
for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
@@ -491,6 +490,45 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count"));
}
+ @Test
+ public void testNonPersistentSubMetrics() throws Exception {
+ Producer<byte[]> p1 =
+ pulsarClient.newProducer().topic("non-persistent://my-property/use/my-ns/my-topic1").create();
+
+ Consumer<byte[]> c1 = pulsarClient.newConsumer()
+ .topic("non-persistent://my-property/use/my-ns/my-topic1")
+ .subscriptionName("test")
+ .subscribe();
+
+ final int messages = 100;
+
+ for (int i = 0; i < messages; i++) {
+ String message = "my-message-" + i;
+ p1.send(message.getBytes());
+ }
+
+ for (int i = 0; i < messages; i++) {
+ c1.acknowledge(c1.receive());
+ }
+
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+ String metricsStr = statsOut.toString();
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+ assertTrue(metrics.containsKey("pulsar_subscription_back_log"));
+ assertTrue(metrics.containsKey("pulsar_subscription_back_log_no_delayed"));
+ assertTrue(metrics.containsKey("pulsar_subscription_msg_throughput_out"));
+ assertTrue(metrics.containsKey("pulsar_throughput_out"));
+ assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_redeliver"));
+ assertTrue(metrics.containsKey("pulsar_subscription_unacked_messages"));
+ assertTrue(metrics.containsKey("pulsar_subscription_blocked_on_unacked_messages"));
+ assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_out"));
+ assertTrue(metrics.containsKey("pulsar_out_bytes_total"));
+ assertTrue(metrics.containsKey("pulsar_out_messages_total"));
+ assertTrue(metrics.containsKey("pulsar_subscription_last_expire_timestamp"));
+ assertTrue(metrics.containsKey("pulsar_subscription_msg_drop_rate"));
+ }
+
@Test
public void testPerNamespaceStats() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();