You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 02:42:36 UTC

[pulsar] 02/04: non-persistent topic metrics (#13827)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 99cc4a96475552ce14be8338005a45bedda38b55
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Tue Feb 22 09:56:30 2022 +0800

    non-persistent topic metrics (#13827)
    
    ### Motivation
    Non-persistent topics do not expose subscription metrics.
    
    ### Modifications
    
    Expose a new metric for non-persistent subscriptions: `pulsar_subscription_msg_drop_rate`
    
    (cherry picked from commit d548bc4ef3b785325b43c737f347ac702a840602)
---
 .../prometheus/AggregatedSubscriptionStats.java    |  2 +
 .../stats/prometheus/NamespaceStatsAggregator.java | 81 +++++++++++++---------
 .../pulsar/broker/stats/prometheus/TopicStats.java |  2 +
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 40 ++++++++++-
 4 files changed, 93 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index fb74daf419f..c829be28e59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -58,5 +58,7 @@ public class AggregatedSubscriptionStats {
 
     long totalMsgExpired;
 
+    double msgDropRate;
+
     public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 7e36c5612d0..a3a0fcda445 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -31,7 +31,10 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.compaction.CompactedTopicContext;
@@ -109,6 +112,37 @@ public class NamespaceStatsAggregator {
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
+    private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subscriptionStats,
+                                            AggregatedSubscriptionStats subsStats) {
+        stats.subscriptionsCount++;
+        stats.msgBacklog += subscriptionStats.msgBacklog;
+        subsStats.msgBacklog = subscriptionStats.msgBacklog;
+        subsStats.msgDelayed = subscriptionStats.msgDelayed;
+        subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
+        subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
+        subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
+        subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
+        subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
+        subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
+        subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
+        subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
+        subscriptionStats.consumers.forEach(cStats -> {
+            stats.consumersCount++;
+            subsStats.unackedMessages += cStats.unackedMessages;
+            subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
+            subsStats.msgRateOut += cStats.msgRateOut;
+            subsStats.msgThroughputOut += cStats.msgThroughputOut;
+            subsStats.bytesOutCounter += cStats.bytesOutCounter;
+            subsStats.msgOutCounter += cStats.msgOutCounter;
+            if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
+                subsStats.blockedSubscriptionOnUnackedMsgs = true;
+            }
+        });
+        stats.rateOut += subsStats.msgRateOut;
+        stats.throughputOut += subsStats.msgThroughputOut;
+
+    }
+
     private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
             boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
                                       Optional<CompactorMXBean> compactorMXBean) {
@@ -141,7 +175,6 @@ public class NamespaceStatsAggregator {
             stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
             stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
         }
-
         TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, false);
         stats.msgInCounter = tStatus.msgInCounter;
         stats.bytesInCounter = tStatus.bytesInCounter;
@@ -175,37 +208,23 @@ public class NamespaceStatsAggregator {
             }
         });
 
-        tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
-            stats.subscriptionsCount++;
-            stats.msgBacklog += subscriptionStats.msgBacklog;
-
-            AggregatedSubscriptionStats subsStats = stats.subscriptionStats
-                    .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
-            subsStats.msgBacklog = subscriptionStats.msgBacklog;
-            subsStats.msgDelayed = subscriptionStats.msgDelayed;
-            subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
-            subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
-            subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
-            subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
-            subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
-            subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
-            subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
-            subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
-            subscriptionStats.consumers.forEach(cStats -> {
-                stats.consumersCount++;
-                subsStats.unackedMessages += cStats.unackedMessages;
-                subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
-                subsStats.msgRateOut += cStats.msgRateOut;
-                subsStats.msgThroughputOut += cStats.msgThroughputOut;
-                subsStats.bytesOutCounter += cStats.bytesOutCounter;
-                subsStats.msgOutCounter += cStats.msgOutCounter;
-                if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
-                    subsStats.blockedSubscriptionOnUnackedMsgs = true;
-                }
+        if (topic instanceof PersistentTopic) {
+            tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
+                AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+                        .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
+                aggregateTopicStats(stats, subscriptionStats, subsStats);
             });
-            stats.rateOut += subsStats.msgRateOut;
-            stats.throughputOut += subsStats.msgThroughputOut;
-        });
+        } else {
+            ((NonPersistentTopicStatsImpl) tStatus).getNonPersistentSubscriptions()
+                    .forEach((subName, nonPersistentSubscriptionStats) -> {
+                NonPersistentSubscriptionStatsImpl subscriptionStats =
+                                (NonPersistentSubscriptionStatsImpl) nonPersistentSubscriptionStats;
+                AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+                        .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
+                aggregateTopicStats(stats, subscriptionStats, subsStats);
+                subsStats.msgDropRate += subscriptionStats.getMsgDropRate();
+            });
+        }
 
         // Consumer stats can be a lot if a subscription has many consumers
         if (includeConsumerMetrics) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index f0556e8da90..35dbdba274a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -266,6 +266,8 @@ class TopicStats {
                     subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
                     subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate",
+                    subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
             subsStats.consumerStat.forEach((c, consumerStats) -> {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
                         "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 3f64619a983..c4823c8b0e7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -455,7 +455,6 @@ public class PrometheusMetricsTest extends BrokerTestBase {
                 .subscribe();
 
         final int messages = 10;
-
         for (int i = 0; i < messages; i++) {
             String message = "my-message-" + i;
             p1.send(message.getBytes());
@@ -491,6 +490,45 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count"));
     }
 
+    @Test
+    public void testNonPersistentSubMetrics() throws Exception {
+        Producer<byte[]> p1 =
+                pulsarClient.newProducer().topic("non-persistent://my-property/use/my-ns/my-topic1").create();
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("non-persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 100;
+
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        assertTrue(metrics.containsKey("pulsar_subscription_back_log"));
+        assertTrue(metrics.containsKey("pulsar_subscription_back_log_no_delayed"));
+        assertTrue(metrics.containsKey("pulsar_subscription_msg_throughput_out"));
+        assertTrue(metrics.containsKey("pulsar_throughput_out"));
+        assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_redeliver"));
+        assertTrue(metrics.containsKey("pulsar_subscription_unacked_messages"));
+        assertTrue(metrics.containsKey("pulsar_subscription_blocked_on_unacked_messages"));
+        assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_out"));
+        assertTrue(metrics.containsKey("pulsar_out_bytes_total"));
+        assertTrue(metrics.containsKey("pulsar_out_messages_total"));
+        assertTrue(metrics.containsKey("pulsar_subscription_last_expire_timestamp"));
+        assertTrue(metrics.containsKey("pulsar_subscription_msg_drop_rate"));
+    }
+
     @Test
     public void testPerNamespaceStats() throws Exception {
         Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();