You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/08 13:53:54 UTC

[GitHub] [pulsar] asafm commented on a diff in pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

asafm commented on code in PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#discussion_r940190093


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -116,15 +121,21 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
                             ? ((EntryAndMetadata) entry).getMetadata()
                             : Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1)
                     );
+
+            int entryMsgs = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();

Review Comment:
   maybe `entryMsgsCount`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -51,6 +52,10 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
 
     protected final ServiceConfiguration serviceConfig;
     protected final boolean dispatchThrottlingOnBatchMessageEnabled;
+    private final LongAdder throughFilterMsgs = new LongAdder();

Review Comment:
   Maybe `filterProcessedMsgs` or `filterIncomingMsgs`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -748,6 +749,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                // Write entry filter stats
+                Dispatcher dispatcher0 = subscription.getDispatcher();
+                topicStatsStream.writePair("throughEntryFilterMsgs", dispatcher0.getThroughFilterMsgCount());
+                topicStatsStream.writePair("entryFilterAccepted", dispatcher0.getFilterAcceptedMsgCount());

Review Comment:
   Just occurred to me: A subscription has a list of Entry Filter. Maybe it's more correct: `entryFiltersAccepted`.
   Also, on the first metric you end with units `Msgs`, but all other 3 no. I would `entryFiltersAcceptedMsgs` and same for other 2 metrics below
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java:
##########
@@ -134,4 +144,123 @@ public void testNonContiguousDeletedMessagesRanges() throws Exception {
                     .getNonContiguousDeletedMessagesRangesSerializedSize() > 0);
         });
     }
+
+    @DataProvider(name = "testSubscriptionMetrics")
+    public Object[][] topicAndSubscription() {
+        return new Object[][]{
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true},
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false},
+        };
+    }
+
+    @Test(dataProvider = "testSubscriptionMetrics")
+    public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats)
+            throws Exception {
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subName)
+                .subscribe();
+
+        boolean isPersistent = pulsar.getBrokerService().getTopic(topic, false).get().get().isPersistent();
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get()
+                .get().getSubscription(subName).getDispatcher();
+
+        Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+        field.set(dispatcher, ImmutableList.of(loader1));
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send();
+        }
+
+        for (;;) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            consumer.acknowledge(message);
+        }
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, enableTopicStats, false, false, output);
+        String metricsStr = output.toString();
+        Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> throughFilterMetrics =
+                metrics.get("pulsar_subscription_through_filter_msg_count");
+        Collection<PrometheusMetricsTest.Metric> acceptedMetrics =
+                metrics.get("pulsar_subscription_filter_accepted_msg_count");
+        Collection<PrometheusMetricsTest.Metric> rejectedMetrics =
+                metrics.get("pulsar_subscription_filter_rejected_msg_count");
+        Collection<PrometheusMetricsTest.Metric> rescheduledMetrics =
+                metrics.get("pulsar_subscription_filter_rescheduled_msg_count");
+
+        if (enableTopicStats) {
+            Assert.assertTrue(throughFilterMetrics.size() > 0);
+            Assert.assertTrue(acceptedMetrics.size() > 0);
+            Assert.assertTrue(rejectedMetrics.size() > 0);
+            Assert.assertTrue(rescheduledMetrics.size() > 0);
+
+            double throughFilter = throughFilterMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterAccepted = acceptedMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterRejected = rejectedMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterRescheduled = rescheduledMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+
+            Assert.assertEquals(filterAccepted, 100);
+            if (isPersistent) {
+                Assert.assertEquals(filterRejected, 100);
+                Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+            }
+        } else {
+            Assert.assertEquals(throughFilterMetrics.size(), 0);

Review Comment:
   Why 0?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -748,6 +749,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                // Write entry filter stats
+                Dispatcher dispatcher0 = subscription.getDispatcher();

Review Comment:
   According to the code below, the block you've added should be inside `if (subscription.getDispatcher() != null) { ... }` block



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -116,15 +121,21 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
                             ? ((EntryAndMetadata) entry).getMetadata()
                             : Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1)
                     );
+
+            int entryMsgs = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
+            this.throughFilterMsgs.add(entryMsgs);
+
             EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
             if (filterResult == EntryFilter.FilterResult.REJECT) {
                 entriesToFiltered.add(entry.getPosition());
                 entries.set(i, null);
+                this.filterRejectedMsgs.add(entryMsgs);

Review Comment:
   ```suggestion
                   filterRejectedMsgs.add(entryMsgs);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -285,4 +298,24 @@ protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
     protected String getSubscriptionName() {
         return subscription == null ? null : subscription.getName();
     }
+
+    @Override
+    public long getThroughFilterMsgCount() {
+        return this.throughFilterMsgs.longValue();

Review Comment:
   ```suggestion
           return throughFilterMsgs.longValue();
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -51,6 +52,10 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
 
     protected final ServiceConfiguration serviceConfig;
     protected final boolean dispatchThrottlingOnBatchMessageEnabled;
+    private final LongAdder throughFilterMsgs = new LongAdder();
+    private final LongAdder filterAcceptedMsgs = new LongAdder();

Review Comment:
   As I understand there are two entities that require counting here:
   1. Entries - How many entries were rejected/accepted/processed/rescheduled
   2. Msgs - since an entry might be a batch messages entry, we can also count the same by in msgs, not in entries.
   
   The only thing I'm contemplating here is that EntryFilter makes decisions regarding entries, not messages, so I'm thinking if it makes sense to count only in messages.
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java:
##########
@@ -281,6 +281,16 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
                     subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_consumers_count",
                     subsStats.consumersCount, splitTopicAndPartitionIndexLabel);
+
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_through_filter_msg_count",

Review Comment:
   Same comment above, but here it should to be consistent `pulsar_subscription_filter_processed_msgs_count`



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java:
##########
@@ -134,4 +144,123 @@ public void testNonContiguousDeletedMessagesRanges() throws Exception {
                     .getNonContiguousDeletedMessagesRangesSerializedSize() > 0);
         });
     }
+
+    @DataProvider(name = "testSubscriptionMetrics")
+    public Object[][] topicAndSubscription() {
+        return new Object[][]{
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true},
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false},
+        };
+    }
+
+    @Test(dataProvider = "testSubscriptionMetrics")
+    public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats)
+            throws Exception {
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subName)
+                .subscribe();
+
+        boolean isPersistent = pulsar.getBrokerService().getTopic(topic, false).get().get().isPersistent();
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get()
+                .get().getSubscription(subName).getDispatcher();
+
+        Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+        field.set(dispatcher, ImmutableList.of(loader1));
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send();
+        }
+
+        for (;;) {

Review Comment:
   `while (true) {`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org