You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/06 06:19:10 UTC
[pulsar] 01/03: Export Prometheus metric for messageTTL (#8871)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 63b3741fe51e650da3095a62fd1f0f10edae0338
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sat Dec 12 02:12:11 2020 +0800
Export Prometheus metric for messageTTL (#8871)
Fixes #8573
Some users want to know how many messages have expired, and when. Currently there are too few metrics for this, so TTL looks like an unobservable black box.
Add `totalMsgExpired`, `lastExpireTimestamp`, and `msgRateExpired` as Prometheus metrics.
PrometheusMetricsTest.java
(cherry picked from commit 060e35b587beaf74484ecc58fe7e0b91d4cf630e)
---
.../persistent/PersistentMessageExpiryMonitor.java | 9 +-
.../service/persistent/PersistentSubscription.java | 1 +
.../prometheus/AggregatedSubscriptionStats.java | 6 ++
.../stats/prometheus/NamespaceStatsAggregator.java | 3 +
.../pulsar/broker/stats/prometheus/TopicStats.java | 36 +++++--
.../pulsar/broker/stats/PrometheusMetricsTest.java | 104 +++++++++++++++++++++
.../common/policies/data/SubscriptionStats.java | 5 +
7 files changed, 153 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 0380c32..315a0a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
+import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
@@ -40,6 +40,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private final String subName;
private final String topicName;
private final Rate msgExpired;
+ private final LongAdder totalMsgExpired;
private final boolean autoSkipNonRecoverableData;
private final PersistentSubscription subscription;
@@ -56,6 +57,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
this.subName = subscriptionName;
this.subscription = subscription;
this.msgExpired = new Rate();
+ this.totalMsgExpired = new LongAdder();
// check to avoid test failures
this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null
&& this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
@@ -97,6 +99,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
return msgExpired.getRate();
}
+ public long getTotalMessageExpired() {
+ return totalMsgExpired.sum();
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentMessageExpiryMonitor.class);
private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
@@ -104,6 +110,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
public void markDeleteComplete(Object ctx) {
long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false);
msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */);
+ totalMsgExpired.add(numMessagesExpired);
updateRates();
// If the subscription is a Key_Shared subscription, we should to trigger message dispatch.
if (subscription != null && subscription.getType() == PulsarApi.CommandSubscribe.SubType.Key_Shared) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 9958e7d..8f09143 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1025,6 +1025,7 @@ public class PersistentSubscription implements Subscription {
subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog);
subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
+ subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired();
subStats.isReplicated = isReplicated();
subStats.isDurable = cursor.isDurable();
if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index 1f1e879..f3573f7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -45,5 +45,11 @@ public class AggregatedSubscriptionStats {
long bytesOutCounter;
+ long lastExpireTimestamp;
+
+ double msgRateExpired;
+
+ long totalMsgExpired;
+
public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index dc9e0e3..fa46114 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -135,6 +135,9 @@ public class NamespaceStatsAggregator {
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
+ subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
+ subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
+ subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 0510d34..5e4d178 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -137,16 +137,32 @@ class TopicStats {
metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
stats.subscriptionStats.forEach((n, subsStats) -> {
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", subsStats.msgBacklog);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed", subsStats.msgBacklogNoDelayed);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed", subsStats.msgDelayed);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages", subsStats.unackedMessages);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut);
- metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut);
- metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", subsStats.bytesOutCounter);
- metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", subsStats.msgOutCounter);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
+ subsStats.msgBacklog);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
+ subsStats.msgBacklogNoDelayed);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
+ subsStats.msgDelayed);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
+ subsStats.msgRateRedeliver);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
+ subsStats.unackedMessages);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
+ subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
+ subsStats.msgRateOut);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
+ subsStats.msgThroughputOut);
+ metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
+ subsStats.bytesOutCounter);
+ metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
+ subsStats.msgOutCounter);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
+ subsStats.lastExpireTimestamp);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
+ subsStats.msgRateExpired);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
+ subsStats.totalMsgExpired);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_unacked_messages", consumerStats.unackedMessages);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index c6200009..33fe94c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -25,19 +25,29 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.math.RoundingMode;
+import java.text.NumberFormat;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -186,6 +196,100 @@ public class PrometheusMetricsTest extends BrokerTestBase {
}
@Test
+ public void testPerTopicExpiredStat() throws Exception {
+ String ns = "prop/ns-abc1";
+ admin.namespaces().createNamespace(ns);
+ String topic1 = "persistent://" + ns + "/testPerTopicExpiredStat1";
+ String topic2 = "persistent://" + ns + "/testPerTopicExpiredStat2";
+ List<String> topicList = Arrays.asList(topic2,topic1);
+ Producer<byte[]> p1 = pulsarClient.newProducer().topic(topic1).create();
+ Producer<byte[]> p2 = pulsarClient.newProducer().topic(topic2).create();
+ final String subName = "test";
+ for (String topic : topicList) {
+ pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe().close();
+ }
+
+ final int messages = 10;
+
+ for (int i = 0; i < messages; i++) {
+ String message = "my-message-" + i;
+ p1.send(message.getBytes());
+ p2.send(message.getBytes());
+ }
+
+ p1.close();
+ p2.close();
+ // Let the message expire
+ for (String topic : topicList) {
+ PersistentTopic persistentTopic = (PersistentTopic)pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+ persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1);
+ }
+ pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry);
+ //wait for checkMessageExpiry
+ PersistentSubscription sub = (PersistentSubscription)
+ pulsar.getBrokerService().getTopicIfExists(topic1).get().get().getSubscription(subName);
+ PersistentSubscription sub2 = (PersistentSubscription)
+ pulsar.getBrokerService().getTopicIfExists(topic2).get().get().getSubscription(subName);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> sub.getExpiredMessageRate() != 0.0);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> sub2.getExpiredMessageRate() != 0.0);
+
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
+ String metricsStr = new String(statsOut.toByteArray());
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+ // There should be 2 metrics with different tags for each topic
+ List<Metric> cm = (List<Metric>) metrics.get("pulsar_subscription_last_expire_timestamp");
+ assertEquals(cm.size(), 2);
+ assertEquals(cm.get(0).tags.get("topic"), topic2);
+ assertEquals(cm.get(0).tags.get("namespace"), ns);
+ assertEquals(cm.get(1).tags.get("topic"), topic1);
+ assertEquals(cm.get(1).tags.get("namespace"), ns);
+
+ //check value
+ Field field = PersistentSubscription.class.getDeclaredField("lastExpireTimestamp");
+ field.setAccessible(true);
+ for (int i = 0; i < topicList.size(); i++) {
+ PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+ .getTopicIfExists(topicList.get(i)).get().get().getSubscription(subName);
+ assertEquals((long) field.get(subscription), (long) cm.get(i).value);
+ }
+
+ cm = (List<Metric>) metrics.get("pulsar_subscription_msg_rate_expired");
+ assertEquals(cm.size(), 2);
+ assertEquals(cm.get(0).tags.get("topic"), topic2);
+ assertEquals(cm.get(0).tags.get("namespace"), ns);
+ assertEquals(cm.get(1).tags.get("topic"), topic1);
+ assertEquals(cm.get(1).tags.get("namespace"), ns);
+ //check value
+ field = PersistentSubscription.class.getDeclaredField("expiryMonitor");
+ field.setAccessible(true);
+ NumberFormat nf = NumberFormat.getNumberInstance();
+ nf.setMaximumFractionDigits(3);
+ nf.setRoundingMode(RoundingMode.DOWN);
+ for (int i = 0; i < topicList.size(); i++) {
+ PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+ .getTopicIfExists(topicList.get(i)).get().get().getSubscription(subName);
+ PersistentMessageExpiryMonitor monitor = (PersistentMessageExpiryMonitor) field.get(subscription);
+ assertEquals(Double.valueOf(nf.format(monitor.getMessageExpiryRate())).doubleValue(), cm.get(i).value);
+ }
+
+ cm = (List<Metric>) metrics.get("pulsar_subscription_total_msg_expired");
+ assertEquals(cm.size(), 2);
+ assertEquals(cm.get(0).tags.get("topic"), topic2);
+ assertEquals(cm.get(0).tags.get("namespace"), ns);
+ assertEquals(cm.get(1).tags.get("topic"), topic1);
+ assertEquals(cm.get(1).tags.get("namespace"), ns);
+ //check value
+ for (int i = 0; i < topicList.size(); i++) {
+ assertEquals(messages, (long)cm.get(i).value);
+ }
+
+ }
+
+ @Test
public void testPerNamespaceStats() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index c3a09ab..12c5767 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -74,6 +74,9 @@ public class SubscriptionStats {
/** Total rate of messages expired on this subscription (msg/s). */
public double msgRateExpired;
+ /** Total messages expired on this subscription. */
+ public long totalMsgExpired;
+
/** Last message expire execution timestamp. */
public long lastExpireTimestamp;
@@ -119,6 +122,7 @@ public class SubscriptionStats {
msgBacklogNoDelayed = 0;
unackedMessages = 0;
msgRateExpired = 0;
+ totalMsgExpired = 0;
lastExpireTimestamp = 0L;
consumers.clear();
consumersAfterMarkDeletePosition.clear();
@@ -139,6 +143,7 @@ public class SubscriptionStats {
this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
this.unackedMessages += stats.unackedMessages;
this.msgRateExpired += stats.msgRateExpired;
+ this.totalMsgExpired += stats.totalMsgExpired;
this.isReplicated |= stats.isReplicated;
this.isDurable |= stats.isDurable;
if (this.consumers.size() != stats.consumers.size()) {