Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/08 14:39:24 UTC

[GitHub] [pulsar] BewareMyPower opened a new pull request, #15983: [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata`

BewareMyPower opened a new pull request, #15983:
URL: https://github.com/apache/pulsar/pull/15983

   ### Motivation
   
   https://github.com/apache/pulsar/pull/15967 removed the `EntryWrapper`,
   which holds an `Entry` instance that is never used. However, after the
   refactoring, the `MessageMetadata` array became useless; see
   https://github.com/apache/pulsar/blob/298a573295f845e46f8a55cee366b6db63e997c2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L517-L519
   
   Each `MessageMetadata` instance in the array is returned by
   `peekMessageMetadata`, whose return value references the thread-local
   object `Commands#LOCAL_MESSAGE_METADATA`. This causes a problem: if
   multiple entries are read, all `MessageMetadata` elements in the array
   reference the same object.
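   The aliasing can be reproduced outside Pulsar with a small sketch. `Metadata` and `ThreadLocalPeek.peek` below are hypothetical stand-ins for `MessageMetadata` and `Commands#peekMessageMetadata`, assuming only that the peek method overwrites and returns one thread-local instance:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for MessageMetadata: only a mutable sequence id.
   class Metadata {
       long sequenceId;
   }

   // Hypothetical stand-in for Commands#peekMessageMetadata: every call
   // parses into the same thread-local instance and returns it.
   class ThreadLocalPeek {
       private static final ThreadLocal<Metadata> LOCAL =
               ThreadLocal.withInitial(Metadata::new);

       static Metadata peek(long sequenceIdInEntry) {
           Metadata m = LOCAL.get();
           m.sequenceId = sequenceIdInEntry; // overwrites the previous result
           return m;
       }

       // Stores each returned reference, like a cached metadata array would.
       static List<Long> storeThenRead(int numEntries) {
           List<Metadata> cached = new ArrayList<>();
           for (long i = 0; i < numEntries; i++) {
               cached.add(peek(i));
           }
           List<Long> ids = new ArrayList<>();
           for (Metadata m : cached) {
               ids.add(m.sequenceId);
           }
           return ids;
       }

       public static void main(String[] args) {
           // All cached elements alias one object, so only the last id survives.
           System.out.println(storeThenRead(3)); // [2, 2, 2]
       }
   }
   ```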
   
   However, an incorrect use of `Optional#orElse` accidentally masks this
   bug. See
   https://github.com/apache/pulsar/blob/298a573295f845e46f8a55cee366b6db63e997c2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L133-L134
   
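   Each time `peekMessageMetadata` is called, the thread-local message
   metadata is updated. Unlike the supplier passed to `orElseGet`, the
   argument of `orElse` is always evaluated, whether or not the optional is
   empty, so the metadata is re-read from the current entry every time and
   the aliased array elements are never actually used.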
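   A minimal sketch of the `orElse`/`orElseGet` difference, using a hypothetical `expensiveFallback` method in place of `peekMessageMetadata` and a counter that records how often it runs:

   ```java
   import java.util.Optional;

   class OrElseDemo {
       private static int calls = 0;

       // Hypothetical fallback that counts how often it is evaluated.
       static String expensiveFallback() {
           calls++;
           return "fallback";
       }

       static int callsWithOrElse(Optional<String> cached) {
           calls = 0;
           // The argument is evaluated before orElse runs, even if cached is present.
           cached.orElse(expensiveFallback());
           return calls;
       }

       static int callsWithOrElseGet(Optional<String> cached) {
           calls = 0;
           // The supplier only runs when cached is empty.
           cached.orElseGet(OrElseDemo::expensiveFallback);
           return calls;
       }

       public static void main(String[] args) {
           Optional<String> present = Optional.of("cached");
           System.out.println(callsWithOrElse(present));             // 1
           System.out.println(callsWithOrElseGet(present));          // 0
           System.out.println(callsWithOrElseGet(Optional.empty())); // 1
       }
   }
   ```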
   
   As a result, `peekMessageMetadata` is invoked more often than necessary
   and the `metadataArray` cache is redundant.
   
   ### Modifications
   
   - Use `orElseGet` instead of `orElse` in `AbstractBaseDispatcher`.
   - Add a new static method `Commands#peekAndCopyMessageMetadata` that
     returns a `MessageMetadata` instance allocated from heap memory.
   - Call `peekAndCopyMessageMetadata` to cache all message metadata
     instances in `PersistentDispatcherMultipleConsumers`.
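   The modifications above can be sketched as follows. `Meta`, `peek`, `peekAndCopy` and `dispatch` are hypothetical simplifications, not the actual `Commands` or dispatcher code; the sketch only assumes that the peek method reuses a thread-local instance and that the copy is a fresh heap object:

   ```java
   import java.util.Arrays;
   import java.util.Optional;

   class PeekAndCopyDemo {
       // Hypothetical stand-in for MessageMetadata.
       static final class Meta {
           long sequenceId;

           Meta copy() {
               Meta m = new Meta();
               m.sequenceId = sequenceId;
               return m;
           }
       }

       private static final ThreadLocal<Meta> LOCAL = ThreadLocal.withInitial(Meta::new);
       static int peekCalls = 0;

       // Like peekMessageMetadata: reuses one thread-local instance.
       static Meta peek(long sequenceIdInEntry) {
           peekCalls++;
           Meta m = LOCAL.get();
           m.sequenceId = sequenceIdInEntry;
           return m;
       }

       // Hypothetical analogue of peekAndCopyMessageMetadata: a heap copy that is safe to store.
       static Meta peekAndCopy(long sequenceIdInEntry) {
           return peek(sequenceIdInEntry).copy();
       }

       static long[] dispatch(long[] entries) {
           // Cache one independent copy per entry.
           Meta[] cache = new Meta[entries.length];
           for (int i = 0; i < entries.length; i++) {
               cache[i] = peekAndCopy(entries[i]);
           }
           long[] seen = new long[entries.length];
           for (int i = 0; i < entries.length; i++) {
               final long entry = entries[i];
               // orElseGet only re-peeks when the cached slot is empty.
               Meta m = Optional.ofNullable(cache[i]).orElseGet(() -> peek(entry));
               seen[i] = m.sequenceId;
           }
           return seen;
       }

       public static void main(String[] args) {
           long[] seen = dispatch(new long[] {0, 1, 2});
           System.out.println(Arrays.toString(seen) + " peeks=" + peekCalls);
           // [0, 1, 2] peeks=3
       }
   }
   ```

   Because each cached element is an independent copy, the `orElseGet` fallback never runs in this sketch, so each entry is parsed exactly once.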
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   It's hard to add tests. As I've explained before, #15967 only degrades
   performance and doesn't affect correctness.
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs?
   
   - [ ] `doc-required`
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed`
   (Please explain why)
   
   - [ ] `doc`
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower merged pull request #15983: [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata`

Posted by GitBox <gi...@apache.org>.
BewareMyPower merged PR #15983:
URL: https://github.com/apache/pulsar/pull/15983


[GitHub] [pulsar] BewareMyPower commented on pull request #15983: [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata`

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #15983:
URL: https://github.com/apache/pulsar/pull/15983#issuecomment-1150020037

   I tried to add a test, but it is harder than I expected. The only way is to verify the invocation count of the static `Commands#peekMessageMetadata` method.
   
   A simple way to expose the problem is to revert the changes to `PersistentDispatcherMultipleConsumers.java` in this PR and run the following test in `FilterEntryTest`.
   
   ```java
       @Test
       public void testMultiEntriesInMultiConsumerDispatcher() throws Exception {
           String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
           String subName = "sub";
   
           Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                   .topic(topic)
                   // PersistentDispatcherMultipleConsumers will be used for Shared subscription
                   .subscriptionType(SubscriptionType.Shared)
                   .subscriptionName(subName)
                   .subscribe();
           consumer.close();
           PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
                   .getTopicReference(topic).get().getSubscription(subName);
           Dispatcher dispatcher = subscription.getDispatcher();
           Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
           field.setAccessible(true);
           NarClassLoader narClassLoader = mock(NarClassLoader.class);
   
           final List<Long> sequenceIdList = Collections.synchronizedList(new ArrayList<>());
           EntryFilter filter = new EntryFilter() {
   
               @Override
               public FilterResult filterEntry(Entry entry, FilterContext context) {
                   sequenceIdList.add(context.getMsgMetadata().getSequenceId());
                   return FilterResult.ACCEPT;
               }
   
               @Override
               public void close() {
               }
           };
           EntryFilterWithClassLoader loader = spyWithClassAndConstructorArgs(
                   EntryFilterWithClassLoader.class, filter, narClassLoader);
           field.set(dispatcher, ImmutableList.of(loader));
   
           final int numMessages = 10;
   
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                   .enableBatching(false)
                   .topic(topic)
                   .create();
           for (int i = 0; i < numMessages; i++) {
               producer.newMessage().value("msg-" + i).send();
           }
   
           consumer = pulsarClient.newConsumer(Schema.STRING)
                   .topic(topic)
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   // PersistentDispatcherMultipleConsumers will be used for Shared subscription
                   .subscriptionType(SubscriptionType.Shared)
                   .subscriptionName(subName)
                   .subscribe();
           while (true) {
               Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
               if (msg == null) {
                   break;
               }
           }
   
           assertEquals(LongStream.range(0, numMessages).boxed().collect(Collectors.toList()), sequenceIdList);
       }
   ```

