You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/06 06:19:10 UTC

[pulsar] 01/03: Export Prometheus metric for messageTTL (#8871)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 63b3741fe51e650da3095a62fd1f0f10edae0338
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sat Dec 12 02:12:11 2020 +0800

    Export Prometheus metric for messageTTL (#8871)
    
    Fixes #8573
    
    Some users want to know how many messages have expired and when. Currently there are too few such metrics, so TTL looks like an unobservable black box.
    
    Add `totalMsgExpired`, `lastExpireTimestamp`, and `msgRateExpired` as Prometheus metrics.
    
    PrometheusMetricsTest.java
    
    (cherry picked from commit 060e35b587beaf74484ecc58fe7e0b91d4cf630e)
---
 .../persistent/PersistentMessageExpiryMonitor.java |   9 +-
 .../service/persistent/PersistentSubscription.java |   1 +
 .../prometheus/AggregatedSubscriptionStats.java    |   6 ++
 .../stats/prometheus/NamespaceStatsAggregator.java |   3 +
 .../pulsar/broker/stats/prometheus/TopicStats.java |  36 +++++--
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 104 +++++++++++++++++++++
 .../common/policies/data/SubscriptionStats.java    |   5 +
 7 files changed, 153 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 0380c32..315a0a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
 
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
+import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
@@ -40,6 +40,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
     private final String subName;
     private final String topicName;
     private final Rate msgExpired;
+    private final LongAdder totalMsgExpired;
     private final boolean autoSkipNonRecoverableData;
     private final PersistentSubscription subscription;
 
@@ -56,6 +57,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
         this.subName = subscriptionName;
         this.subscription = subscription;
         this.msgExpired = new Rate();
+        this.totalMsgExpired = new LongAdder();
         // check to avoid test failures
         this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null
                 && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
@@ -97,6 +99,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
         return msgExpired.getRate();
     }
 
+    public long getTotalMessageExpired() {
+        return totalMsgExpired.sum();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentMessageExpiryMonitor.class);
 
     private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
@@ -104,6 +110,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
         public void markDeleteComplete(Object ctx) {
             long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false);
             msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */);
