You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 01:38:14 UTC
[pulsar] branch branch-2.11 updated: [monitoring][broker][fix] Fix EntryFilter stats (#17605)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new bc0acb96446 [monitoring][broker][fix] Fix EntryFilter stats (#17605)
bc0acb96446 is described below
commit bc0acb96446687d57431c838d848f2d69daa9d7e
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Wed Sep 21 21:30:03 2022 +0800
[monitoring][broker][fix] Fix EntryFilter stats (#17605)
* fix entryFilter stats
* fix test
* add test comment
* review fix
---
.../broker/service/AbstractBaseDispatcher.java | 17 +++--
.../pulsar/broker/stats/SubscriptionStatsTest.java | 76 +++++++++++++++-------
2 files changed, 64 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 22a5df2931e..1649516def1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -107,8 +107,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
- List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
- List<PositionImpl> entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
+ final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters);
+ List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
+ List<PositionImpl> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
final Entry entry = entries.get(i);
if (entry == null) {
@@ -123,18 +124,24 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
);
int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
- this.filterProcessedMsgs.add(entryMsgCnt);
+ if (hasFilter) {
+ this.filterProcessedMsgs.add(entryMsgCnt);
+ }
EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
+ // FilterResult will be always `ACCEPTED` when there is No Filter
+ // dont need to judge whether `hasFilter` is true or not.
this.filterRejectedMsgs.add(entryMsgCnt);
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add((PositionImpl) entry.getPosition());
entries.set(i, null);
+ // FilterResult will be always `ACCEPTED` when there is No Filter
+ // dont need to judge whether `hasFilter` is true or not.
this.filterRescheduledMsgs.add(entryMsgCnt);
entry.release();
continue;
@@ -174,7 +181,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
continue;
}
- this.filterAcceptedMsgs.add(entryMsgCnt);
+ if (hasFilter) {
+ this.filterAcceptedMsgs.add(entryMsgCnt);
+ }
totalEntries++;
int batchSize = msgMetadata.getNumMessagesInBatch();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index f2794f68908..b1b865727a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -148,16 +148,21 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
@DataProvider(name = "testSubscriptionMetrics")
public Object[][] topicAndSubscription() {
return new Object[][]{
- {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true},
- {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true},
- {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false},
- {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false},
+ {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, true},
+ {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, true},
+ {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, true},
+ {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, true},
+
+ {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true, false},
+ {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true, false},
+ {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false, false},
+ {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false, false},
};
}
@Test(dataProvider = "testSubscriptionMetrics")
- public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats)
- throws Exception {
+ public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats,
+ boolean setFilter) throws Exception {
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
@@ -175,12 +180,15 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get()
.get().getSubscription(subName).getDispatcher();
- Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
- field.setAccessible(true);
- NarClassLoader narClassLoader = mock(NarClassLoader.class);
- EntryFilter filter1 = new EntryFilterTest();
- EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
- field.set(dispatcher, ImmutableList.of(loader1));
+ if (setFilter) {
+ Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
+ field.setAccessible(true);
+ NarClassLoader narClassLoader = mock(NarClassLoader.class);
+ EntryFilter filter1 = new EntryFilterTest();
+ EntryFilterWithClassLoader loader1 =
+ spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+ field.set(dispatcher, ImmutableList.of(loader1));
+ }
for (int i = 0; i < 100; i++) {
producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
@@ -233,10 +241,18 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
.filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
.mapToDouble(m-> m.value).sum();
- Assert.assertEquals(filterAccepted, 100);
- if (isPersistent) {
- Assert.assertEquals(filterRejected, 100);
- Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+ if (setFilter) {
+ Assert.assertEquals(filterAccepted, 100);
+ if (isPersistent) {
+ Assert.assertEquals(filterRejected, 100);
+ // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
+ Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+ }
+ } else {
+ Assert.assertEquals(throughFilter, 0D);
+ Assert.assertEquals(filterAccepted, 0D);
+ Assert.assertEquals(filterRejected, 0D);
+ Assert.assertEquals(filterRescheduled, 0D);
}
} else {
Assert.assertEquals(throughFilterMetrics.size(), 0);
@@ -245,22 +261,32 @@ public class SubscriptionStatsTest extends ProducerConsumerBase {
Assert.assertEquals(rescheduledMetrics.size(), 0);
}
- testSubscriptionStatsAdminApi(topic, subName);
+ testSubscriptionStatsAdminApi(topic, subName, setFilter);
}
- private void testSubscriptionStatsAdminApi(String topic, String subName) throws Exception {
+ private void testSubscriptionStatsAdminApi(String topic, String subName, boolean setFilter) throws Exception {
boolean persistent = TopicName.get(topic).isPersistent();
TopicStats topicStats = admin.topics().getStats(topic);
SubscriptionStats stats = topicStats.getSubscriptions().get(subName);
Assert.assertNotNull(stats);
- Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
- if (persistent) {
- Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
- Assert.assertEquals(stats.getFilterProcessedMsgCount(),
- stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()
- + stats.getFilterRescheduledMsgCount(),
- 0.01 * stats.getFilterProcessedMsgCount());
+ if (setFilter) {
+ Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 100);
+ if (persistent) {
+ Assert.assertEquals(stats.getFilterRejectedMsgCount(), 100);
+ // Only works on the test, if there are some markers, the filterProcessCount will be not equal with rejectedCount + rescheduledCount + acceptCount
+ Assert.assertEquals(stats.getFilterProcessedMsgCount(),
+ stats.getFilterAcceptedMsgCount() + stats.getFilterRejectedMsgCount()
+ + stats.getFilterRescheduledMsgCount(),
+ 0.01 * stats.getFilterProcessedMsgCount());
+ }
+ } else {
+ Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 0L);
+ if (persistent) {
+ Assert.assertEquals(stats.getFilterRejectedMsgCount(), 0L);
+ Assert.assertEquals(stats.getFilterAcceptedMsgCount(), 0L);
+ Assert.assertEquals(stats.getFilterRescheduledMsgCount(), 0L);
+ }
}
}
}