You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/16 02:04:30 UTC
[pulsar] branch branch-2.11 updated: [broker][monitoring] add per-subscription EntryFilter metrics (#16932)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new f992b8089e6 [broker][monitoring] add per-subscription EntryFilter metrics (#16932)
f992b8089e6 is described below
commit f992b8089e6e401a09af9908aa8ac91c3e0ac74c
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Tue Aug 16 06:29:10 2022 +0800
[broker][monitoring] add per-subscription EntryFilter metrics (#16932)
* add Subscription level EntryFilter stats
* Fix Checkstyle
* add entryfilter stats admin-api test
* review fix
* review fix
* review fix
* review fix
* review fix
Fixes https://github.com/apache/pulsar/issues/16596
### Motivation
add per-subscription `EntryFilter` metrics for PersistentTopic and NonPersistentTopic.
### Modifications
Add the following metrics to Prometheus:
1. pulsar_subscription_filter_processed_msg_count
2. pulsar_subscription_filter_accepted_msg_count
3. pulsar_subscription_filter_rejected_msg_count
4. pulsar_subscription_filter_rescheduled_msg_count
Add the following metrics to admin-api:
1. filterProcessedMsgCount
2. filterAcceptedMsgCount
3. filterRejectedMsgCount
4. filterRescheduledMsgCount
### Documentation
Check the box below or label this PR directly.
Need to update docs?
- [x] `doc-required`
(Your PR needs to update docs and you will update later)
- [ ] `doc-not-needed`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)
---
.../broker/service/AbstractBaseDispatcher.java | 33 +++++
.../apache/pulsar/broker/service/Dispatcher.java | 17 +++
.../nonpersistent/NonPersistentSubscription.java | 5 +
.../service/nonpersistent/NonPersistentTopic.java | 15 +++
.../service/persistent/PersistentSubscription.java | 6 +
.../broker/service/persistent/PersistentTopic.java | 13 ++
.../stats/prometheus/AggregatedNamespaceStats.java | 4 +
.../prometheus/AggregatedSubscriptionStats.java | 8 ++
.../stats/prometheus/NamespaceStatsAggregator.java | 5 +-
.../pulsar/broker/stats/prometheus/TopicStats.java | 10 ++
.../pulsar/broker/stats/SubscriptionStatsTest.java | 145 +++++++++++++++++++--
.../common/policies/data/SubscriptionStats.java | 8 ++
.../policies/data/stats/SubscriptionStatsImpl.java | 16 +++
13 files changed, 276 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 47728cb5b22..22a5df2931e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -51,6 +52,10 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
protected final ServiceConfiguration serviceConfig;
protected final boolean dispatchThrottlingOnBatchMessageEnabled;
+ private final LongAdder filterProcessedMsgs = new LongAdder();
+ private final LongAdder filterAcceptedMsgs = new LongAdder();
+ private final LongAdder filterRejectedMsgs = new LongAdder();
+ private final LongAdder filterRescheduledMsgs = new LongAdder();
protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
super(subscription);
@@ -116,15 +121,21 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
? ((EntryAndMetadata) entry).getMetadata()
: Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1)
);
+
+ int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
+ this.filterProcessedMsgs.add(entryMsgCnt);
+
EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
+ this.filterRejectedMsgs.add(entryMsgCnt);
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add((PositionImpl) entry.getPosition());
entries.set(i, null);
+ this.filterRescheduledMsgs.add(entryMsgCnt);
entry.release();
continue;
}
@@ -163,6 +174,8 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
continue;
}
+ this.filterAcceptedMsgs.add(entryMsgCnt);
+
totalEntries++;
int batchSize = msgMetadata.getNumMessagesInBatch();
totalMessages += batchSize;
@@ -285,4 +298,24 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
protected String getSubscriptionName() {
return subscription == null ? null : subscription.getName();
}
+
+ @Override
+ public long getFilterProcessedMsgCount() {
+ return this.filterProcessedMsgs.longValue();
+ }
+
+ @Override
+ public long getFilterAcceptedMsgCount() {
+ return this.filterAcceptedMsgs.longValue();
+ }
+
+ @Override
+ public long getFilterRejectedMsgCount() {
+ return this.filterRejectedMsgs.longValue();
+ }
+
+ @Override
+ public long getFilterRescheduledMsgCount() {
+ return this.filterRescheduledMsgs.longValue();
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index ec8d4c67121..86980ee340c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -129,4 +129,21 @@ public interface Dispatcher {
return false;
}
+
+ default long getFilterProcessedMsgCount() {
+ return 0;
+ }
+
+ default long getFilterAcceptedMsgCount() {
+ return 0;
+ }
+
+ default long getFilterRejectedMsgCount() {
+ return 0;
+ }
+
+ default long getFilterRescheduledMsgCount() {
+ return 0;
+ }
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 3d0a313fef1..34e86ca7615 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -462,6 +462,11 @@ public class NonPersistentSubscription extends AbstractSubscription implements S
subStats.msgOutCounter += consumerStats.msgOutCounter;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
});
+
+ subStats.filterProcessedMsgCount = dispatcher.getFilterProcessedMsgCount();
+ subStats.filterAcceptedMsgCount = dispatcher.getFilterAcceptedMsgCount();
+ subStats.filterRejectedMsgCount = dispatcher.getFilterRejectedMsgCount();
+ subStats.filterRescheduledMsgCount = dispatcher.getFilterRescheduledMsgCount();
}
subStats.type = getTypeString();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 30941e0d719..db77d12e713 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyExceptio
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.StreamingStats;
@@ -749,6 +750,20 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
topicStatsStream.writePair("type", subscription.getTypeString());
+
+ // Write entry filter stats
+ Dispatcher dispatcher0 = subscription.getDispatcher();
+ if (null != dispatcher0) {
+ topicStatsStream.writePair("filterProcessedMsgCount",
+ dispatcher0.getFilterProcessedMsgCount());
+ topicStatsStream.writePair("filterAcceptedMsgCount",
+ dispatcher0.getFilterAcceptedMsgCount());
+ topicStatsStream.writePair("filterRejectedMsgCount",
+ dispatcher0.getFilterRejectedMsgCount());
+ topicStatsStream.writePair("filterRescheduledMsgCount",
+ dispatcher0.getFilterRescheduledMsgCount());
+ }
+
if (subscription.getDispatcher() != null) {
subscription.getDispatcher().getMessageDropRate().calculateRate();
topicStatsStream.writePair("msgDropRate",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a88744b4edc..fcb4135c97e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1065,6 +1065,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
subStats.lastMarkDeleteAdvancedTimestamp = lastMarkDeleteAdvancedTimestamp;
subStats.bytesOutCounter = bytesOutFromRemovedConsumers.longValue();
subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();
+
Dispatcher dispatcher = this.dispatcher;
if (dispatcher != null) {
Map<Consumer, List<Range>> consumerKeyHashRanges = getType() == SubType.Key_Shared
@@ -1089,6 +1090,11 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
.collect(Collectors.toList());
}
});
+
+ subStats.filterProcessedMsgCount = dispatcher.getFilterProcessedMsgCount();
+ subStats.filterAcceptedMsgCount = dispatcher.getFilterAcceptedMsgCount();
+ subStats.filterRejectedMsgCount = dispatcher.getFilterRejectedMsgCount();
+ subStats.filterRescheduledMsgCount = dispatcher.getFilterRescheduledMsgCount();
}
SubType subType = getType();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 742be6e1afe..9f8ac113d4e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1762,6 +1762,19 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange",
subscription.getTotalNonContiguousDeletedMessagesRange());
topicStatsStream.writePair("type", subscription.getTypeString());
+
+ Dispatcher dispatcher0 = subscription.getDispatcher();
+ if (null != dispatcher0) {
+ topicStatsStream.writePair("filterProcessedMsgCount",
+ dispatcher0.getFilterProcessedMsgCount());
+ topicStatsStream.writePair("filterAcceptedMsgCount",
+ dispatcher0.getFilterAcceptedMsgCount());
+ topicStatsStream.writePair("filterRejectedMsgCount",
+ dispatcher0.getFilterRejectedMsgCount());
+ topicStatsStream.writePair("filterRescheduledMsgCount",
+ dispatcher0.getFilterRescheduledMsgCount());
+ }
+
if (Subscription.isIndividualAckMode(subscription.getType())) {
if (subscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers dispatcher =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 761094ac0e6..93631025623 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -119,6 +119,10 @@ public class AggregatedNamespaceStats {
subsStats.msgDelayed += as.msgDelayed;
subsStats.msgRateRedeliver += as.msgRateRedeliver;
subsStats.unackedMessages += as.unackedMessages;
+ subsStats.filterProcessedMsgCount += as.filterProcessedMsgCount;
+ subsStats.filterAcceptedMsgCount += as.filterAcceptedMsgCount;
+ subsStats.filterRejectedMsgCount += as.filterRejectedMsgCount;
+ subsStats.filterRescheduledMsgCount += as.filterRescheduledMsgCount;
as.consumerStat.forEach((c, v) -> {
AggregatedConsumerStats consumerStats =
subsStats.consumerStat.computeIfAbsent(c, k -> new AggregatedConsumerStats());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index dbee8b9b87c..e3a56f5e334 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -64,5 +64,13 @@ public class AggregatedSubscriptionStats {
long consumersCount;
+ long filterProcessedMsgCount;
+
+ long filterAcceptedMsgCount;
+
+ long filterRejectedMsgCount;
+
+ long filterRescheduledMsgCount;
+
public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index f444ad0542e..bfb8140877b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -136,7 +136,10 @@ public class NamespaceStatsAggregator {
});
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
-
+ subsStats.filterProcessedMsgCount = subscriptionStats.filterProcessedMsgCount;
+ subsStats.filterAcceptedMsgCount = subscriptionStats.filterAcceptedMsgCount;
+ subsStats.filterRejectedMsgCount = subscriptionStats.filterRejectedMsgCount;
+ subsStats.filterRescheduledMsgCount = subscriptionStats.filterRescheduledMsgCount;
}
private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 99838ccfae9..5c6047ad807 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -281,6 +281,16 @@ class TopicStats {
subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_consumers_count",
subsStats.consumersCount, splitTopicAndPartitionIndexLabel);
+
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_filter_processed_msg_count",
+ subsStats.filterProcessedMsgCount, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_filter_accepted_msg_count",
+ subsStats.filterAcceptedMsgCount, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_filter_rejected_msg_count",
+ subsStats.filterRejectedMsgCount, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_filter_rescheduled_msg_count",
+ subsStats.filterRescheduledMsgCount, splitTopicAndPartitionIndexLabel);
+
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index fdc629b2fd0..f2794f68908 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -18,24 +18,34 @@
*/
package org.apache.pulsar.broker.stats;
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.Mockito.mock;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.EntryFilterSupport;
+import org.apache.pulsar.broker.service.plugin.*;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import java.util.UUID;
-
@Slf4j
@Test(groups = "broker")
public class SubscriptionStatsTest extends ProducerConsumerBase {
@@ -134,4 +144,123 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
.getNonContiguousDeletedMessagesRangesSerializedSize() > 0);
});
}
+
+ @DataProvider(name = "testSubscriptionMetrics")
+ public Object[][] topicAndSubscription() {
+ return new Object[][]{
+ {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true},
+ {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true},
+ {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false},
+ {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false},
+ };
+ }
+
+ @Test(dataProvider = "testSubscriptionMetrics")
+ public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats)
+ throws Exception {
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName(subName)
+ .subscribe();
+
+ boolean isPersistent = pulsar.getBrokerService().getTopic(topic, false).get().get().isPersistent();
+ Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get()
+ .get().getSubscription(subName).getDispatcher();
+
+ Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
+ field.setAccessible(true);
+ NarClassLoader narClassLoader = mock(NarClassLoader.class);
+ EntryFilter filter1 = new EntryFilterTest();
+ EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+ field.set(dispatcher, ImmutableList.of(loader1));
+
+ for (int i = 0; i < 100; i++) {
+ producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
+ }
+ for (int i = 0; i < 100; i++) {
+ producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
+ }
+ for (int i = 0; i < 100; i++) {
+ producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send();
+ }
+
+ for (;;) {
+ Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ consumer.acknowledge(message);
+ }
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, enableTopicStats, false, false, output);
+ String metricsStr = output.toString();
+ Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+ Collection<PrometheusMetricsTest.Metric> throughFilterMetrics =
+ metrics.get("pulsar_subscription_filter_processed_msg_count");
+ Collection<PrometheusMetricsTest.Metric> acceptedMetrics =
+ metrics.get("pulsar_subscription_filter_accepted_msg_count");
+ Collection<PrometheusMetricsTest.Metric> rejectedMetrics =
+ metrics.get("pulsar_subscription_filter_rejected_msg_count");
+ Collection<PrometheusMetricsTest.Metric> rescheduledMetrics =
+ metrics.get("pulsar_subscription_filter_rescheduled_msg_count");
+
+ if (enableTopicStats) {
+ Assert.assertTrue(throughFilterMetrics.size() > 0);
+ Assert.assertTrue(acceptedMetrics.size() > 0);
+ Assert.assertTrue(rejectedMetrics.size() > 0);
+ Assert.assertTrue(rescheduledMetrics.size() > 0);
+
+ double throughFilter = throughFilterMetrics.stream()
+ .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+ .mapToDouble(m-> m.value).sum();
+ double filterAccepted = acceptedMetrics.stream()
+ .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+ .mapToDouble(m-> m.value).sum();
+ double filterRejected = rejectedMetrics.stream()
+ .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+ .mapToDouble(m-> m.value).sum();
+ double filterRescheduled = rescheduledMetrics.stream()
+ .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+ .mapToDouble(m-> m.value).sum();
+
+ Assert.assertEquals(filterAccepted, 100);
+ if (isPersistent) {
+ Assert.assertEquals(filterRejected, 100);
+ Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+ }
+ } else {
+ Assert.assertEquals(throughFilterMetrics.size(), 0);
+ Assert.assertEquals(acceptedMetrics.size(), 0);
+ Assert.assertEquals(rejectedMetrics.size(), 0);
+ Assert.assertEquals(rescheduledMetrics.size(), 0);
+ }
+
+ testSubscriptionStatsAdminApi(topic, subName);
+ }
+
+ private void testSubscriptionStatsAdminApi(String topic, String subName) throws Exception {
+ boolean persistent = TopicName.get(topic).isPersistent();
+ TopicStats topicStats = admin.topics().getStats(topic);
+ SubscriptionStats stats = topicStats.getSubscriptions().get(subName);
+ Assert.assertNotNull(stats);
+
+ Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
+ if (persistent) {
+ Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
+ Assert.assertEquals(stats.getFilterProcessedMsgCount(),
+ stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()
+ + stats.getFilterRescheduledMsgCount(),
+ 0.01 * stats.getFilterProcessedMsgCount());
+ }
+ }
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 4e7e9e0e1e6..57d2d16d62c 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -126,4 +126,12 @@ public interface SubscriptionStats {
/** The serialized size of non-contiguous deleted messages ranges. */
int getNonContiguousDeletedMessagesRangesSerializedSize();
+
+ long getFilterProcessedMsgCount();
+
+ long getFilterAcceptedMsgCount();
+
+ long getFilterRejectedMsgCount();
+
+ long getFilterRescheduledMsgCount();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index a945ae9cf86..02e1e8e184b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -137,6 +137,14 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
/** SubscriptionProperties (key/value strings) associated with this subscribe. */
public Map<String, String> subscriptionProperties;
+ public long filterProcessedMsgCount;
+
+ public long filterAcceptedMsgCount;
+
+ public long filterRejectedMsgCount;
+
+ public long filterRescheduledMsgCount;
+
public SubscriptionStatsImpl() {
this.consumers = new ArrayList<>();
this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();
@@ -163,6 +171,10 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
nonContiguousDeletedMessagesRangesSerializedSize = 0;
delayedTrackerMemoryUsage = 0;
subscriptionProperties.clear();
+ filterProcessedMsgCount = 0;
+ filterAcceptedMsgCount = 0;
+ filterRejectedMsgCount = 0;
+ filterRescheduledMsgCount = 0;
}
// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -199,6 +211,10 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
this.delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
this.subscriptionProperties.putAll(stats.subscriptionProperties);
+ this.filterProcessedMsgCount += stats.filterProcessedMsgCount;
+ this.filterAcceptedMsgCount += stats.filterAcceptedMsgCount;
+ this.filterRejectedMsgCount += stats.filterRejectedMsgCount;
+ this.filterRescheduledMsgCount += stats.filterRescheduledMsgCount;
return this;
}
}