You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/04 13:58:40 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #14531: [ISSUE] add filteredMsgCount for `pulsar-admin topics stats`

codelipenghui commented on a change in pull request #14531:
URL: https://github.com/apache/pulsar/pull/14531#discussion_r819587689



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
##########
@@ -159,4 +155,52 @@ public void testFilter() throws Exception {
 
     }
 
+
+    @Test
+    public void testFilteredMsgCount() throws Throwable{
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)

Review comment:
       ```suggestion
           @Cleanup
           Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
   ```

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
##########
@@ -159,4 +155,52 @@ public void testFilter() throws Exception {
 
     }
 
+
+    @Test
+    public void testFilteredMsgCount() throws Throwable{
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName(subName).subscribe();
+        // mock entry filters
+        PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+                .getTopicReference(topic).get().getSubscription(subName);
+        Dispatcher dispatcher = subscription.getDispatcher();
+        Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+        EntryFilter filter2 = new EntryFilter2Test();
+        EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader);
+        field.set(dispatcher, ImmutableList.of(loader1, loader2));
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)

Review comment:
       ```suggestion
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
   ```

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
##########
@@ -32,10 +31,7 @@
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.*;

Review comment:
       Please avoid star import.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
##########
@@ -159,4 +155,52 @@ public void testFilter() throws Exception {
 
     }
 
+
+    @Test
+    public void testFilteredMsgCount() throws Throwable{
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName(subName).subscribe();
+        // mock entry filters
+        PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+                .getTopicReference(topic).get().getSubscription(subName);
+        Dispatcher dispatcher = subscription.getDispatcher();
+        Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter1 = new EntryFilterTest();
+        EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader);
+        EntryFilter filter2 = new EntryFilter2Test();
+        EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader);
+        field.set(dispatcher, ImmutableList.of(loader1, loader2));
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topic).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send("test");
+        }
+
+        for (int i = 0; i < 10; i++) {
+            assertNotNull(producer.newMessage().property("REJECT", "").value("1").send());
+        }
+
+
+        int counter = 0;
+        while (true) {
+            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);

Review comment:
       1 second might be too short in the CI environment if the broker dispatch messages slowly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org