You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/16 02:04:30 UTC

[pulsar] branch branch-2.11 updated: [broker][monitoring] add per-subscription EntryFilter metrics (#16932)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new f992b8089e6 [broker][monitoring] add per-subscription EntryFilter metrics (#16932)
f992b8089e6 is described below

commit f992b8089e6e401a09af9908aa8ac91c3e0ac74c
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Tue Aug 16 06:29:10 2022 +0800

    [broker][monitoring] add per-subscription EntryFilter metrics (#16932)
    
    * add Subscription level EntryFilter stats
    
    * Fix Checkstyle
    
    * add entryfilter stats admin-api test
    
    * review fix
    
    * review fix
    
    * review fix
    
    * review fix
    
    * review fix
    
    Fixes https://github.com/apache/pulsar/issues/16596
    
    ### Motivation
    
    add per-subscription `EntryFilter` metrics for PersistentTopic and NonPersistentTopic.
    
    ### Modifications
    
    Add the following metrics to Prometheus:
    1. pulsar_subscription_filter_processed_msg_count
    2. pulsar_subscription_filter_accepted_msg_count
    3. pulsar_subscription_filter_rejected_msg_count
    4. pulsar_subscription_filter_rescheduled_msg_count
    
    Add the following metrics to admin-api:
    1. filterProcessedMsgCount
    2. filterAcceptedMsgCount
    3. filterRejectedMsgCount
    4. filterRescheduledMsgCount
    
    ### Documentation
    
    Check the box below or label this PR directly.
    
    Need to update docs?
    
    - [x] `doc-required`
    (Your PR needs to update docs and you will update later)
    
    - [ ] `doc-not-needed`
    (Please explain why)
    
    - [ ] `doc`
    (Your PR contains doc changes)
    
    - [ ] `doc-complete`
    (Docs have been already added)
---
 .../broker/service/AbstractBaseDispatcher.java     |  33 +++++
 .../apache/pulsar/broker/service/Dispatcher.java   |  17 +++
 .../nonpersistent/NonPersistentSubscription.java   |   5 +
 .../service/nonpersistent/NonPersistentTopic.java  |  15 +++
 .../service/persistent/PersistentSubscription.java |   6 +
 .../broker/service/persistent/PersistentTopic.java |  13 ++
 .../stats/prometheus/AggregatedNamespaceStats.java |   4 +
 .../prometheus/AggregatedSubscriptionStats.java    |   8 ++
 .../stats/prometheus/NamespaceStatsAggregator.java |   5 +-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  10 ++
 .../pulsar/broker/stats/SubscriptionStatsTest.java | 145 +++++++++++++++++++--
 .../common/policies/data/SubscriptionStats.java    |   8 ++
 .../policies/data/stats/SubscriptionStatsImpl.java |  16 +++
 13 files changed, 276 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 47728cb5b22..22a5df2931e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -51,6 +52,10 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
 
     protected final ServiceConfiguration serviceConfig;
     protected final boolean dispatchThrottlingOnBatchMessageEnabled;
+    private final LongAdder filterProcessedMsgs = new LongAdder();
+    private final LongAdder filterAcceptedMsgs = new LongAdder();
+    private final LongAdder filterRejectedMsgs = new LongAdder();
+    private final LongAdder filterRescheduledMsgs = new LongAdder();
 
     protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
         super(subscription);
@@ -116,15 +121,21 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
                             ? ((EntryAndMetadata) entry).getMetadata()
                             : Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1)
                     );
+
+            int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
+            this.filterProcessedMsgs.add(entryMsgCnt);
+
             EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
             if (filterResult == EntryFilter.FilterResult.REJECT) {
                 entriesToFiltered.add(entry.getPosition());
                 entries.set(i, null);
+                this.filterRejectedMsgs.add(entryMsgCnt);
                 entry.release();
                 continue;
             } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
                 entriesToRedeliver.add((PositionImpl) entry.getPosition());
                 entries.set(i, null);
+                this.filterRescheduledMsgs.add(entryMsgCnt);
                 entry.release();
                 continue;
             }
@@ -163,6 +174,8 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
                 continue;
             }
 
+            this.filterAcceptedMsgs.add(entryMsgCnt);
+
             totalEntries++;
             int batchSize = msgMetadata.getNumMessagesInBatch();
             totalMessages += batchSize;
