Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/08 14:39:24 UTC
[GitHub] [pulsar] BewareMyPower opened a new pull request, #15983: [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata`
BewareMyPower opened a new pull request, #15983:
URL: https://github.com/apache/pulsar/pull/15983
### Motivation
https://github.com/apache/pulsar/pull/15967 removed the `EntryWrapper`,
which holds an `Entry` instance that is never used. However, after the
refactoring, the `MessageMetadata` array became useless; see
https://github.com/apache/pulsar/blob/298a573295f845e46f8a55cee366b6db63e997c2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L517-L519
Each `MessageMetadata` instance in the array is returned by
`peekMessageMetadata`, whose return value references the thread local
object `Commands#LOCAL_MESSAGE_METADATA`. This causes a problem: if
multiple entries are read, all `MessageMetadata` elements in the array
reference the same object.
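The aliasing can be reproduced outside the broker with a small model. This is a sketch under stated assumptions: `Metadata`, `peek` and `cacheAll` are hypothetical stand-ins for `MessageMetadata`, `Commands#peekMessageMetadata` and the array-filling loop in the dispatcher, not Pulsar's real code. The model only assumes that `peek` writes into one thread-local instance and returns it.

```java
// Hypothetical model: Metadata stands in for MessageMetadata and peek() for
// Commands#peekMessageMetadata. peek() "parses" into one thread-local
// instance and returns that same instance on every call.
class AliasingDemo {
    static final class Metadata {
        long sequenceId;
    }

    // One shared instance per thread, reused by every peek() call.
    private static final ThreadLocal<Metadata> LOCAL =
            ThreadLocal.withInitial(Metadata::new);

    static Metadata peek(long sequenceIdInEntry) {
        Metadata m = LOCAL.get();
        m.sequenceId = sequenceIdInEntry; // overwrites the previous entry's value
        return m;
    }

    // Mimics filling a metadata array with peek() results, one per entry.
    static Metadata[] cacheAll(long[] entries) {
        Metadata[] metadataArray = new Metadata[entries.length];
        for (int i = 0; i < entries.length; i++) {
            metadataArray[i] = peek(entries[i]);
        }
        return metadataArray;
    }

    public static void main(String[] args) {
        Metadata[] metadataArray = cacheAll(new long[] {0, 1, 2});
        // Every slot is the same object, so all of them show the last entry's value.
        for (Metadata m : metadataArray) {
            System.out.println(m.sequenceId); // prints 2 each time
        }
        System.out.println(metadataArray[0] == metadataArray[2]); // true
    }
}
```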
However, an incorrect use of `Optional#orElse` accidentally masks this
problem. See
https://github.com/apache/pulsar/blob/298a573295f845e46f8a55cee366b6db63e997c2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L133-L134
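Each call to `peekMessageMetadata` overwrites the thread local message
metadata. Unlike the supplier passed to `orElseGet`, the argument of
`orElse` is always evaluated, regardless of whether the optional is empty.
Therefore, the current entry's metadata is re-parsed just before it is
used, and the shared object referenced by the array happens to hold the
correct values.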
This behavior increases the number of `peekMessageMetadata` invocations
and makes the `metadataArray` cache redundant.
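The masking effect, and why `orElseGet` alone would expose the bug, can be shown with the same kind of model. This is a hedged sketch: `OrElseDemo`, `peek`, `readSequenceIds` and `peekCount` are hypothetical names, and the loop is a simplification of the dispatcher, not its actual code.

The model first fills an aliasing cache. It then reads each entry through either `Optional#orElse` (eager) or `Optional#orElseGet` (lazy).

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical model of the dispatcher pattern: cached[i] holds a reference
// returned by peek() (all slots alias one thread-local object), and the
// fallback re-peeks the current entry. Names are illustrative only.
class OrElseDemo {
    static final class Metadata {
        long sequenceId;
    }

    private static final ThreadLocal<Metadata> LOCAL =
            ThreadLocal.withInitial(Metadata::new);
    static int peekCount = 0;

    static Metadata peek(long sequenceIdInEntry) {
        peekCount++;
        Metadata m = LOCAL.get();
        m.sequenceId = sequenceIdInEntry;
        return m;
    }

    // Returns the sequence id observed for each entry; lazy selects orElseGet.
    static long[] readSequenceIds(long[] entries, boolean lazy) {
        peekCount = 0;
        Metadata[] cached = new Metadata[entries.length];
        for (int i = 0; i < entries.length; i++) {
            cached[i] = peek(entries[i]); // every slot aliases LOCAL
        }
        long[] seen = new long[entries.length];
        for (int i = 0; i < entries.length; i++) {
            final long entry = entries[i];
            Optional<Metadata> opt = Optional.ofNullable(cached[i]);
            Metadata m = lazy
                    ? opt.orElseGet(() -> peek(entry)) // skipped: the cache is non-null
                    : opt.orElse(peek(entry));         // evaluated eagerly, refreshing LOCAL
            seen[i] = m.sequenceId;
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] entries = {0, 1, 2};
        long[] eager = readSequenceIds(entries, false);
        System.out.println(Arrays.toString(eager) + " peeks=" + peekCount); // [0, 1, 2] peeks=6
        long[] lazy = readSequenceIds(entries, true);
        System.out.println(Arrays.toString(lazy) + " peeks=" + peekCount);  // [2, 2, 2] peeks=3
    }
}
```

With `orElse`, the results are correct only because of the redundant extra `peek` calls. With `orElseGet`, the aliased cache returns the last entry's metadata for every entry. For this reason, switching to `orElseGet` also requires caching real copies.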
### Modifications
- Use `orElseGet` instead of `orElse` in `AbstractBaseDispatcher`.
- Add a new static method `Commands#peekAndCopyMessageMetadata` that
returns a `MessageMetadata` instance allocated from heap memory.
- Call `peekAndCopyMessageMetadata` to cache all message metadata
instances in `PersistentDispatcherMultipleConsumers`.
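The modifications above can be illustrated with a minimal model. This sketch assumes that copying the parsed metadata into a new heap object is enough to break the aliasing. `peekAndCopy` and `Metadata#copy` are hypothetical stand-ins; they do not reproduce the real `Commands#peekAndCopyMessageMetadata` implementation.

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical sketch of the fix: peekAndCopy() (standing in for
// Commands#peekAndCopyMessageMetadata) returns a fresh heap object, so the
// cache holds distinct instances and orElseGet can safely skip the re-peek.
class CopyDemo {
    static final class Metadata {
        long sequenceId;

        Metadata copy() {
            Metadata c = new Metadata();
            c.sequenceId = sequenceId;
            return c;
        }
    }

    private static final ThreadLocal<Metadata> LOCAL =
            ThreadLocal.withInitial(Metadata::new);
    static int peekCount = 0;

    static Metadata peek(long sequenceIdInEntry) {
        peekCount++;
        Metadata m = LOCAL.get();
        m.sequenceId = sequenceIdInEntry;
        return m;
    }

    // Parse into the thread-local object, then copy it into a new instance.
    static Metadata peekAndCopy(long sequenceIdInEntry) {
        return peek(sequenceIdInEntry).copy();
    }

    static long[] readSequenceIds(long[] entries) {
        peekCount = 0;
        Metadata[] cached = new Metadata[entries.length];
        for (int i = 0; i < entries.length; i++) {
            cached[i] = peekAndCopy(entries[i]); // distinct object per entry
        }
        long[] seen = new long[entries.length];
        for (int i = 0; i < entries.length; i++) {
            final long entry = entries[i];
            seen[i] = Optional.ofNullable(cached[i])
                    .orElseGet(() -> peek(entry)) // runs only on a cache miss
                    .sequenceId;
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] seen = readSequenceIds(new long[] {0, 1, 2});
        System.out.println(Arrays.toString(seen) + " peeks=" + peekCount); // [0, 1, 2] peeks=3
    }
}
```

The model trades one small allocation per entry for correct results and a single `peek` per entry.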
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
It's hard to add tests. As explained above, #15967 only degrades
performance and does not affect correctness.
### Documentation
Check the box below or label this PR directly.
Need to update docs?
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [x] `doc-not-needed`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower merged pull request #15983: [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata`
BewareMyPower merged PR #15983:
URL: https://github.com/apache/pulsar/pull/15983
[GitHub] [pulsar] BewareMyPower commented on pull request #15983: [fix][broker] Avoid storing `MessageMetadata` instances returned by `peekMessageMetadata`
BewareMyPower commented on PR #15983:
URL: https://github.com/apache/pulsar/pull/15983#issuecomment-1150020037
I tried to add a test, but it is harder than I thought. The only way would be to verify the invocation count of the static `Commands#peekMessageMetadata` method.
A simple way to expose the problem is to revert the changes to `PersistentDispatcherMultipleConsumers.java` in this PR and run the following test in `FilterEntryTest`.
```java
@Test
public void testMultiEntriesInMultiConsumerDispatcher() throws Exception {
    String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
    String subName = "sub";
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            // PersistentDispatcherMultipleConsumers will be used for Shared subscription
            .subscriptionType(SubscriptionType.Shared)
            .subscriptionName(subName)
            .subscribe();
    consumer.close();

    // Inject an entry filter into the dispatcher via reflection
    PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
            .getTopicReference(topic).get().getSubscription(subName);
    Dispatcher dispatcher = subscription.getDispatcher();
    Field field = AbstractBaseDispatcher.class.getDeclaredField("entryFilters");
    field.setAccessible(true);
    NarClassLoader narClassLoader = mock(NarClassLoader.class);
    final List<Long> sequenceIdList = Collections.synchronizedList(new ArrayList<>());
    EntryFilter filter = new EntryFilter() {
        @Override
        public FilterResult filterEntry(Entry entry, FilterContext context) {
            // Record the sequence id from the metadata the dispatcher passes in
            sequenceIdList.add(context.getMsgMetadata().getSequenceId());
            return FilterResult.ACCEPT;
        }

        @Override
        public void close() {
        }
    };
    EntryFilterWithClassLoader loader = spyWithClassAndConstructorArgs(
            EntryFilterWithClassLoader.class, filter, narClassLoader);
    field.set(dispatcher, ImmutableList.of(loader));

    final int numMessages = 10;
    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .enableBatching(false)
            .topic(topic)
            .create();
    for (int i = 0; i < numMessages; i++) {
        producer.newMessage().value("msg-" + i).send();
    }

    consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            // PersistentDispatcherMultipleConsumers will be used for Shared subscription
            .subscriptionType(SubscriptionType.Shared)
            .subscriptionName(subName)
            .subscribe();
    while (true) {
        Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
        if (msg == null) {
            break;
        }
    }
    // The filter should have seen each entry's own sequence id, in order
    assertEquals(LongStream.range(0, numMessages).boxed().collect(Collectors.toList()), sequenceIdList);
}
```