Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/05/29 01:18:13 UTC

[pulsar.wiki] branch master updated: Updated PIP 37: Large message size handling in Pulsar (markdown)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new 89b43ac  Updated PIP 37: Large message size handling in Pulsar (markdown)
89b43ac is described below

commit 89b43ac062e6467c81151a1c917e887041e96228
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue May 28 18:18:11 2019 -0700

    Updated PIP 37: Large message size handling in Pulsar (markdown)
---
 PIP-37:-Large-message-size-handling-in-Pulsar.md | 67 +++++++++++++++++++++++-
 1 file changed, 66 insertions(+), 1 deletion(-)

diff --git a/PIP-37:-Large-message-size-handling-in-Pulsar.md b/PIP-37:-Large-message-size-handling-in-Pulsar.md
index 3cf5a93..aaf2d2d 100644
--- a/PIP-37:-Large-message-size-handling-in-Pulsar.md
+++ b/PIP-37:-Large-message-size-handling-in-Pulsar.md
@@ -40,13 +40,50 @@ The main difference between this approach and PIP-31(Txn) is assembling of messa
 ### Usecase 3: Multiple producers with shared consumers
 
 We discussed how message chunking works without any broker changes when a single ordered consumer consumes messages published by a single publisher or by multiple publishers. In this section we will discuss how it works with shared consumers.
-Message chunking/split and joins requires all chunks related to one message must be delivered to one consumer. So, in the case of shared consumers we need a small broker change while dispatching messages. (1) Broker keeps a sorted list of shared consumer based on consumer connected time (2) broker reads message_id from metadata(unique message-id which attach to all message-chunks of that message) of the chunked-message while dispatching the message and based on message-id hash , broker selects a consumer to send all chunks attached to that message-id.
+
+
+#### Option 1: Broker caches mapping of message-UUID and consumerId
+
+Message chunking (split and join) requires that all chunks belonging to one message be delivered to the same consumer. So, in the case of shared consumers, we need a broker change where the broker reads message metadata before dispatching messages. The broker keeps a sorted list of shared consumers based on consumer connect time and, based on the hash of the message-id, selects one consumer to receive all chunks attached to that message-id. The broker also keeps track of the hash-ids of in-flight messages, which avoids routing chunks of a message that is already being delivered to a different consumer. A selection sketch follows below.
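+
+A minimal sketch of that selection logic in Java (the class and method names here are illustrative, not Pulsar's actual broker code):
+
+```
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+// Illustrative only: pins every chunk of an in-flight message to one consumer.
+class ChunkAwareSelector {
+    // Shared consumers, sorted by consumer connect time (oldest first).
+    private final List<String> consumers;
+    // Tracks in-flight chunked messages so later chunks follow the first
+    // chunk's consumer even if the consumer list changes meanwhile.
+    private final Map<String, String> inFlight = new ConcurrentHashMap<>();
+
+    ChunkAwareSelector(List<String> consumersSortedByConnectTime) {
+        this.consumers = consumersSortedByConnectTime;
+    }
+
+    String selectConsumer(String messageUuid) {
+        return inFlight.computeIfAbsent(messageUuid,
+                uuid -> consumers.get((uuid.hashCode() & Integer.MAX_VALUE) % consumers.size()));
+    }
+
+    // Called once the last chunk of the message has been dispatched.
+    void complete(String messageUuid) {
+        inFlight.remove(messageUuid);
+    }
+}
+```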
+
+Broker can also use [PersistentStickyKeyDispatcherMultipleConsumers](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java) to dispatch all chunked messages of a specific original message by treating the message-uuid as the key.
+
+**Cons:**
+This approach does not work well with redelivery. On redelivery, the broker keeps redelivered messages in an unsorted map, and it would be difficult and expensive for the broker to find all the chunked messages of an original message and restore their order.
+
+
 
 ![image](https://user-images.githubusercontent.com/2898254/57895228-741e0100-77ff-11e9-8feb-334ebec83f4a.png)
 
                                    [Fig 3: Multiple producers with shared consumer]
 
 
+
+#### Option 2: Producer publishes a marker message after publishing all chunked messages
+
+One of the main issues with Option 1 is managing the dispatch of redelivered messages, which are stored in an unsorted map.
+To solve this problem, once all the chunked messages have been published successfully, the producer publishes a marker message whose payload lists the messageIds (ledgerId, entryId) of all the chunks. Nothing changes at the broker while persisting these messages; they are persisted into the same ledger as the other messages. A sketch of such a marker payload follows below.
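+
+A sketch in Java of what the marker payload could carry, per the description above (the class and field names are illustrative, not part of the proposed wire format):
+
+```
+import java.util.List;
+
+// Illustrative marker payload: identifies every chunk of one original message.
+class ChunkedMessageMarker {
+    static class ChunkLocation {
+        final long ledgerId; // BookKeeper ledger the chunk was persisted to
+        final long entryId;  // entry within that ledger
+        ChunkLocation(long ledgerId, long entryId) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+    }
+
+    final String messageUuid;           // same uuid carried by every chunk
+    final List<ChunkLocation> chunkIds; // one entry per published chunk, in order
+
+    ChunkedMessageMarker(String messageUuid, List<ChunkLocation> chunkIds) {
+        this.messageUuid = messageUuid;
+        this.chunkIds = chunkIds;
+    }
+}
+```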
+
+However, while dispatching messages, the broker reads the metadata of each message and skips it if it is a chunked message. The broker delivers those chunked messages when it dispatches the marker message attached to them. When the broker dispatches a marker message, it deserializes the marker's payload and retrieves the list of chunked message-ids present in the payload. The broker then reads those chunked messages and dispatches them together to one selected consumer.
+
+Example:
+1. The producer splits large message M1 into two chunks, M1-C1 and M1-C2, and publishes them.
+
+2. After successfully publishing M1-C1 and M1-C2, the producer publishes marker message M1-Marker1, whose payload contains M1-C1 and M1-C2 with the messageIds (ledgerId and entryId) the broker assigned to them.
+
+3. While dispatching, the broker reads M1-C1 and M1-C2, skips them, and does not deliver them immediately to any consumer.
+
+4. Now, when the broker dispatches marker message M1-Marker1, it reads the payload and retrieves the chunked message-ids of M1-C1 and M1-C2. The broker chooses one of the available consumers and dispatches all chunked messages of M1 along with the marker message.
+
+5. The consumer handles the chunked and marker messages, stitches the chunks back into the original message, and places it into the receiver queue (see the reassembly sketch after this list).
+
+6. Once the original message is processed successfully, the consumer acks all the chunked messages along with the marker message.
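+
+A minimal sketch of the consumer-side reassembly in Java, assuming chunks arrive in `chunk_id` order and using the metadata fields proposed later in this PIP (`uuid`, `num_chunks_from_msg`, `total_chunk_msg_size`); the class itself is illustrative, not Pulsar's actual consumer internals:
+
+```
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+// Illustrative reassembly buffer keyed by the original message uuid.
+class ChunkReassembler {
+    private static class Context {
+        final ByteBuffer buf; // accumulates chunk payloads in arrival order
+        int received;         // number of chunks received so far
+        Context(int totalSize) { this.buf = ByteBuffer.allocate(totalSize); }
+    }
+
+    private final Map<String, Context> pending = new HashMap<>();
+
+    /** Returns the stitched payload when the last chunk arrives, else null. */
+    byte[] onChunk(String uuid, int numChunks, int totalSize, byte[] payload) {
+        Context ctx = pending.computeIfAbsent(uuid, u -> new Context(totalSize));
+        ctx.buf.put(payload);
+        ctx.received++;
+        if (ctx.received < numChunks) {
+            return null; // still waiting for more chunks of this message
+        }
+        pending.remove(uuid);
+        return ctx.buf.array(); // original message, ready for the receiver queue
+    }
+}
+```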
+
+
+![image](https://user-images.githubusercontent.com/2898254/58521849-c4d41900-8172-11e9-963e-1868d845ef8a.png)
+
+
+
 ## Chunking on large-message vs Txn with large-message
 
 1. The Txn approach requires a lot of new enhancements at the broker: an extra service for the txn-coordinator, extra CPU to read over the txn-buffer for each txn-msg, extra memory to maintain the txn-buffer, and extra metadata for the new txn-partition. It might also not be convenient to deploy a txn-service for a specialized system that serves 1M topics with heavy traffic.
@@ -58,3 +95,31 @@ Message chunking/split and joins requires all chunks related to one message must
 
 
 
+## Client and Broker changes:
+
+### Client changes:
+There will not be any changes required in the producer and consumer APIs.
+
+The producer splits the original message into chunks and publishes them with chunked metadata. The producer will have a configuration, `chunkingEnabled`, to enable chunking when the message payload is larger than the broker can support. If we want to support shared subscriptions, then the producer also has to publish a marker message along with the chunked messages, as discussed in the previous section.
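+
+A sketch of the producer-side configuration, assuming a builder option named after the `chunkingEnabled` flag above (the exact method name is illustrative, not a finalized API):
+
+```
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class ChunkedProducerExample {
+    public static void main(String[] args) throws Exception {
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+
+        // enableChunking mirrors the proposed `chunkingEnabled` flag: the
+        // producer transparently splits any payload larger than the broker's
+        // max message size into chunks. Chunking a batch is ambiguous, so
+        // batching is disabled here.
+        Producer<byte[]> producer = client.newProducer()
+                .topic("persistent://public/default/large-messages")
+                .enableBatching(false)
+                .enableChunking(true)
+                .create();
+
+        byte[] largePayload = new byte[10 * 1024 * 1024]; // 10 MB payload
+        producer.send(largePayload);
+
+        producer.close();
+        client.close();
+    }
+}
+```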
+
+The consumer consumes the chunked messages and buffers them until it has received all the chunks of a message; it then stitches them together and places the original message into the receiver queue, from which the application can consume it. Once the consumer consumes the entire large message and acks it, it internally sends an acknowledgement for all the chunk messages associated with that large message.
+
+The consumer will have a configuration, `maxPendingChunkedMessage`, to keep only the configured number of chunked-message buffers in memory and avoid memory pressure in the client application. Once the consumer reaches this threshold, it discards the oldest buffer from memory and marks those chunks for redelivery so they can be consumed later. A configuration sketch follows below.
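+
+A sketch of the consumer-side configuration, assuming a builder option named after the `maxPendingChunkedMessage` setting above (illustrative, not a finalized API):
+
+```
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class ChunkedConsumerExample {
+    public static void main(String[] args) throws Exception {
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+
+        // maxPendingChunkedMessage caps how many partially received large
+        // messages the consumer buffers; past that, the oldest buffer is
+        // dropped and its chunks are marked for redelivery.
+        Consumer<byte[]> consumer = client.newConsumer()
+                .topic("persistent://public/default/large-messages")
+                .subscriptionName("large-msg-sub")
+                .maxPendingChunkedMessage(10)
+                .subscribe();
+
+        // receive() returns the fully stitched original message.
+        Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
+        if (msg != null) {
+            consumer.acknowledge(msg); // also acks all underlying chunk messages
+        }
+
+        consumer.close();
+        client.close();
+    }
+}
+```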
+
+### Protocol changes:
+
+*SEND command:*
+The message metadata will have new fields to pass chunking information from producer to consumer:
+```
+optional string uuid; // Original message uuid that will be same for all the chunks
+optional int32 num_chunks_from_msg; // Total number of chunks the original message was split into
+optional int32 total_chunk_msg_size; // Total payload size of the original (un-chunked) message
+optional int32 chunk_id; // Index of this chunk within the original message
+```
+
+### Broker changes:
+
+#### Non-Shared consumer
+The broker doesn’t require any changes to support chunking for non-shared subscriptions. The broker only records the chunked-message rate on the topic for monitoring purposes.
+
+#### Shared consumer
+The broker requires changes to support chunking for shared subscriptions. In that case, the broker skips immediate delivery of chunked messages and dispatches them when it reads the marker message associated with those chunked messages. A dispatch sketch follows below.
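+
+A minimal sketch of that shared-subscription dispatch rule in Java (the interfaces here are illustrative stand-ins, not Pulsar's actual dispatcher code):
+
+```
+import java.util.ArrayList;
+import java.util.List;
+
+// Illustrative only: skip chunks, then deliver them with their marker.
+class MarkerAwareDispatch {
+    interface Entry {
+        boolean isChunk();           // metadata carries uuid/chunk_id
+        boolean isMarker();          // marker published after all chunks
+        List<Entry> resolveChunks(); // reads the chunk entries listed in the marker payload
+    }
+
+    interface SharedConsumer {
+        void send(List<Entry> entries);
+    }
+
+    void dispatch(Entry entry, SharedConsumer selected) {
+        if (entry.isChunk()) {
+            return; // skip for now: chunks go out together with their marker
+        }
+        if (entry.isMarker()) {
+            // Deliver all chunks plus the marker to one consumer, in order.
+            List<Entry> batch = new ArrayList<>(entry.resolveChunks());
+            batch.add(entry);
+            selected.send(batch);
+            return;
+        }
+        selected.send(List.of(entry)); // ordinary, un-chunked message
+    }
+}
+```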