@@ -285,4 +298,24 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
     protected String getSubscriptionName() {
         return subscription == null ? null : subscription.getName();
     }
+
+    @Override
+    public long getFilterProcessedMsgCount() {
+        return this.filterProcessedMsgs.longValue();
+    }
+
+    @Override
+    public long getFilterAcceptedMsgCount() {
+        return this.filterAcceptedMsgs.longValue();
+    }
+
+    @Override
+    public long getFilterRejectedMsgCount() {
+        return this.filterRejectedMsgs.longValue();
+    }
+
+    @Override
+    public long getFilterRescheduledMsgCount() {
+        return this.filterRescheduledMsgs.longValue();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index ec8d4c67121..86980ee340c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -129,4 +129,21 @@ public interface Dispatcher {
         return false;
     }
 
+
+    default long getFilterProcessedMsgCount() {
+        return 0;
+    }
+
+    default long getFilterAcceptedMsgCount() {
+        return 0;
+    }
+
+    default long getFilterRejectedMsgCount() {
+        return 0;
+    }
+
+    default long getFilterRescheduledMsgCount() {
+        return 0;
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 3d0a313fef1..34e86ca7615 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -462,6 +462,11 @@ public class NonPersistentSubscription extends AbstractSubscription implements S
                 subStats.msgOutCounter += consumerStats.msgOutCounter;
                 subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
             });
+
+            subStats.filterProcessedMsgCount = dispatcher.getFilterProcessedMsgCount();
+            subStats.filterAcceptedMsgCount = dispatcher.getFilterAcceptedMsgCount();
+            subStats.filterRejectedMsgCount = dispatcher.getFilterRejectedMsgCount();
+            subStats.filterRescheduledMsgCount = dispatcher.getFilterRescheduledMsgCount();
         }
 
         subStats.type = getTypeString();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 30941e0d719..db77d12e713 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyExceptio
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.StreamingStats;
@@ -749,6 +750,20 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                // Write entry filter stats
+                Dispatcher dispatcher0 = subscription.getDispatcher();
+                if (null != dispatcher0) {
+                    topicStatsStream.writePair("filterProcessedMsgCount",
+                            dispatcher0.getFilterProcessedMsgCount());
+                    topicStatsStream.writePair("filterAcceptedMsgCount",
+                            dispatcher0.getFilterAcceptedMsgCount());
+                    topicStatsStream.writePair("filterRejectedMsgCount",
+                            dispatcher0.getFilterRejectedMsgCount());
+                    topicStatsStream.writePair("filterRescheduledMsgCount",
+                            dispatcher0.getFilterRescheduledMsgCount());
+                }
+
                 if (subscription.getDispatcher() != null) {
                     subscription.getDispatcher().getMessageDropRate().calculateRate();
                     topicStatsStream.writePair("msgDropRate",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a88744b4edc..fcb4135c97e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1065,6 +1065,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
         subStats.lastMarkDeleteAdvancedTimestamp = lastMarkDeleteAdvancedTimestamp;
         subStats.bytesOutCounter = bytesOutFromRemovedConsumers.longValue();
         subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();
+
         Dispatcher dispatcher = this.dispatcher;
         if (dispatcher != null) {
             Map<Consumer, List<Range>> consumerKeyHashRanges = getType() == SubType.Key_Shared
@@ -1089,6 +1090,11 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
                             .collect(Collectors.toList());
                 }
             });
