Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/03 14:00:21 UTC

[GitHub] [pulsar] tjiuming opened a new pull request, #16932: [broker][monitoring] add Subscription level EntryFilter stats

tjiuming opened a new pull request, #16932:
URL: https://github.com/apache/pulsar/pull/16932

   Fixes https://github.com/apache/pulsar/issues/16596
   
   ### Motivation
   
   Add per-subscription `EntryFilter` metrics for PersistentTopic and NonPersistentTopic.
   
   ### Modifications
   
   Add the following metrics to Prometheus:
   1. pulsar_subscription_through_filter_msg_count
   2. pulsar_subscription_filter_accepted_msg_count
   3. pulsar_subscription_filter_rejected_msg_count
   4. pulsar_subscription_filter_rescheduled_msg_count
   
   Add the following metrics to admin-api:
   1. throughEntryFilterMsgs
   2. entryFilterAccepted
   3. entryFilterRejected
   4. entryFilterRescheduled
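
A minimal sketch of how four such counters could be kept, assuming one `LongAdder` per counter as in the `AbstractBaseDispatcher` diff quoted later in this thread. The class name `FilterStatsSketch`, the `FilterOutcome` enum, and the `record` method are hypothetical and are not taken from the PR. The getter names follow the `getFilter...MsgCount` form that the review settles on.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the dispatcher-side counters; not Pulsar's actual class.
final class FilterStatsSketch {
    enum FilterOutcome { ACCEPT, REJECT, RESCHEDULE }

    private final LongAdder processedMsgs = new LongAdder();
    private final LongAdder acceptedMsgs = new LongAdder();
    private final LongAdder rejectedMsgs = new LongAdder();
    private final LongAdder rescheduledMsgs = new LongAdder();

    // Record the filter outcome for one entry carrying msgsInEntry messages.
    void record(FilterOutcome outcome, int msgsInEntry) {
        processedMsgs.add(msgsInEntry);
        switch (outcome) {
            case ACCEPT:
                acceptedMsgs.add(msgsInEntry);
                break;
            case REJECT:
                rejectedMsgs.add(msgsInEntry);
                break;
            case RESCHEDULE:
                rescheduledMsgs.add(msgsInEntry);
                break;
        }
    }

    long getFilterProcessedMsgCount() { return processedMsgs.sum(); }
    long getFilterAcceptedMsgCount() { return acceptedMsgs.sum(); }
    long getFilterRejectedMsgCount() { return rejectedMsgs.sum(); }
    long getFilterRescheduledMsgCount() { return rescheduledMsgs.sum(); }
}
```

In this sketch the processed count always equals the sum of the other three, which is an assumption of the model. The test quoted later in the thread only checks that relation within a 1% tolerance.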
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [x] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [ ] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#discussion_r942036529


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -51,6 +52,10 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
 
     protected final ServiceConfiguration serviceConfig;
     protected final boolean dispatchThrottlingOnBatchMessageEnabled;
+    private final LongAdder throughFilterMsgs = new LongAdder();
+    private final LongAdder filterAcceptedMsgs = new LongAdder();

Review Comment:
   This is a good discussion to have. Note that the `filterEntriesForConsumer` method, which we are instrumenting, uses `entries` throughout. At first, I thought counting entries would make more sense. However, I think counting total messages makes sense here. One interesting point is that the `msgOutCounter` measures total messages by summing the messages within each batched message. Ultimately, I think the final solution should be consistent.
   
   The current solution looks good to me.
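
The difference between counting entries and counting messages can be sketched as follows. This is an illustrative model only: `EntrySketch` is a hypothetical stand-in for an entry plus its metadata, and the "treat missing metadata as one message" rule mirrors the `msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch()` line in the `AbstractBaseDispatcher` diff quoted elsewhere in this thread.

```java
import java.util.Arrays;
import java.util.List;

final class EntryVsMessageCount {
    // Hypothetical minimal entry model; numMessagesInBatch is null when no metadata is available.
    static final class EntrySketch {
        final Integer numMessagesInBatch;

        EntrySketch(Integer numMessagesInBatch) {
            this.numMessagesInBatch = numMessagesInBatch;
        }
    }

    // An entry without metadata counts as a single message.
    static int messagesIn(EntrySketch entry) {
        return entry.numMessagesInBatch == null ? 1 : entry.numMessagesInBatch;
    }

    // Message-level count: sum the batch sizes instead of counting entries.
    static long totalMessages(List<EntrySketch> entries) {
        long total = 0;
        for (EntrySketch entry : entries) {
            total += messagesIn(entry);
        }
        return total;
    }

    public static void main(String[] args) {
        List<EntrySketch> entries = Arrays.asList(
                new EntrySketch(10), new EntrySketch(null), new EntrySketch(5));
        System.out.println("entries=" + entries.size() + " messages=" + totalMessages(entries));
    }
}
```

With batching enabled the two counts diverge, which is why the thread cares about using one unit consistently.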



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1756,6 +1756,13 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
                 topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange",
                         subscription.getTotalNonContiguousDeletedMessagesRange());
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                Dispatcher dispatcher0 = subscription.getDispatcher();
+                topicStatsStream.writePair("throughEntryFilterMsgs", dispatcher0.getThroughFilterMsgCount());
+                topicStatsStream.writePair("entryFilterAccepted", dispatcher0.getFilterAcceptedMsgCount());
+                topicStatsStream.writePair("entryFilterRejected", dispatcher0.getFilterRejectedMsgCount());
+                topicStatsStream.writePair("entryFilterRescheduled", dispatcher0.getFilterRescheduledMsgCount());

Review Comment:
   I know this is being discussed elsewhere in the PR, but just want to point out that the methods have `Msg` and the metrics have `entry`. We should make sure that the end result is consistent.





[GitHub] [pulsar] tjiuming commented on a diff in pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#discussion_r943223099


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -748,6 +749,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                // Write entry filter stats
+                Dispatcher dispatcher0 = subscription.getDispatcher();

Review Comment:
   You're right, the dispatcher can be null.





[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#discussion_r945956785


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -748,6 +749,20 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                // Write entry filter stats
+                Dispatcher dispatcher0 = subscription.getDispatcher();
+                if (null != dispatcher0) {
+                    topicStatsStream.writePair("entryFilterProcessedMsgs",
+                            dispatcher0.getFilterProcessedMsgCount());
+                    topicStatsStream.writePair("entryFilterAcceptedMsgs",
+                            dispatcher0.getFilterAcceptedMsgCount());
+                    topicStatsStream.writePair("entryFilterRejectedMsgs",
+                            dispatcher0.getFilterRejectedMsgCount());
+                    topicStatsStream.writePair("entryFilterRescheduledMsgs",
+                            dispatcher0.getFilterRescheduledMsgCount());

Review Comment:
   We use `MsgCount` elsewhere, and it'd be great to make it consistent. Also, I don't think we need the `entry` filter prefix.  
   ```suggestion
                       topicStatsStream.writePair("filterProcessedMsgCount",
                               dispatcher0.getFilterProcessedMsgCount());
                       topicStatsStream.writePair("filterAcceptedMsgCount",
                               dispatcher0.getFilterAcceptedMsgCount());
                       topicStatsStream.writePair("filterRejectedMsgCount",
                               dispatcher0.getFilterRejectedMsgCount());
                       topicStatsStream.writePair("filterRescheduledMsgCount",
                               dispatcher0.getFilterRescheduledMsgCount());
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1756,6 +1756,19 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
                 topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange",
                         subscription.getTotalNonContiguousDeletedMessagesRange());
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                Dispatcher dispatcher0 = subscription.getDispatcher();
+                if (null != dispatcher0) {
+                    topicStatsStream.writePair("entryFilterProcessedMsgs",
+                            dispatcher0.getFilterProcessedMsgCount());
+                    topicStatsStream.writePair("entryFilterAcceptedMsgs",
+                            dispatcher0.getFilterAcceptedMsgCount());
+                    topicStatsStream.writePair("entryFilterRejectedMsgs",
+                            dispatcher0.getFilterRejectedMsgCount());
+                    topicStatsStream.writePair("entryFilterRescheduledMsgs",
+                            dispatcher0.getFilterRescheduledMsgCount());

Review Comment:
   Same suggestion.
   ```suggestion
                       topicStatsStream.writePair("filterProcessedMsgCount",
                               dispatcher0.getFilterProcessedMsgCount());
                       topicStatsStream.writePair("filterAcceptedMsgCount",
                               dispatcher0.getFilterAcceptedMsgCount());
                       topicStatsStream.writePair("filterRejectedMsgCount",
                               dispatcher0.getFilterRejectedMsgCount());
                       topicStatsStream.writePair("filterRescheduledMsgCount",
                               dispatcher0.getFilterRescheduledMsgCount());
   ```



##########
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java:
##########
@@ -137,6 +137,14 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
     /** SubscriptionProperties (key/value strings) associated with this subscribe. */
     public Map<String, String> subscriptionProperties;
 
+    public long throughFilterMsgCount;

Review Comment:
   ```suggestion
       public long filterProcessedMsgCount;
   ```



##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java:
##########
@@ -126,4 +126,12 @@ public interface SubscriptionStats {
 
     /** The serialized size of non-contiguous deleted messages ranges. */
     int getNonContiguousDeletedMessagesRangesSerializedSize();
+
+    long getThroughFilterMsgCount();

Review Comment:
   ```suggestion
       long getFilterProcessedMsgCount();
   ```





[GitHub] [pulsar] tjiuming commented on pull request #16932: [broker][monitoring] add Subscription level EntryFilter stats

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#issuecomment-1203991465

   @eolivelli @michaeljmarshall PTAL




[GitHub] [pulsar] michaeljmarshall commented on a diff in pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on code in PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#discussion_r944093002


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -748,6 +749,18 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                // Write entry filter stats
+                Dispatcher dispatcher0 = subscription.getDispatcher();
+                topicStatsStream.writePair("entryFilterProccessedMsgs",
+                        dispatcher0 == null ? 0 : dispatcher0.getFilterProcessesMsgsCount());
+                topicStatsStream.writePair("entryFilterAcceptedMsgs",
+                        dispatcher0 == null ? 0 : dispatcher0.getFilterAcceptedMsgsCount());
+                topicStatsStream.writePair("entryFilterRejectedMsgs",
+                        dispatcher0 == null ? 0 : dispatcher0.getFilterRejectedMsgsCount());
+                topicStatsStream.writePair("entryFilterRescheduledMsgs",
+                        dispatcher0 == null ? 0 : dispatcher0.getFilterRescheduledMsgsCount());

Review Comment:
   Nit: it seems like we could consolidate this logic with the `if (subscription.getDispatcher() != null)` block below to minimize the clutter of the `null` checks for the `dispatcher`.
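
A rough sketch of the consolidation being suggested, under simplifying assumptions: `DispatcherSketch` is a hypothetical interface standing in for `Dispatcher`, and a `Map` stands in for `topicStatsStream`. The key names follow the `filter...MsgCount` form proposed elsewhere in this review.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NullCheckConsolidation {
    // Hypothetical stand-in for the dispatcher's stats getters.
    interface DispatcherSketch {
        long getFilterProcessedMsgCount();
        long getFilterAcceptedMsgCount();
        long getFilterRejectedMsgCount();
        long getFilterRescheduledMsgCount();
    }

    // One null guard around all four writes instead of one ternary per field.
    static Map<String, Long> writeFilterStats(DispatcherSketch dispatcher) {
        Map<String, Long> out = new LinkedHashMap<>();
        if (dispatcher != null) {
            out.put("filterProcessedMsgCount", dispatcher.getFilterProcessedMsgCount());
            out.put("filterAcceptedMsgCount", dispatcher.getFilterAcceptedMsgCount());
            out.put("filterRejectedMsgCount", dispatcher.getFilterRejectedMsgCount());
            out.put("filterRescheduledMsgCount", dispatcher.getFilterRescheduledMsgCount());
        }
        return out;
    }
}
```

One behavioral difference is worth noting: with a single guard the fields are omitted when the dispatcher is null, whereas the per-field ternaries write `0`.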



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java:
##########
@@ -64,5 +64,13 @@ public class AggregatedSubscriptionStats {
 
     long consumersCount;
 
+    long throughFilterMsgCount;

Review Comment:
   Question about this term. It maps to the metric `pulsar_subscription_filter_processed_msg_count`. In order to make it really clear that the mapping is correct, I think it might make sense to use `filterProcessedMsgCount` for this variable. What do you think?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java:
##########
@@ -129,4 +129,21 @@ default boolean checkAndUnblockIfStuck() {
         return false;
     }
 
+
+    default long getFilterProcessesMsgsCount() {

Review Comment:
   Nit: it is `Processed` elsewhere. I think it should be `Processed` here too, given that all of the other fields and methods are using the past tense.
   
   ```suggestion
       default long getFilterProcessedMsgsCount() {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java:
##########
@@ -129,4 +129,21 @@ default boolean checkAndUnblockIfStuck() {
         return false;
     }
 
+
+    default long getFilterProcessesMsgsCount() {
+        return 0;
+    }
+
+    default long getFilterAcceptedMsgsCount() {
+        return 0;
+    }
+
+    default long getFilterRejectedMsgsCount() {
+        return 0;
+    }
+
+    default long getFilterRescheduledMsgsCount() {
+        return 0;
+    }

Review Comment:
   This is pretty minor, but for each of these, I think we might want to consider using `Msg` instead of `Msgs`. There are many usages of `MsgCount` in the project, but none of `MsgsCount`. If you feel strongly, you can ignore this comment.





[GitHub] [pulsar] lhotari commented on a diff in pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
lhotari commented on code in PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#discussion_r944528151


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1756,6 +1756,19 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
                 topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange",
                         subscription.getTotalNonContiguousDeletedMessagesRange());
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                Dispatcher dispatcher0 = subscription.getDispatcher();
+                if (null != dispatcher0) {
+                    topicStatsStream.writePair("entryFilterProccessedMsgs",

Review Comment:
   ```suggestion
                       topicStatsStream.writePair("entryFilterProcessedMsgs",
   ```





[GitHub] [pulsar] michaeljmarshall commented on pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#issuecomment-1215518681

   @tjiuming - thanks! Do you feel strongly about keeping `Msgs` in the admin stats output, or can we change it to `MsgCount`? I like the latter because it then matches the Prometheus metric exactly.
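
To illustrate what "matches exactly" means here, the sketch below converts a camelCase stats field into snake_case and prepends the `pulsar_subscription_` prefix. This only demonstrates the naming correspondence; it is not how Pulsar derives metric names, and `MetricNameSketch` and `toPrometheusName` are hypothetical.

```java
final class MetricNameSketch {
    // Convert a camelCase field name to snake_case and prepend the metric prefix.
    static String toPrometheusName(String prefix, String camelCaseField) {
        StringBuilder sb = new StringBuilder(prefix);
        for (int i = 0; i < camelCaseField.length(); i++) {
            char c = camelCaseField.charAt(i);
            if (Character.isUpperCase(c)) {
                sb.append('_').append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // A field ending in MsgCount lines up with the metric name used in this PR.
        System.out.println(toPrometheusName("pulsar_subscription_", "filterProcessedMsgCount"));
        // A field ending in Msgs with an entry prefix does not.
        System.out.println(toPrometheusName("pulsar_subscription_", "entryFilterProcessedMsgs"));
    }
}
```

Under this mapping, `filterProcessedMsgCount` corresponds to `pulsar_subscription_filter_processed_msg_count`, while `entryFilterProcessedMsgs` does not.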




[GitHub] [pulsar] tjiuming commented on pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#issuecomment-1215514272

   @michaeljmarshall many thanks for your review! I've addressed all the suggestions, PTAL




[GitHub] [pulsar] michaeljmarshall merged pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
michaeljmarshall merged PR #16932:
URL: https://github.com/apache/pulsar/pull/16932




[GitHub] [pulsar] tjiuming commented on a diff in pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#discussion_r940965038


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -51,6 +52,10 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
 
     protected final ServiceConfiguration serviceConfig;
     protected final boolean dispatchThrottlingOnBatchMessageEnabled;
+    private final LongAdder throughFilterMsgs = new LongAdder();
+    private final LongAdder filterAcceptedMsgs = new LongAdder();

Review Comment:
   The original issue requires message-level metrics.





[GitHub] [pulsar] tjiuming commented on a diff in pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#discussion_r940967398


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java:
##########
@@ -134,4 +144,123 @@ public void testNonContiguousDeletedMessagesRanges() throws Exception {
                     .getNonContiguousDeletedMessagesRangesSerializedSize() > 0);
         });
     }
+
+    @DataProvider(name = "testSubscriptionMetrics")
+    public Object[][] topicAndSubscription() {
+        return new Object[][]{
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true},
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false},
+        };
+    }
+
+    @Test(dataProvider = "testSubscriptionMetrics")
+    public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats)
+            throws Exception {
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subName)
+                .subscribe();
+
+        boolean isPersistent = pulsar.getBrokerService().getTopic(topic, false).get().get().isPersistent();
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get()
+                .get().getSubscription(subName).getDispatcher();
+
+        Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+        field.set(dispatcher, ImmutableList.of(loader1));
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send();
+        }
+
+        for (;;) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            consumer.acknowledge(message);
+        }
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, enableTopicStats, false, false, output);
+        String metricsStr = output.toString();
+        Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> throughFilterMetrics =
+                metrics.get("pulsar_subscription_through_filter_msg_count");
+        Collection<PrometheusMetricsTest.Metric> acceptedMetrics =
+                metrics.get("pulsar_subscription_filter_accepted_msg_count");
+        Collection<PrometheusMetricsTest.Metric> rejectedMetrics =
+                metrics.get("pulsar_subscription_filter_rejected_msg_count");
+        Collection<PrometheusMetricsTest.Metric> rescheduledMetrics =
+                metrics.get("pulsar_subscription_filter_rescheduled_msg_count");
+
+        if (enableTopicStats) {
+            Assert.assertTrue(throughFilterMetrics.size() > 0);
+            Assert.assertTrue(acceptedMetrics.size() > 0);
+            Assert.assertTrue(rejectedMetrics.size() > 0);
+            Assert.assertTrue(rescheduledMetrics.size() > 0);
+
+            double throughFilter = throughFilterMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterAccepted = acceptedMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterRejected = rejectedMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterRescheduled = rescheduledMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+
+            Assert.assertEquals(filterAccepted, 100);
+            if (isPersistent) {
+                Assert.assertEquals(filterRejected, 100);
+                Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+            }
+        } else {
+            Assert.assertEquals(throughFilterMetrics.size(), 0);

Review Comment:
   Per-subscription metrics are disabled when `exposeTopicLevelMetricsInPrometheus=false`, so the size has to be 0.
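
A simplified model of that gating, for illustration only. `TopicLevelGateSketch` and `render` are hypothetical and do not reflect the structure of `PrometheusMetricsGenerator`. The metric name is the `filter_processed` form mentioned elsewhere in the review; the test at this revision still uses `through_filter`.

```java
import java.util.ArrayList;
import java.util.List;

final class TopicLevelGateSketch {
    // Emit the per-subscription line only when topic-level metrics are enabled.
    static List<String> render(boolean includeTopicMetrics, String topic,
                               String subscription, long processedMsgs) {
        List<String> lines = new ArrayList<>();
        if (includeTopicMetrics) {
            lines.add("pulsar_subscription_filter_processed_msg_count{topic=\"" + topic
                    + "\",subscription=\"" + subscription + "\"} " + processedMsgs);
        }
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(render(true, "persistent://my-property/my-ns/t", "my-sub1", 300));
        System.out.println(render(false, "persistent://my-property/my-ns/t", "my-sub1", 300));
    }
}
```

With the flag off, looking up the metric returns an empty collection, which is what the `else` branch of the test asserts.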





[GitHub] [pulsar] asafm commented on a diff in pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#discussion_r940190093


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -116,15 +121,21 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
                             ? ((EntryAndMetadata) entry).getMetadata()
                             : Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1)
                     );
+
+            int entryMsgs = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();

Review Comment:
   maybe `entryMsgsCount`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -51,6 +52,10 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
 
     protected final ServiceConfiguration serviceConfig;
     protected final boolean dispatchThrottlingOnBatchMessageEnabled;
+    private final LongAdder throughFilterMsgs = new LongAdder();

Review Comment:
   Maybe `filterProcessedMsgs` or `filterIncomingMsgs`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -748,6 +749,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                // Write entry filter stats
+                Dispatcher dispatcher0 = subscription.getDispatcher();
+                topicStatsStream.writePair("throughEntryFilterMsgs", dispatcher0.getThroughFilterMsgCount());
+                topicStatsStream.writePair("entryFilterAccepted", dispatcher0.getFilterAcceptedMsgCount());

Review Comment:
   It just occurred to me: a subscription has a list of entry filters, so `entryFiltersAccepted` may be more accurate.
   Also, the first metric ends with the unit `Msgs`, but the other three do not. I would use `entryFiltersAcceptedMsgs`, and the same for the other two metrics below.
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java:
##########
@@ -134,4 +144,123 @@ public void testNonContiguousDeletedMessagesRanges() throws Exception {
                     .getNonContiguousDeletedMessagesRangesSerializedSize() > 0);
         });
     }
+
+    @DataProvider(name = "testSubscriptionMetrics")
+    public Object[][] topicAndSubscription() {
+        return new Object[][]{
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true},
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false},
+        };
+    }
+
+    @Test(dataProvider = "testSubscriptionMetrics")
+    public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats)
+            throws Exception {
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subName)
+                .subscribe();
+
+        boolean isPersistent = pulsar.getBrokerService().getTopic(topic, false).get().get().isPersistent();
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get()
+                .get().getSubscription(subName).getDispatcher();
+
+        Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+        field.set(dispatcher, ImmutableList.of(loader1));
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send();
+        }
+
+        for (;;) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            consumer.acknowledge(message);
+        }
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, enableTopicStats, false, false, output);
+        String metricsStr = output.toString();
+        Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> throughFilterMetrics =
+                metrics.get("pulsar_subscription_through_filter_msg_count");
+        Collection<PrometheusMetricsTest.Metric> acceptedMetrics =
+                metrics.get("pulsar_subscription_filter_accepted_msg_count");
+        Collection<PrometheusMetricsTest.Metric> rejectedMetrics =
+                metrics.get("pulsar_subscription_filter_rejected_msg_count");
+        Collection<PrometheusMetricsTest.Metric> rescheduledMetrics =
+                metrics.get("pulsar_subscription_filter_rescheduled_msg_count");
+
+        if (enableTopicStats) {
+            Assert.assertTrue(throughFilterMetrics.size() > 0);
+            Assert.assertTrue(acceptedMetrics.size() > 0);
+            Assert.assertTrue(rejectedMetrics.size() > 0);
+            Assert.assertTrue(rescheduledMetrics.size() > 0);
+
+            double throughFilter = throughFilterMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterAccepted = acceptedMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterRejected = rejectedMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+            double filterRescheduled = rescheduledMetrics.stream()
+                    .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic))
+                    .mapToDouble(m-> m.value).sum();
+
+            Assert.assertEquals(filterAccepted, 100);
+            if (isPersistent) {
+                Assert.assertEquals(filterRejected, 100);
+                Assert.assertEquals(throughFilter, filterAccepted + filterRejected + filterRescheduled, 0.01 * throughFilter);
+            }
+        } else {
+            Assert.assertEquals(throughFilterMetrics.size(), 0);

Review Comment:
   Why 0?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -748,6 +749,14 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
+
+                // Write entry filter stats
+                Dispatcher dispatcher0 = subscription.getDispatcher();

Review Comment:
   According to the code below, the block you've added should go inside the `if (subscription.getDispatcher() != null) { ... }` block.
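
A minimal sketch of the guard suggested above, assuming the dispatcher can be null as the surrounding code implies. The `Dispatcher` and `Subscription` interfaces here are hypothetical stand-ins, not the real broker classes; only `getThroughFilterMsgCount()` and the `throughEntryFilterMsgs` key are taken from this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DispatcherStatsGuard {
    /** Hypothetical stand-in for the broker's Dispatcher; models only one getter from the diff. */
    interface Dispatcher {
        long getThroughFilterMsgCount();
    }

    /** Hypothetical stand-in for a subscription whose dispatcher may be null. */
    interface Subscription {
        Dispatcher getDispatcher();
    }

    /** Collects entry filter stats, writing nothing when there is no dispatcher. */
    static Map<String, Long> entryFilterStats(Subscription subscription) {
        Map<String, Long> stats = new LinkedHashMap<>();
        Dispatcher dispatcher = subscription.getDispatcher();
        if (dispatcher != null) {
            // Only dereference the dispatcher inside the null check.
            stats.put("throughEntryFilterMsgs", dispatcher.getThroughFilterMsgCount());
        }
        return stats;
    }

    public static void main(String[] args) {
        Subscription withoutDispatcher = () -> null;
        Dispatcher dispatcher = () -> 42L;
        Subscription withDispatcher = () -> dispatcher;

        System.out.println(entryFilterStats(withoutDispatcher)); // prints {}
        System.out.println(entryFilterStats(withDispatcher));    // prints {throughEntryFilterMsgs=42}
    }
}
```

Reading the dispatcher once into a local variable and testing that variable also avoids calling `getDispatcher()` twice, which matters if the field can change between calls.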



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -116,15 +121,21 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
                             ? ((EntryAndMetadata) entry).getMetadata()
                             : Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1)
                     );
+
+            int entryMsgs = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
+            this.throughFilterMsgs.add(entryMsgs);
+
             EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
             if (filterResult == EntryFilter.FilterResult.REJECT) {
                 entriesToFiltered.add(entry.getPosition());
                 entries.set(i, null);
+                this.filterRejectedMsgs.add(entryMsgs);

Review Comment:
   ```suggestion
                   filterRejectedMsgs.add(entryMsgs);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -285,4 +298,24 @@ protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
     protected String getSubscriptionName() {
         return subscription == null ? null : subscription.getName();
     }
+
+    @Override
+    public long getThroughFilterMsgCount() {
+        return this.throughFilterMsgs.longValue();

Review Comment:
   ```suggestion
           return throughFilterMsgs.longValue();
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -51,6 +52,10 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
 
     protected final ServiceConfiguration serviceConfig;
     protected final boolean dispatchThrottlingOnBatchMessageEnabled;
+    private final LongAdder throughFilterMsgs = new LongAdder();
+    private final LongAdder filterAcceptedMsgs = new LongAdder();

Review Comment:
   As I understand it, there are two things we could count here:
   1. Entries - how many entries were processed/accepted/rejected/rescheduled.
   2. Msgs - since an entry may be a batch containing several messages, we can count the same outcomes in messages rather than in entries.
   
   My only hesitation is that EntryFilter makes decisions about entries, not messages, so I'm wondering whether it makes sense to count only in messages.
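
To illustrate the distinction raised above, here is a rough sketch that tracks each filter decision in both units. `FilterCounters`, `Outcome`, and `record` are hypothetical names, not part of the PR; the only pieces borrowed from the diff are the use of `LongAdder` and the idea that a batched entry contributes its `numMessagesInBatch` to the message count.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical holder that counts filter outcomes per entry and per message. */
final class FilterCounters {
    enum Outcome { ACCEPT, REJECT, RESCHEDULE }

    private final LongAdder processedEntries = new LongAdder();
    private final LongAdder processedMsgs = new LongAdder();
    private final LongAdder[] entriesByOutcome = newAdders();
    private final LongAdder[] msgsByOutcome = newAdders();

    private static LongAdder[] newAdders() {
        LongAdder[] adders = new LongAdder[Outcome.values().length];
        for (int i = 0; i < adders.length; i++) {
            adders[i] = new LongAdder();
        }
        return adders;
    }

    /** Records one filter decision for an entry that carries numMessagesInBatch messages. */
    void record(Outcome outcome, int numMessagesInBatch) {
        processedEntries.increment();
        processedMsgs.add(numMessagesInBatch);
        entriesByOutcome[outcome.ordinal()].increment();
        msgsByOutcome[outcome.ordinal()].add(numMessagesInBatch);
    }

    long processedEntries() { return processedEntries.sum(); }
    long processedMsgs() { return processedMsgs.sum(); }
    long entries(Outcome outcome) { return entriesByOutcome[outcome.ordinal()].sum(); }
    long msgs(Outcome outcome) { return msgsByOutcome[outcome.ordinal()].sum(); }

    public static void main(String[] args) {
        FilterCounters counters = new FilterCounters();
        counters.record(Outcome.ACCEPT, 1);   // a non-batched entry
        counters.record(Outcome.REJECT, 50);  // one batched entry holding 50 messages
        System.out.println(counters.processedEntries() + " entries, "
                + counters.processedMsgs() + " msgs"); // prints "2 entries, 51 msgs"
    }
}
```

With batching, a single rejected entry moves the message counter by the whole batch size, so the two views can diverge considerably; whether both are worth exporting is the open question in the comment.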
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java:
##########
@@ -281,6 +281,16 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
                     subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_consumers_count",
                     subsStats.consumersCount, splitTopicAndPartitionIndexLabel);
+
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_through_filter_msg_count",

Review Comment:
   Same comment as above, but here the name should be consistent: `pulsar_subscription_filter_processed_msgs_count`



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java:
##########
@@ -134,4 +144,123 @@ public void testNonContiguousDeletedMessagesRanges() throws Exception {
                     .getNonContiguousDeletedMessagesRangesSerializedSize() > 0);
         });
     }
+
+    @DataProvider(name = "testSubscriptionMetrics")
+    public Object[][] topicAndSubscription() {
+        return new Object[][]{
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub1", true},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub2", true},
+                {"persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub3", false},
+                {"non-persistent://my-property/my-ns/testSubscriptionStats-" + UUID.randomUUID(), "my-sub4", false},
+        };
+    }
+
+    @Test(dataProvider = "testSubscriptionMetrics")
+    public void testSubscriptionStats(final String topic, final String subName, boolean enableTopicStats)
+            throws Exception {
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subName)
+                .subscribe();
+
+        boolean isPersistent = pulsar.getBrokerService().getTopic(topic, false).get().get().isPersistent();
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopic(topic, false).get()
+                .get().getSubscription(subName).getDispatcher();
+
+        Field field = EntryFilterSupport.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+        field.set(dispatcher, ImmutableList.of(loader1));
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("ACCEPT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("REJECT", " ").value(UUID.randomUUID().toString()).send();
+        }
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().property("RESCHEDULE", " ").value(UUID.randomUUID().toString()).send();
+        }
+
+        for (;;) {

Review Comment:
   `while (true) {`
   





[GitHub] [pulsar] tjiuming commented on pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#issuecomment-1215531550

   > @tjiuming - thanks! Do you feel strongly about keeping `Msgs` in the admin stats output, or can we change it to `MsgCount`? I like the latter because it matches the prometheus metric exactly then.
   
   That makes sense, and I've resolved them.




[GitHub] [pulsar] michaeljmarshall commented on pull request #16932: [broker][monitoring] add per-subscription EntryFilter metrics

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on PR #16932:
URL: https://github.com/apache/pulsar/pull/16932#issuecomment-1215928423

   Thanks @tjiuming! Great work!

