You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 01:38:14 UTC

[pulsar] branch branch-2.11 updated: [monitoring][broker][fix] Fix EntryFilter stats (#17605)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new bc0acb96446 [monitoring][broker][fix] Fix EntryFilter stats (#17605)
bc0acb96446 is described below

commit bc0acb96446687d57431c838d848f2d69daa9d7e
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Wed Sep 21 21:30:03 2022 +0800

    [monitoring][broker][fix] Fix EntryFilter stats (#17605)
    
    * fix entryFilter stats
    
    * fix test
    
    * add test comment
    
    * review fix
---
 .../broker/service/AbstractBaseDispatcher.java     | 17 +++--
 .../pulsar/broker/stats/SubscriptionStatsTest.java | 76 +++++++++++++++-------
 2 files changed, 64 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 22a5df2931e..1649516def1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -107,8 +107,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
         long totalBytes = 0;
         int totalChunkedMessages = 0;
         int totalEntries = 0;
-        List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
-        List<PositionImpl> entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
+        final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters);
+        List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
+        List<PositionImpl> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
         for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
             final Entry entry = entries.get(i);
             if (entry == null) {
@@ -123,18 +124,24 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
                     );
 
             int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
-            this.filterProcessedMsgs.add(entryMsgCnt);
+            if (hasFilter) {
+                this.filterProcessedMsgs.add(entryMsgCnt);
+            }
 
             EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
             if (filterResult == EntryFilter.FilterResult.REJECT) {
                 entriesToFiltered.add(entry.getPosition());
                 entries.set(i, null);
+                // The FilterResult is always `ACCEPTED` when there is no filter,
+                // so there is no need to check whether `hasFilter` is true here.
                 this.filterRejectedMsgs.add(entryMsgCnt);
                 entry.release();
                 continue;
             } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
                 entriesToRedeliver.add((PositionImpl) entry.getPosition());
                 entries.set(i, null);
+                // The FilterResult is always `ACCEPTED` when there is no filter,
+                // so there is no need to check whether `hasFilter` is true here.
                 this.filterRescheduledMsgs.add(entryMsgCnt);
                 entry.release();
                 continue;
@@ -174,7 +181,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
                 continue;
             }
 
-            this.filterAcceptedMsgs.add(entryMsgCnt);
+            if (hasFilter) {
+                this.filterAcceptedMsgs.add(entryMsgCnt);
+            }
 
             totalEntries++;
             int batchSize = msgMetadata.getNumMessagesInBatch();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index f2794f68908..b1b865727a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -148,16 +148,21 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
     @DataProvider(name = "testSubscriptionMetrics")
     public Object[][] topicAndSubscription() {
         return new Object[][]{
-                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true},
-                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true},
-                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false},
-                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false},
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, true},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, true},
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, true},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, true},
+
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, false},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, false},
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, false},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, false},
         };
     }
 
     @Test(dataProvider = "testSubscriptionMetrics")
-    public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats)
-            throws Exception {
+    public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats,
+                                      boolean setFilter) throws Exception {
         @Cleanup
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .topic(topic)
@@ -175,12 +180,15 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
         Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get()
                 .get().getSubscription(subName).getDispatcher();
 
-        Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
-        field.setAccessible(true);
-        NarClassLoader narClassLoader = mock(NarClassLoader.class);
-        EntryFilter filter1 = new EntryFilterTest();
-        EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
-        field.set(dispatcher, ImmutableList.of(loader1));
+        if (setFilter) {
+            Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
+            field.setAccessible(true);
+            NarClassLoader narClassLoader = mock(NarClassLoader.class);
+            EntryFilter filter1 = new EntryFilterTest();
+            EntryFilterWithClassLoader loader1 =
+                    spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+            field.set(dispatcher, ImmutableList.of(loader1));
+        }
 
         for (int i = 0; i < 100; i++) {
             producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
@@ -233,10 +241,18 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
                     .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
                     .mapToDouble(m-> m.value).sum();
 
-            Assert.assertEquals(filterAccepted, 100);
-            if (isPersistent) {
-                Assert.assertEquals(filterRejected, 100);
-                Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+            if (setFilter) {
+                Assert.assertEquals(filterAccepted, 100);
+                if (isPersistent) {
+                    Assert.assertEquals(filterRejected, 100);
+                    // This equality only holds in this test: if there are any markers, the filterProcessCount will not be equal to rejectedCount + rescheduledCount + acceptCount.
+                    Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+                }
+            } else {
+                Assert.assertEquals(throughFilter, 0D);
+                Assert.assertEquals(filterAccepted, 0D);
+                Assert.assertEquals(filterRejected, 0D);
+                Assert.assertEquals(filterRescheduled, 0D);
             }
         } else {
             Assert.assertEquals(throughFilterMetrics.size(), 0);
@@ -245,22 +261,32 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
             Assert.assertEquals(rescheduledMetrics.size(), 0);
         }
 
-        testSubscriptionStatsAdminApi(topic, subName);
+        testSubscriptionStatsAdminApi(topic, subName, setFilter);
     }
 
-    private void testSubscriptionStatsAdminApi(String topic, String subName) throws Exception {
+    private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter) throws Exception {
         boolean persistent = TopicName.get(topic).isPersistent();
         TopicStats topicStats = admin.topics().getStats(topic);
         SubscriptionStats stats = topicStats.getSubscriptions().get(subName);
         Assert.assertNotNull(stats);
 
-        Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
-        if (persistent) {
-            Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
-            Assert.assertEquals(stats.getFilterProcessedMsgCount(),
-                    stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()
-                            + stats.getFilterRescheduledMsgCount(),
-                    0.01 * stats.getFilterProcessedMsgCount());
+        if (setFilter) {
+            Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
+            if (persistent) {
+                Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
+                // This equality only holds in this test: if there are any markers, the filterProcessCount will not be equal to rejectedCount + rescheduledCount + acceptCount.
+                Assert.assertEquals(stats.getFilterProcessedMsgCount(),
+                        stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()
+                                + stats.getFilterRescheduledMsgCount(),
+                        0.01 * stats.getFilterProcessedMsgCount());
+            }
+        } else {
+            Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 0L);
+            if (persistent) {
+                Assert.assertEquals(stats.getFilterRejectedMsgCount(), 0L);
+                Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 0L);
+                Assert.assertEquals(stats.getFilterRescheduledMsgCount(), 0L);
+            }
         }
     }
 }