+            totalMsgExpired.add(numMessagesExpired);
             updateRates();
             // If the subscription is a Key_Shared subscription, we should to trigger message dispatch.
             if (subscription != null && subscription.getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 9958e7d..8f09143 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1025,6 +1025,7 @@ public class PersistentSubscription implements Subscription {
         subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog);
         subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
         subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
+        subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired();
         subStats.isReplicated = isReplicated();
         subStats.isDurable = cursor.isDurable();
         if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index 1f1e879..f3573f7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -45,5 +45,11 @@ public class AggregatedSubscriptionStats {
 
     long bytesOutCounter;
 
+    long lastExpireTimestamp;
+
+    double msgRateExpired;
+
+    long totalMsgExpired;
+
     public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index dc9e0e3..fa46114 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -135,6 +135,9 @@ public class NamespaceStatsAggregator {
                     .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
             subsStats.msgBacklog = subscriptionStats.msgBacklog;
             subsStats.msgDelayed = subscriptionStats.msgDelayed;
+            subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
+            subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
+            subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
             subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
             subscriptionStats.consumers.forEach(cStats -> {
                 stats.consumersCount++;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 0510d34..5e4d178 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -137,16 +137,32 @@ class TopicStats {
         metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
 
         stats.subscriptionStats.forEach((n, subsStats) -> {
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", subsStats.msgBacklog);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed", subsStats.msgBacklogNoDelayed);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed", subsStats.msgDelayed);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages", subsStats.unackedMessages);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut);
-            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut);
-            metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", subsStats.bytesOutCounter);
-            metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", subsStats.msgOutCounter);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
+                    subsStats.msgBacklog);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
+                    subsStats.msgBacklogNoDelayed);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
+                    subsStats.msgDelayed);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
+                    subsStats.msgRateRedeliver);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
+                    subsStats.unackedMessages);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
+                    subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
+                    subsStats.msgRateOut);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
+                    subsStats.msgThroughputOut);
+            metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
+                    subsStats.bytesOutCounter);
+            metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
+                    subsStats.msgOutCounter);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
+                    subsStats.lastExpireTimestamp);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
+                    subsStats.msgRateExpired);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
+                    subsStats.totalMsgExpired);
             subsStats.consumerStat.forEach((c, consumerStats) -> {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_unacked_messages", consumerStats.unackedMessages);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index c6200009..33fe94c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -25,19 +25,29 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.math.RoundingMode;
+import java.text.NumberFormat;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -186,6 +196,100 @@ public class PrometheusMetricsTest extends BrokerTestBase {
     }
 
     @Test
+    public void testPerTopicExpiredStat() throws Exception {
+        String ns = "prop/ns-abc1";
+        admin.namespaces().createNamespace(ns);
+        String topic1 = "persistent://" + ns + "/testPerTopicExpiredStat1";
+        String topic2 = "persistent://" + ns + "/testPerTopicExpiredStat2";
+        List<String> topicList = Arrays.asList(topic2,topic1);
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic1).create();
+        Producer<byte[]> p2 = pulsarClient.newProducer().topic(topic2).create();
+        final String subName = "test";
+        for (String topic : topicList) {
+            pulsarClient.newConsumer()
+                    .topic(topic)
+                    .subscriptionName(subName)
+                    .subscribe().close();
+        }
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        p1.close();
+        p2.close();
+        // Let the message expire
+        for (String topic : topicList) {
+            PersistentTopic persistentTopic = (PersistentTopic)pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+            persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1);
+        }
+        pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry);
+        //wait for checkMessageExpiry
+        PersistentSubscription sub = (PersistentSubscription)
+                pulsar.getBrokerService().getTopicIfExists(topic1).get().get().getSubscription(subName);
+        PersistentSubscription sub2 = (PersistentSubscription)
+                pulsar.getBrokerService().getTopicIfExists(topic2).get().get().getSubscription(subName);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> sub.getExpiredMessageRate() != 0.0);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> sub2.getExpiredMessageRate() != 0.0);
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        // There should be 2 metrics with different tags for each topic
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_subscription_last_expire_timestamp");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).tags.get("topic"), topic2);
+        assertEquals(cm.get(0).tags.get("namespace"), ns);
+        assertEquals(cm.get(1).tags.get("topic"), topic1);
+        assertEquals(cm.get(1).tags.get("namespace"), ns);
+
+        //check value
+        Field field = PersistentSubscription.class.getDeclaredField("lastExpireTimestamp");
+        field.setAccessible(true);
+        for (int i = 0; i < topicList.size(); i++) {
+            PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+                    .getTopicIfExists(topicList.get(i)).get().get().getSubscription(subName);
+            assertEquals((long) field.get(subscription), (long) cm.get(i).value);
+        }
+
+        cm = (List<Metric>) metrics.get("pulsar_subscription_msg_rate_expired");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).tags.get("topic"), topic2);
+        assertEquals(cm.get(0).tags.get("namespace"), ns);
+        assertEquals(cm.get(1).tags.get("topic"), topic1);
+        assertEquals(cm.get(1).tags.get("namespace"), ns);
+        //check value
+        field = PersistentSubscription.class.getDeclaredField("expiryMonitor");
+        field.setAccessible(true);
+        NumberFormat nf = NumberFormat.getNumberInstance();
+        nf.setMaximumFractionDigits(3);
+        nf.setRoundingMode(RoundingMode.DOWN);
+        for (int i = 0; i < topicList.size(); i++) {
+            PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+                    .getTopicIfExists(topicList.get(i)).get().get().getSubscription(subName);
+            PersistentMessageExpiryMonitor monitor = (PersistentMessageExpiryMonitor) field.get(subscription);
+            assertEquals(Double.valueOf(nf.format(monitor.getMessageExpiryRate())).doubleValue(), cm.get(i).value);
+        }
+
+        cm = (List<Metric>) metrics.get("pulsar_subscription_total_msg_expired");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).tags.get("topic"), topic2);
+        assertEquals(cm.get(0).tags.get("namespace"), ns);
+        assertEquals(cm.get(1).tags.get("topic"), topic1);
+        assertEquals(cm.get(1).tags.get("namespace"), ns);
+        //check value
+        for (int i = 0; i < topicList.size(); i++) {
+            assertEquals(messages, (long)cm.get(i).value);
+        }
+
+    }
+
+    @Test
     public void testPerNamespaceStats() throws Exception {
         Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
         Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index c3a09ab..12c5767 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -74,6 +74,9 @@ public class SubscriptionStats {
     /** Total rate of messages expired on this subscription (msg/s). */
     public double msgRateExpired;
 
+    /** Total messages expired on this subscription. */
+    public long totalMsgExpired;
+
     /** Last message expire execution timestamp. */
     public long lastExpireTimestamp;
 
@@ -119,6 +122,7 @@ public class SubscriptionStats {
         msgBacklogNoDelayed = 0;
         unackedMessages = 0;
         msgRateExpired = 0;
+        totalMsgExpired = 0;
         lastExpireTimestamp = 0L;
         consumers.clear();
         consumersAfterMarkDeletePosition.clear();
@@ -139,6 +143,7 @@ public class SubscriptionStats {
         this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
         this.unackedMessages += stats.unackedMessages;
         this.msgRateExpired += stats.msgRateExpired;
+        this.totalMsgExpired += stats.totalMsgExpired;
         this.isReplicated |= stats.isReplicated;
         this.isDurable |= stats.isDurable;
         if (this.consumers.size() != stats.consumers.size()) {