Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/08 14:47:07 UTC

[GitHub] [pulsar] BewareMyPower commented on pull request #15983: [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata`

BewareMyPower commented on PR #15983:
URL: https://github.com/apache/pulsar/pull/15983#issuecomment-1150020037

   I tried to add a test, but it turned out to be harder than I expected. The only way I can see is to verify the invocation count of the static `Commands#peekMessageMetadata` method.
   
   A simple way to expose the problem is to revert the changes to `PersistentDispatcherMultipleConsumers.java` in this PR and run the following test in `FilterEntryTest`.
   
   ```java
       @Test
       public void testMultiEntriesInMultiConsumerDispatcher() throws Exception {
           String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
           String subName = "sub";
   
           Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                   .topic(topic)
                   // PersistentDispatcherMultipleConsumers will be used for Shared subscription
                   .subscriptionType(SubscriptionType.Shared)
                   .subscriptionName(subName)
                   .subscribe();
           consumer.close();
           PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                   .getTopicReference(topic).get().getSubscription(subName);
           Dispatcher dispatcher = subscription.getDispatcher();
           Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
           field.setAccessible(true);
           NarClassLoader narClassLoader = mock(NarClassLoader.class);
   
           final List<Long> sequenceIdList = Collections.synchronizedList(new ArrayList<>());
           EntryFilter filter = new EntryFilter() {
   
               @Override
               public FilterResult filterEntry(Entry entry, FilterContext context) {
                   sequenceIdList.add(context.getMsgMetadata().getSequenceId());
                   return FilterResult.ACCEPT;
               }
   
               @Override
               public void close() {
               }
           };
           EntryFilterWithClassLoader loader = spyWithClassAndConstructorArgs(
                   EntryFilterWithClassLoader.class, filter, narClassLoader);
           field.set(dispatcher, ImmutableList.of(loader));
   
           final int numMessages = 10;
   
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                   .enableBatching(false)
                   .topic(topic)
                   .create();
           for (int i = 0; i < numMessages; i++) {
               producer.newMessage().value("msg-" + i).send();
           }
   
           consumer = pulsarClient.newConsumer(Schema.STRING)
                   .topic(topic)
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   // PersistentDispatcherMultipleConsumers will be used for Shared subscription
                   .subscriptionType(SubscriptionType.Shared)
                   .subscriptionName(subName)
                   .subscribe();
           // Drain the topic; stop once no message arrives within one second
           while (true) {
               Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
               if (msg == null) {
                   break;
               }
           }
   
           // TestNG's assertEquals takes (actual, expected)
           assertEquals(sequenceIdList, LongStream.range(0, numMessages).boxed().collect(Collectors.toList()));
       }
   ```
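
As a rough illustration of the kind of bug the test above is meant to catch, here is a minimal standalone sketch. It assumes the peek method returns one reused, mutable instance per thread, so references stored for later all end up showing the last parsed message. `ReusableMetadata`, `MetadataAliasingSketch` and its methods are hypothetical names made up for this example; they are not Pulsar classes and do not reproduce the dispatcher code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a mutable metadata object; not Pulsar's MessageMetadata.
final class ReusableMetadata {
    private long sequenceId;

    long getSequenceId() {
        return sequenceId;
    }

    void setSequenceId(long sequenceId) {
        this.sequenceId = sequenceId;
    }
}

final class MetadataAliasingSketch {
    // Assumption: the peek method hands back one reused instance per thread.
    private static final ThreadLocal<ReusableMetadata> LOCAL =
            ThreadLocal.withInitial(ReusableMetadata::new);

    // Hypothetical peek: overwrites and returns the shared per-thread instance.
    static ReusableMetadata peek(long sequenceId) {
        ReusableMetadata metadata = LOCAL.get();
        metadata.setSequenceId(sequenceId);
        return metadata;
    }

    // Problematic pattern: keep the returned references and read them later.
    static List<Long> readAfterStoringReferences(int numMessages) {
        List<ReusableMetadata> stored = new ArrayList<>();
        for (long i = 0; i < numMessages; i++) {
            stored.add(peek(i));
        }
        List<Long> ids = new ArrayList<>();
        for (ReusableMetadata metadata : stored) {
            ids.add(metadata.getSequenceId());
        }
        return ids;
    }

    // Safe pattern: read the needed value before the next peek call.
    static List<Long> readImmediately(int numMessages) {
        List<Long> ids = new ArrayList<>();
        for (long i = 0; i < numMessages; i++) {
            ids.add(peek(i).getSequenceId());
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(readAfterStoringReferences(3)); // prints [2, 2, 2]
        System.out.println(readImmediately(3));            // prints [0, 1, 2]
    }
}
```

Under that assumption, a filter that receives stored references would not see the sequence IDs 0 to 9 in order, which is what the final assertion in the test checks.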