+
+            subStats.filterProcessedMsgCount = dispatcher.getFilterProcessedMsgCount();
+            subStats.filterAcceptedMsgCount = dispatcher.getFilterAcceptedMsgCount();
+            subStats.filterRejectedMsgCount = dispatcher.getFilterRejectedMsgCount();
+            subStats.filterRescheduledMsgCount = dispatcher.getFilterRescheduledMsgCount();
         }
 
         SubType subType = getType();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 742be6e1afe..9f8ac113d4e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1762,6 +1762,19 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange",
                         subscription.getTotalNonContiguousDeletedMessagesRange());
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                Dispatcher dispatcher0 = subscription.getDispatcher();
+                if (null != dispatcher0) {
+                    topicStatsStream.writePair("filterProcessedMsgCount",
+                            dispatcher0.getFilterProcessedMsgCount());
+                    topicStatsStream.writePair("filterAcceptedMsgCount",
+                            dispatcher0.getFilterAcceptedMsgCount());
+                    topicStatsStream.writePair("filterRejectedMsgCount",
+                            dispatcher0.getFilterRejectedMsgCount());
+                    topicStatsStream.writePair("filterRescheduledMsgCount",
+                            dispatcher0.getFilterRescheduledMsgCount());
+                }
+
                 if (Subscription.isIndividualAckMode(subscription.getType())) {
                     if (subscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
                         PersistentDispatcherMultipleConsumers dispatcher =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 761094ac0e6..93631025623 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -119,6 +119,10 @@ public class AggregatedNamespaceStats {
             subsStats.msgDelayed += as.msgDelayed;
             subsStats.msgRateRedeliver += as.msgRateRedeliver;
             subsStats.unackedMessages += as.unackedMessages;
+            subsStats.filterProcessedMsgCount += as.filterProcessedMsgCount;
+            subsStats.filterAcceptedMsgCount += as.filterAcceptedMsgCount;
+            subsStats.filterRejectedMsgCount += as.filterRejectedMsgCount;
+            subsStats.filterRescheduledMsgCount += as.filterRescheduledMsgCount;
             as.consumerStat.forEach((c, v) -> {
                 AggregatedConsumerStats consumerStats =
                         subsStats.consumerStat.computeIfAbsent(c, k -> new AggregatedConsumerStats());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index dbee8b9b87c..e3a56f5e334 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -64,5 +64,13 @@ public class AggregatedSubscriptionStats {
 
     long consumersCount;
 
+    long filterProcessedMsgCount;
+
+    long filterAcceptedMsgCount;
+
+    long filterRejectedMsgCount;
+
+    long filterRescheduledMsgCount;
+
     public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index f444ad0542e..bfb8140877b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -136,7 +136,10 @@ public class NamespaceStatsAggregator {
         });
         stats.rateOut += subsStats.msgRateOut;
         stats.throughputOut += subsStats.msgThroughputOut;
-
+        subsStats.filterProcessedMsgCount = subscriptionStats.filterProcessedMsgCount;
+        subsStats.filterAcceptedMsgCount = subscriptionStats.filterAcceptedMsgCount;
+        subsStats.filterRejectedMsgCount = subscriptionStats.filterRejectedMsgCount;
+        subsStats.filterRescheduledMsgCount = subscriptionStats.filterRescheduledMsgCount;
     }
 
     private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 99838ccfae9..5c6047ad807 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -281,6 +281,16 @@ class TopicStats {
                     subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_consumers_count",
                     subsStats.consumersCount, splitTopicAndPartitionIndexLabel);
+
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_filter_processed_msg_count",
+                    subsStats.filterProcessedMsgCount, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_filter_accepted_msg_count",
+                    subsStats.filterAcceptedMsgCount, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_filter_rejected_msg_count",
+                    subsStats.filterRejectedMsgCount, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_filter_rescheduled_msg_count",
+                    subsStats.filterRescheduledMsgCount, splitTopicAndPartitionIndexLabel);
+
             subsStats.consumerStat.forEach((c, consumerStats) -> {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
                         "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index fdc629b2fd0..f2794f68908 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -18,24 +18,34 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.Mockito.mock;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.EntryFilterSupport;
+import org.apache.pulsar.broker.service.plugin.*;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import java.util.UUID;
-
 @Slf4j
 @Test(groups = "broker")
 public class SubscriptionStatsTest extends ProducerConsumerBase {
@@ -134,4 +144,123 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
                     .getNonContiguousDeletedMessagesRangesSerializedSize() > 0);
         });
     }
+
+    @DataProvider(name = "testSubscriptionMetrics")
+    public Object[][] topicAndSubscription() {
+        return new Object[][]{
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true},
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false},
+        };
+    }
+
+    @Test(dataProvider = "testSubscriptionMetrics")
+    public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats)
+            throws Exception {
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subName)
+                .subscribe();
+
+        boolean isPersistent = pulsar.getBrokerService().getTopic(topic, false).get().get().isPersistent();
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get()
+                .get().getSubscription(subName).getDispatcher();
+
+        Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+        field.set(dispatcher, ImmutableList.of(loader1));
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send();
+        }
+
+        for (;;) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            consumer.acknowledge(message);
+        }
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, enableTopicStats, false, false, output);
+        String metricsStr = output.toString();
+        Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> throughFilterMetrics =
+                metrics.get("pulsar_subscription_filter_processed_msg_count");
+        Collection<PrometheusMetricsTest.Metric> acceptedMetrics =
+                metrics.get("pulsar_subscription_filter_accepted_msg_count");
+        Collection<PrometheusMetricsTest.Metric> rejectedMetrics =
+                metrics.get("pulsar_subscription_filter_rejected_msg_count");
+        Collection<PrometheusMetricsTest.Metric> rescheduledMetrics =
+                metrics.get("pulsar_subscription_filter_rescheduled_msg_count");
+
+        if (enableTopicStats) {
+            Assert.assertTrue(throughFilterMetrics.size() > 0);
+            Assert.assertTrue(acceptedMetrics.size() > 0);
+            Assert.assertTrue(rejectedMetrics.size() > 0);
+            Assert.assertTrue(rescheduledMetrics.size() > 0);
+
+            double throughFilter = throughFilterMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterAccepted = acceptedMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterRejected = rejectedMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterRescheduled = rescheduledMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+
+            Assert.assertEquals(filterAccepted, 100);
+            if (isPersistent) {
+                Assert.assertEquals(filterRejected, 100);
+                Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+            }
+        } else {
+            Assert.assertEquals(throughFilterMetrics.size(), 0);
+            Assert.assertEquals(acceptedMetrics.size(), 0);
+            Assert.assertEquals(rejectedMetrics.size(), 0);
+            Assert.assertEquals(rescheduledMetrics.size(), 0);
+        }
+
+        testSubscriptionStatsAdminApi(topic, subName);
+    }
+
+    private void testSubscriptionStatsAdminApi(String topic, String subName) throws Exception {
+        boolean persistent = TopicName.get(topic).isPersistent();
+        TopicStats topicStats = admin.topics().getStats(topic);
+        SubscriptionStats stats = topicStats.getSubscriptions().get(subName);
+        Assert.assertNotNull(stats);
+
+        Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
+        if (persistent) {
+            Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
+            Assert.assertEquals(stats.getFilterProcessedMsgCount(),
+                    stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()
+                            + stats.getFilterRescheduledMsgCount(),
+                    0.01 * stats.getFilterProcessedMsgCount());
+        }
+    }
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 4e7e9e0e1e6..57d2d16d62c 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -126,4 +126,12 @@ public interface SubscriptionStats {
 
     /** The serialized size of non-contiguous deleted messages ranges. */
     int getNonContiguousDeletedMessagesRangesSerializedSize();
+
+    long getFilterProcessedMsgCount();
+
+    long getFilterAcceptedMsgCount();
+
+    long getFilterRejectedMsgCount();
+
+    long getFilterRescheduledMsgCount();
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index a945ae9cf86..02e1e8e184b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -137,6 +137,14 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
     /** SubscriptionProperties (key/value strings) associated with this subscribe. */
     public Map<String, String> subscriptionProperties;
 
+    public long filterProcessedMsgCount;
+
+    public long filterAcceptedMsgCount;
+
+    public long filterRejectedMsgCount;
+
+    public long filterRescheduledMsgCount;
+
     public SubscriptionStatsImpl() {
         this.consumers = new ArrayList<>();
         this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();
@@ -163,6 +171,10 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
         nonContiguousDeletedMessagesRangesSerializedSize = 0;
         delayedTrackerMemoryUsage = 0;
         subscriptionProperties.clear();
+        filterProcessedMsgCount = 0;
+        filterAcceptedMsgCount = 0;
+        filterRejectedMsgCount = 0;
+        filterRescheduledMsgCount = 0;
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
@@ -199,6 +211,10 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
         this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
         this.delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
         this.subscriptionProperties.putAll(stats.subscriptionProperties);
+        this.filterProcessedMsgCount += stats.filterProcessedMsgCount;
+        this.filterAcceptedMsgCount += stats.filterAcceptedMsgCount;
+        this.filterRejectedMsgCount += stats.filterRejectedMsgCount;
+        this.filterRescheduledMsgCount += stats.filterRescheduledMsgCount;
         return this;
     }
 }