Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/29 15:46:06 UTC

[GitHub] [pulsar-client-go] Gleiphir2769 commented on issue #456: Support Large Message Size (PIP-37 / "chunking")

Gleiphir2769 commented on issue #456:
URL: https://github.com/apache/pulsar-client-go/issues/456#issuecomment-1170147387

   Hello, I am very interested in implementing this feature. Here is my plan.
   ## Motivation
   Make the Pulsar Go client support chunking, so that it can produce and consume large messages without disabling batching.
   ## Modifications
   ### Publish Chunked Messages
   Currently, `maxMessageSize` limits the publishing of large messages:
   https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/producer_partition.go#L427-L436
   If the message payload is larger than `maxMessageSize`, the message is rejected. Instead, it should be split into chunks that are each no larger than `maxMessageSize` and sent to the broker separately. I think the chunking logic can be added in `internalSendAsync`:
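   As a rough illustration of the producer-side split, here is a minimal Go sketch. The `chunk` type and `splitIntoChunks` function are hypothetical; the fields are loosely modeled on the chunk-related fields of Pulsar's message metadata (`uuid`, `chunk_id`, `num_chunks_from_msg`, `total_chunk_msg_size`), and `maxChunkSize` stands in for whatever budget remains once metadata and other overhead are accounted for.

   ```go
   package main

   import "errors"

   // errInvalidChunkSize is returned when maxChunkSize is not positive.
   var errInvalidChunkSize = errors.New("maxChunkSize must be positive")

   // chunk is a hypothetical representation of one piece of a large message.
   type chunk struct {
       UUID      string // shared by every chunk of the same original message
       ChunkID   int    // 0-based index of this chunk
       NumChunks int    // total number of chunks of the original message
       TotalSize int    // size in bytes of the original, unchunked payload
       Payload   []byte // this chunk's slice of the original payload
   }

   // splitIntoChunks cuts payload into consecutive chunks of at most
   // maxChunkSize bytes. The chunk payloads share payload's backing array,
   // so payload must not be modified while the chunks are being sent.
   func splitIntoChunks(uuid string, payload []byte, maxChunkSize int) ([]chunk, error) {
       if maxChunkSize <= 0 {
           return nil, errInvalidChunkSize
       }
       // ceil(len(payload) / maxChunkSize), with at least one chunk so that
       // an empty payload is still sent as a single message.
       n := (len(payload) + maxChunkSize - 1) / maxChunkSize
       if n == 0 {
           n = 1
       }
       chunks := make([]chunk, 0, n)
       for i := 0; i < n; i++ {
           start := i * maxChunkSize
           end := start + maxChunkSize
           if end > len(payload) {
               end = len(payload)
           }
           chunks = append(chunks, chunk{
               UUID:      uuid,
               ChunkID:   i,
               NumChunks: n,
               TotalSize: len(payload),
               Payload:   payload[start:end],
           })
       }
       return chunks, nil
   }
   ```

   Each resulting chunk would then be sent as an individual message; when the payload fits into one chunk, `n` is 1.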
   https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/producer_partition.go#L741
   ### Receive Chunked Messages
   Pulsar allows multiple producers to publish to the same topic concurrently, so the chunks of several large messages may be interleaved in the topic. The chunks of one large message therefore do not necessarily arrive consecutively, although they do arrive in order (the broker guarantees this).
   So the Go client needs a `ChunkedMessageCtx` to track and buffer chunked messages. A `ChunkedMessageCtx` records which chunks of a message have been received so far and accumulates their payloads. Once all chunks have arrived, it returns the accumulated payload to the user, i.e. the original message before chunking.
   ![snipaste_2022-05-16_17-54-35.png](https://s2.loli.net/2022/05/20/Boyx7FZe8S5ONH1.png)
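   A minimal sketch of this reassembly, assuming each chunk carries a UUID, a 0-based chunk ID, the total chunk count and the original size (the `chunk`, `chunkedMessageCtx` and `assembler` names are hypothetical). Contexts are keyed by UUID so that interleaved chunks from different producers do not mix, and because a message's chunks arrive in order, each context only has to check that the next chunk ID is the expected one. The cache limit and eviction are deliberately left out here.

   ```go
   package main

   import "fmt"

   // chunk is a hypothetical view of one received chunk and its metadata.
   type chunk struct {
       UUID      string
       ChunkID   int
       NumChunks int
       TotalSize int
       Payload   []byte
   }

   // chunkedMessageCtx tracks and buffers the chunks of one large message.
   type chunkedMessageCtx struct {
       numChunks   int    // expected number of chunks
       lastChunkID int    // ID of the last chunk appended, -1 before the first
       buffer      []byte // accumulated payload
   }

   func newChunkedMessageCtx(first chunk) *chunkedMessageCtx {
       return &chunkedMessageCtx{
           numChunks:   first.NumChunks,
           lastChunkID: -1,
           buffer:      make([]byte, 0, first.TotalSize),
       }
   }

   // addChunk appends the next chunk's payload. Chunks of one message arrive
   // in order, so any ID other than lastChunkID+1 means a gap or duplicate.
   func (ctx *chunkedMessageCtx) addChunk(c chunk) error {
       if c.ChunkID != ctx.lastChunkID+1 {
           return fmt.Errorf("message %s: got chunk %d, expected %d", c.UUID, c.ChunkID, ctx.lastChunkID+1)
       }
       ctx.buffer = append(ctx.buffer, c.Payload...)
       ctx.lastChunkID = c.ChunkID
       return nil
   }

   // complete reports whether every chunk has been received.
   func (ctx *chunkedMessageCtx) complete() bool {
       return ctx.lastChunkID == ctx.numChunks-1
   }

   // assembler keeps one context per in-flight message, keyed by UUID.
   type assembler struct {
       ctxs map[string]*chunkedMessageCtx
   }

   func newAssembler() *assembler {
       return &assembler{ctxs: make(map[string]*chunkedMessageCtx)}
   }

   // receive feeds in one chunk. It returns the full payload and true once the
   // final chunk of a message has arrived, and drops the context on error.
   func (a *assembler) receive(c chunk) ([]byte, bool, error) {
       ctx, ok := a.ctxs[c.UUID]
       if !ok {
           ctx = newChunkedMessageCtx(c)
           a.ctxs[c.UUID] = ctx
       }
       if err := ctx.addChunk(c); err != nil {
           delete(a.ctxs, c.UUID)
           return nil, false, err
       }
       if !ctx.complete() {
           return nil, false, nil
       }
       delete(a.ctxs, c.UUID)
       return ctx.buffer, true, nil
   }
   ```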
   All `ChunkedMessageCtx` instances need to be kept in a cache. To bound memory usage, the number of `ChunkedMessageCtx` instances must be limited (the default limit in the Java client is 100). This cache is essentially a concurrent map with an eviction policy (LRU). It can be implemented simply as a map + mutex + pending queue, or with something more sophisticated (https://github.com/Gleiphir2769/s-cache).
   I think it should be hooked in here:
   https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/consumer_partition.go#L553
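   The cache described above could look roughly like the following: a mutex-protected map plus a recency list from the standard library's `container/list`, evicting the least recently used entry once `capacity` is exceeded. The `lruCache` type and its `onEvict` hook are hypothetical; in the client, the hook would be the place to release the buffered chunks of an evicted `ChunkedMessageCtx`.

   ```go
   package main

   import (
       "container/list"
       "sync"
   )

   // entry is the value stored in each element of the recency list.
   type entry[V any] struct {
       key   string
       value V
   }

   // lruCache is a concurrency-safe map holding at most capacity entries
   // (capacity is assumed to be positive). onEvict, if set, runs while the
   // lock is held, so it must not call back into the cache.
   type lruCache[V any] struct {
       mu       sync.Mutex
       capacity int
       order    *list.List               // front = most recently used
       items    map[string]*list.Element // key -> element in order
       onEvict  func(key string, value V)
   }

   func newLRUCache[V any](capacity int, onEvict func(string, V)) *lruCache[V] {
       return &lruCache[V]{
           capacity: capacity,
           order:    list.New(),
           items:    make(map[string]*list.Element),
           onEvict:  onEvict,
       }
   }

   // Get returns the value for key and marks it as recently used.
   func (c *lruCache[V]) Get(key string) (V, bool) {
       c.mu.Lock()
       defer c.mu.Unlock()
       if el, ok := c.items[key]; ok {
           c.order.MoveToFront(el)
           return el.Value.(*entry[V]).value, true
       }
       var zero V
       return zero, false
   }

   // Put inserts or updates key, evicting the least recently used entry
   // when the cache grows beyond capacity.
   func (c *lruCache[V]) Put(key string, value V) {
       c.mu.Lock()
       defer c.mu.Unlock()
       if el, ok := c.items[key]; ok {
           el.Value.(*entry[V]).value = value
           c.order.MoveToFront(el)
           return
       }
       c.items[key] = c.order.PushFront(&entry[V]{key: key, value: value})
       if c.order.Len() > c.capacity {
           oldest := c.order.Back()
           c.order.Remove(oldest)
           e := oldest.Value.(*entry[V])
           delete(c.items, e.key)
           if c.onEvict != nil {
               c.onEvict(e.key, e.value)
           }
       }
   }

   // Delete removes key without calling onEvict, e.g. once the message has
   // been fully reassembled.
   func (c *lruCache[V]) Delete(key string) {
       c.mu.Lock()
       defer c.mu.Unlock()
       if el, ok := c.items[key]; ok {
           c.order.Remove(el)
           delete(c.items, key)
       }
   }

   // Len returns the number of cached entries.
   func (c *lruCache[V]) Len() int {
       c.mu.Lock()
       defer c.mu.Unlock()
       return c.order.Len()
   }
   ```

   Swapping the recency list for a plain FIFO queue (no `MoveToFront` in `Get`) would give the simpler "map + mutex + pending queue" variant, which evicts the oldest context rather than the least recently used one.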
   ### Some Details
   #### Batching
   Currently, the Pulsar Go client relies on `BatchBuilder` to send all messages, even when batching is disabled (in that case, each message triggers a flush of the batch):
   https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/producer_partition.go#L472-L474
   In the [Java Client](https://github.com/apache/pulsar/blob/f230d15ffcd5f74cca13bd23b35ace784d6f8ce6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1313-L1323), the batch-message logic skips chunked messages. So we need a single-message send path that is independent of `BatchBuilder`.
   Because of the consumer available-permits calculation problem with Shared subscriptions ([issue #10417](https://github.com/apache/pulsar/pull/10417)), batching and chunking cannot be enabled at the same time.
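   Taken together, these two constraints suggest a send-path decision along the lines of the sketch below. `producerOptions` and its fields, `chooseSendPath` and the error values are hypothetical names for illustration, not the client's existing API. The sketch assumes that enabling chunking requires batching to be disabled, that oversized payloads are chunked (or rejected when chunking is off), and that with chunking enabled every message bypasses `BatchBuilder`.

   ```go
   package main

   import "errors"

   // producerOptions is a hypothetical subset of producer settings.
   type producerOptions struct {
       DisableBatching bool
       EnableChunking  bool
       MaxMessageSize  int
   }

   type sendPath int

   const (
       viaBatchBuilder sendPath = iota // current behavior: add to BatchBuilder, then flush
       singleMessage                   // fits in one message, sent without BatchBuilder
       chunked                         // split into chunks, each sent as a single message
   )

   var (
       errBatchingWithChunking = errors.New("chunking requires batching to be disabled")
       errTooLargeNoChunking   = errors.New("payload exceeds maxMessageSize and chunking is disabled")
   )

   // validate rejects the unsupported combination of batching and chunking.
   func validate(opts producerOptions) error {
       if opts.EnableChunking && !opts.DisableBatching {
           return errBatchingWithChunking
       }
       return nil
   }

   // chooseSendPath picks how a payload of payloadSize bytes would be sent.
   // The returned path is meaningless when err != nil.
   func chooseSendPath(opts producerOptions, payloadSize int) (sendPath, error) {
       if err := validate(opts); err != nil {
           return viaBatchBuilder, err
       }
       if payloadSize > opts.MaxMessageSize {
           if opts.EnableChunking {
               return chunked, nil
           }
           return viaBatchBuilder, errTooLargeNoChunking
       }
       if opts.EnableChunking {
           return singleMessage, nil
       }
       return viaBatchBuilder, nil
   }
   ```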
   #### Chunked Message ID 
   This is related to [PIP 107](https://github.com/apache/pulsar/issues/12402). A good approach is to follow the new Java client, which implements a `ChunkMessageIdImpl` exposing `getFirstChunkMessageId`. This also changes the `Seek` implementation, which should seek to the first chunk's message ID:
   https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/consumer_partition.go#L428-L431
   https://github.com/apache/pulsar-client-go/blob/0f7041ffa9085197aa888ac33d3288a3ed81c57b/pulsar/consumer_partition.go#L447-L459
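   A hedged sketch of the idea: the ID returned for a reassembled message records both its first and its last chunk, and the seek logic uses the first one, since starting from a later chunk would skip the earlier chunks of that message. The `messageID`, `chunkMessageID` and `seekTarget` names are hypothetical and much simpler than the client's real message ID types.

   ```go
   package main

   import "fmt"

   // messageID is a hypothetical, simplified message ID.
   type messageID struct {
       LedgerID int64
       EntryID  int64
   }

   // chunkMessageID records the first and last chunk of a reassembled message,
   // in the spirit of Java's ChunkMessageIdImpl.
   type chunkMessageID struct {
       first messageID
       last  messageID
   }

   // FirstChunkMessageID plays the role of getFirstChunkMessageId.
   func (id chunkMessageID) FirstChunkMessageID() messageID { return id.first }

   // LastChunkMessageID returns the ID of the final chunk.
   func (id chunkMessageID) LastChunkMessageID() messageID { return id.last }

   // seekTarget returns the position a seek request should use: the first
   // chunk for a chunked message, the ID itself for an ordinary message.
   func seekTarget(id interface{}) (messageID, error) {
       switch v := id.(type) {
       case chunkMessageID:
           return v.FirstChunkMessageID(), nil
       case messageID:
           return v, nil
       default:
           return messageID{}, fmt.Errorf("unsupported message ID type %T", id)
       }
   }
   ```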
   #### Size Calculation
   This is related to [issue #16196](https://github.com/apache/pulsar/pull/16196). The message metadata should be updated before the chunk size is computed, and the size calculation should also account for the bytes other than the metadata and payload, e.g. the 4-byte checksum field.
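   The arithmetic could be sketched as follows. `chunkPayloadSize` and `numChunks` are hypothetical helpers; the sketch assumes a chunk (metadata + payload + other overhead) must not exceed `maxMessageSize`, that `metadataSize` is measured after the chunk-related metadata fields have been set, and that `otherOverhead` covers the non-metadata, non-payload bytes (only the 4-byte checksum is named here; any further framing bytes would have to be added).

   ```go
   package main

   import "errors"

   // checksumSize is the 4-byte checksum field mentioned above.
   const checksumSize = 4

   var errNoRoomForPayload = errors.New("metadata and overhead leave no room for payload")

   // chunkPayloadSize returns how many payload bytes fit into one chunk when
   // metadata + payload + otherOverhead must not exceed maxMessageSize.
   func chunkPayloadSize(maxMessageSize, metadataSize, otherOverhead int) (int, error) {
       size := maxMessageSize - metadataSize - otherOverhead
       if size <= 0 {
           return 0, errNoRoomForPayload
       }
       return size, nil
   }

   // numChunks returns ceil(payloadSize / chunkSize), and 1 for an empty payload.
   func numChunks(payloadSize, chunkSize int) int {
       if payloadSize == 0 {
           return 1
       }
       return (payloadSize + chunkSize - 1) / chunkSize
   }
   ```

   For example, with a `maxMessageSize` of 1000 bytes, 90 bytes of metadata and the 4-byte checksum, each chunk can carry 906 bytes, so a 5000-byte payload needs 6 chunks.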
   #### Shared Subscription
   There have been some problems with chunking on Shared subscriptions. [issue #16202](https://github.com/apache/pulsar/pull/16202) added support for chunking with Shared subscriptions, so the Go client may not need to restrict chunking with Shared subscriptions in `ConsumerImpl`.
   #### unAckedChunkedMessageIdSequenceMap
   (to be completed)


-- 
This is an automated message from the Apache Git Service.