Posted to commits@bookkeeper.apache.org by "hangc0276 (via GitHub)" <gi...@apache.org> on 2023/04/14 09:26:57 UTC

[GitHub] [bookkeeper] hangc0276 commented on pull request #3919: [fix] DigestManager should not advance readerIndex

hangc0276 commented on PR #3919:
URL: https://github.com/apache/bookkeeper/pull/3919#issuecomment-1508220615

   ### How this bug happens on the Pulsar side
   #### Steps to reproduce
   1. Add `brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor` to `conf/broker.conf` to enable the broker entry metadata interceptor
   2. Set up a consumer on topic-A
   3. Set up a producer with batching disabled and produce a small message (10 bytes) to topic-A
   4. The broker throws the following exception:
   ```
   2023-04-14T17:06:09,622+0800 [broker-topic-workers-OrderedExecutor-0-0] ERROR org.apache.pulsar.common.protocol.Commands - [PersistentSubscription{topic=persistent://pulsar/test_v1/127.0.0.1:8080/healthcheck, name=healthCheck-d1544425-97ea-4874-8972-d30d2391ee8b}] [-1] Failed to parse message metadata
   java.lang.IndexOutOfBoundsException: readerIndex(94) + length(2) exceeds writerIndex(94): UnpooledDuplicatedByteBuf(ridx: 94, widx: 94, cap: 94, unwrapped: CompositeByteBuf(ridx: 94, widx: 94, cap: 94, components=2))
   	at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[io.netty-netty-buffer-4.1.89.Final.jar:4.1.89.Final]
   	at io.netty.buffer.AbstractByteBuf.readShort(AbstractByteBuf.java:749) ~[io.netty-netty-buffer-4.1.89.Final.jar:4.1.89.Final]
   	at org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist(Commands.java:1692) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
   	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:452) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
   	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
   	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1899) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
   	at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1918) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
   	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:142) ~[org.apache.pulsar-pulsar-broker-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
   	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:100) ~[org.apache.pulsar-pulsar-broker-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
   	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:210) ~[org.apache
   ```
   
   #### How this bug happens
   1. When an entry is produced to a Pulsar topic with AppendBrokerTimestampMetadataInterceptor enabled, the broker prepends a metadata header to the entry, which produces a CompositeByteBuf
   https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L1685-L1687
   
   2. To isolate `readerIndex` and `writerIndex` changes between the Pulsar broker and the BookKeeper client, the broker passes a duplicated ByteBuf to the BookieClient for writing to the BookKeeper cluster. Ideally, the readerIndex of the broker's ByteBuf never changes.
   https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L126-L144
   
   3. However, if the original ByteBuf is a CompositeByteBuf, the BookKeeper client unwraps the duplicated ByteBuf to speed up the crc32c digest calculation. Computing the digest on the wrapped duplicate would copy the CompositeByteBuf data from direct memory to heap memory and put a heavy GC load on the JVM.
   https://github.com/apache/bookkeeper/blob/35e9da9b55b5d44459d3421e8704be47afc6f914/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L141-L142
   
   4. After calculating the crc32c digest, the client writes the unwrapped CompositeByteBuf into a new ByteBuf if the entry is small. The `buf.writeBytes(unwrapped)` call advances the unwrapped CompositeByteBuf's readerIndex to the end of the ByteBuf.
   https://github.com/apache/bookkeeper/blob/35e9da9b55b5d44459d3421e8704be47afc6f914/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L162
   
   https://github.com/netty/netty/blob/24a0ac36ea91d1aee647d738f879ac873892d829/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java#L1086-L1099
   
   5. After the entry is written successfully, the broker adds it to the broker's cache because the topic has active consumers. At this point, the readerIndex of the original CompositeByteBuf `data` has already been moved to the end of the ByteBuf by Step 4.
   https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L231-L238
   
   6. When the consumer fetches messages from the topic, the entry is served directly from the broker's cache. Because the ByteBuf's readerIndex is already at the end of the ByteBuf, parsing the entry's metadata throws an IndexOutOfBoundsException.
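
   The sequence in steps 2 to 6 can be sketched with the JDK alone. This is an analogy, not the BookKeeper or Pulsar code: it assumes `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf`, with `position()` playing the role of `readerIndex`, and the class and method names (`ReaderIndexSketch`, `copyAdvancingSource`, `copyPreservingSource`, `canReadShort`) are hypothetical. It shows that a relative bulk copy from the shared original consumes it, while copying from a throwaway duplicate leaves the original readable.

   ```java
   import java.nio.BufferUnderflowException;
   import java.nio.ByteBuffer;

   public class ReaderIndexSketch {

       // Stand-in for the broker's cached entry; 94 bytes matches the
       // buffer size reported in the stack trace above.
       static ByteBuffer newEntry() {
           return ByteBuffer.allocate(94);
       }

       // Analogous to step 4: a relative bulk copy reads straight from the
       // shared original, so its position ends up at its limit.
       static ByteBuffer copyAdvancingSource(ByteBuffer original) {
           ByteBuffer dst = ByteBuffer.allocate(original.remaining());
           dst.put(original); // advances original.position() by the bytes copied
           dst.flip();
           return dst;
       }

       // One way to avoid the side effect: copy from a duplicate, which shares
       // the content but has its own independent position.
       static ByteBuffer copyPreservingSource(ByteBuffer original) {
           ByteBuffer dst = ByteBuffer.allocate(original.remaining());
           dst.put(original.duplicate()); // only the duplicate's position moves
           dst.flip();
           return dst;
       }

       // Analogous to step 6: attempt a 2-byte read, like the readShort call
       // in the stack trace. Reads through a duplicate so the check itself
       // does not move the caller's position.
       static boolean canReadShort(ByteBuffer buf) {
           try {
               buf.duplicate().getShort();
               return true;
           } catch (BufferUnderflowException e) {
               return false;
           }
       }

       public static void main(String[] args) {
           ByteBuffer cached = newEntry();

           copyPreservingSource(cached);
           System.out.println("after safe copy, readable: " + canReadShort(cached));

           copyAdvancingSource(cached);
           System.out.println("after consuming copy, readable: " + canReadShort(cached));
       }
   }
   ```

   In Netty terms, the consuming copy corresponds to `writeBytes(ByteBuf src)`, and an index-preserving alternative is the overload `writeBytes(ByteBuf src, int srcIndex, int length)`, which does not modify the source's readerIndex. Whether the PR uses that overload or another approach is not stated in this comment.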

