Posted to commits@pulsar.apache.org by "poorbarcode (via GitHub)" <gi...@apache.org> on 2023/08/24 03:36:06 UTC

[GitHub] [pulsar] poorbarcode commented on a diff in pull request #21027: [improve][pip] Change cursor`s properties to store chunk ID map.

poorbarcode commented on code in PR #21027:
URL: https://github.com/apache/pulsar/pull/21027#discussion_r1303733160


##########
pip/pip-295.md:
##########
@@ -0,0 +1,128 @@
+
+# Background knowledge
+
+In [PIP 37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar), Pulsar introduced chunk messages to handle large messages. The producer splits a large message into several chunks before sending it to the broker. On the consumer side, the consumer waits until it has received all the chunks of a message and then assembles them into a single message before returning it.
+In [PIP 6](https://github.com/apache/pulsar/wiki/PIP-6:-Guaranteed-Message-Deduplication), Pulsar introduced deduplication to ensure that the messages sent by a producer are not duplicated.
+In PIP 6, each producer has a sequence ID that starts at 0 and increases with each message. The broker drops any message whose sequence ID is not higher than the last one it recorded for that producer.
+
+# Motivation
+
+In the earliest design, all the chunks of a single chunk message share the same sequence ID, so chunk messages cannot work when deduplication is enabled. For example, consider a chunk message consisting of chunk-1 and chunk-2. When the broker receives chunk-1, it updates the last sequence ID to the sequence ID of chunk-1. When the broker then receives chunk-2, which carries the same sequence ID, deduplication drops it.
+I opened a [PR](https://github.com/apache/pulsar/pull/20948) to resolve this case. It allows the chunks of a single chunk message to use the same sequence ID and filters duplicated chunks within a single chunk message on the consumer side.
+This resolves message duplication end to end, but duplicate chunks can still exist in the topic.
+
+# Goals
+
+## In Scope
+Duplicate chunks are filtered effectively on the broker side, so that chunk messages work normally when deduplication is enabled and the topic contains no duplicate chunks.
+
+## Out of Scope
+
+
+
+# High Level Design
+Introduce a mechanism similar to [PIP 37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar) to check the chunk ID.
+For normal messages, we still only check sequence ID, but we will check both sequence ID and chunk ID for chunk messages.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+Add `chunkIDPushed` and `chunkIDPersisted` to store the chunk ID of each producer's ongoing chunk message. They are used to check whether the chunks in a single message are duplicated.
+
+```java
+    @VisibleForTesting
+    final ConcurrentOpenHashMap<String, Integer> chunkIDPushed =
+            ConcurrentOpenHashMap.<String, Integer>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+
+    @VisibleForTesting
+    final ConcurrentOpenHashMap<String, Integer> chunkIDPersisted =
+            ConcurrentOpenHashMap.<String, Integer>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+```
+
+Change the `properties` of the `MarkDeleteEntry` from `Map<String, Long>` to `Map<String, String>`. In the deduplication design, the `MarkDeleteEntry` properties are used as a snapshot to store the sequence ID map. Once the chunk ID map is introduced, a single `Long` value per producer cannot hold both the sequence ID and the chunk ID. We therefore propose changing the `MarkDeleteEntry` properties from `Map<String, Long>` to `Map<String, String>` to make them more flexible.
+
+
+
+## Public-facing Changes
+None
+
+### Public API
+None
+
+### Binary protocol
+Add `repeated StringProperty markDeleteProperties = 9;` to replace the original `repeated LongProperty properties = 5;`.
+
+### Configuration
+
+### CLI
+
+### Metrics
+
+
+# Monitoring
+
+
+# Security Considerations
+
+
+# Backward & Forward Compatibility
+
+## Revert
+After reverting to an old version of Pulsar, the `ManagedCursorInfo` will not contain the old properties (`repeated LongProperty properties = 5;`), because the new version of Pulsar uses `markDeleteProperties` (`repeated StringProperty markDeleteProperties = 9;`) to record the mark-delete properties.
+A revert is therefore only possible at the cost of losing the most recently persisted sequence ID data.
+
+## Upgrade
+
+Add upgrade logic to `recover(final VoidCallback callback)`.
+The original logic:
+```java
+    Map<String, Long> recoveredProperties = Collections.emptyMap();
+    if (info.getPropertiesCount() > 0) {
+        // Recover properties map
+        recoveredProperties = new HashMap<>();
+        for (int i = 0; i < info.getPropertiesCount(); i++) {
+            LongProperty property = info.getProperties(i);
+            recoveredProperties.put(property.getName(), property.getValue());
+        }
+    }
+
+    recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null);
+```
+Change to:
+```java
+    // Recover properties map
+    Map<String, String> recoveredProperties;
+    if (info.getPropertiesCount() == 0 && info.getMarkDeletePropertiesCount() == 0) {

Review Comment:
   Here, `info` refers to `ManagedCursorInfo`, right?
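
To make the question concrete, here is a rough sketch of the kind of upgrade path this hunk seems to describe. It is only an illustration and not the code from the PR: `CursorPropertiesUpgrade` and its `recover` method are hypothetical names, and plain maps stand in for the `ManagedCursorInfo` accessors, which are not reproduced here. The assumption is that the new String-valued properties win when present, and the legacy Long-valued ones are converted otherwise.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of Pulsar. Plain maps stand in for the protobuf accessors. */
final class CursorPropertiesUpgrade {

    /**
     * Prefers the new String-valued properties. Falls back to converting the
     * legacy Long-valued properties when no new ones are present.
     */
    static Map<String, String> recover(Map<String, Long> legacyProperties,
                                       Map<String, String> markDeleteProperties) {
        Map<String, String> recovered = new HashMap<>();
        if (!markDeleteProperties.isEmpty()) {
            // Cursor was already written by a new broker: use the values as stored.
            recovered.putAll(markDeleteProperties);
        } else {
            // Cursor was written by an old broker: stringify each Long value.
            legacyProperties.forEach((name, value) -> recovered.put(name, String.valueOf(value)));
        }
        return recovered;
    }
}
```

With both inputs empty the result is an empty map, which corresponds to the `== 0 && == 0` branch in the quoted diff.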



##########
pip/pip-295.md:
##########
@@ -0,0 +1,128 @@
+
+# Background knowledge
+
+In [PIP 37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar), Pulsar introduced chunk messages to handle large messages. The producer splits a large message into several chunks before sending it to the broker. On the consumer side, the consumer waits until it has received all the chunks of a message and then assembles them into a single message before returning it.
+In [PIP 6](https://github.com/apache/pulsar/wiki/PIP-6:-Guaranteed-Message-Deduplication), Pulsar introduced deduplication to ensure that the messages sent by a producer are not duplicated.
+In PIP 6, each producer has a sequence ID that starts at 0 and increases with each message. The broker drops any message whose sequence ID is not higher than the last one it recorded for that producer.
+
+# Motivation
+
+In the earliest design, all the chunks of a single chunk message share the same sequence ID, so chunk messages cannot work when deduplication is enabled. For example, consider a chunk message consisting of chunk-1 and chunk-2. When the broker receives chunk-1, it updates the last sequence ID to the sequence ID of chunk-1. When the broker then receives chunk-2, which carries the same sequence ID, deduplication drops it.
+I opened a [PR](https://github.com/apache/pulsar/pull/20948) to resolve this case. It allows the chunks of a single chunk message to use the same sequence ID and filters duplicated chunks within a single chunk message on the consumer side.
+This resolves message duplication end to end, but duplicate chunks can still exist in the topic.
+
+# Goals
+
+## In Scope
+Duplicate chunks are filtered effectively on the broker side, so that chunk messages work normally when deduplication is enabled and the topic contains no duplicate chunks.
+
+## Out of Scope
+
+
+
+# High Level Design
+Introduce a mechanism similar to [PIP 37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar) to check the chunk ID.
+For normal messages, we still only check sequence ID, but we will check both sequence ID and chunk ID for chunk messages.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+Add `chunkIDPushed` and `chunkIDPersisted` to store the chunk ID of each producer's ongoing chunk message. They are used to check whether the chunks in a single message are duplicated.
+
+```java
+    @VisibleForTesting
+    final ConcurrentOpenHashMap<String, Integer> chunkIDPushed =
+            ConcurrentOpenHashMap.<String, Integer>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+
+    @VisibleForTesting
+    final ConcurrentOpenHashMap<String, Integer> chunkIDPersisted =
+            ConcurrentOpenHashMap.<String, Integer>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+```
+
+Change the `properties` of the `MarkDeleteEntry` from `Map<String, Long>` to `Map<String, String>`. In the deduplication design, the `MarkDeleteEntry` properties are used as a snapshot to store the sequence ID map. Once the chunk ID map is introduced, a single `Long` value per producer cannot hold both the sequence ID and the chunk ID. We therefore propose changing the `MarkDeleteEntry` properties from `Map<String, Long>` to `Map<String, String>` to make them more flexible.

Review Comment:
   Could you add an example that shows the structure you intend the cursor metadata's `properties` attribute to have after this PIP?
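
As an illustration of the kind of example meant here, the sketch below shows one possible way a `Map<String, String>` value could carry both IDs for a producer. Everything in it is an assumption made for discussion: the PIP text quoted above does not specify an encoding, and the `ProducerDedupState` class and the `"<sequenceId>:<chunkId>"` value format are hypothetical.

```java
/**
 * Hypothetical value format "<sequenceId>:<chunkId>" for one producer's entry.
 * The PIP does not define an encoding; this only illustrates the idea.
 */
final class ProducerDedupState {
    final long sequenceId;
    final int chunkId; // -1 means "no chunk ID recorded"

    ProducerDedupState(long sequenceId, int chunkId) {
        this.sequenceId = sequenceId;
        this.chunkId = chunkId;
    }

    /** Serializes both IDs into a single String property value. */
    String encode() {
        return sequenceId + ":" + chunkId;
    }

    /** Parses a property value; a bare number is read as a sequence ID only. */
    static ProducerDedupState decode(String value) {
        int sep = value.indexOf(':');
        if (sep < 0) {
            return new ProducerDedupState(Long.parseLong(value), -1);
        }
        return new ProducerDedupState(
                Long.parseLong(value.substring(0, sep)),
                Integer.parseInt(value.substring(sep + 1)));
    }
}
```

Under this assumed format, an entry such as `"producer_name_1" : "7:2"` would mean sequence ID 7 and chunk ID 2, while a value without a separator could still be read as a plain sequence ID.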



##########
pip/pip-295.md:
##########
@@ -0,0 +1,128 @@
+
+# Background knowledge
+
+In [PIP 37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar), Pulsar introduced chunk messages to handle large messages. The producer splits a large message into several chunks before sending it to the broker. On the consumer side, the consumer waits until it has received all the chunks of a message and then assembles them into a single message before returning it.
+In [PIP 6](https://github.com/apache/pulsar/wiki/PIP-6:-Guaranteed-Message-Deduplication), Pulsar introduced deduplication to ensure that the messages sent by a producer are not duplicated.
+In PIP 6, each producer has a sequence ID that starts at 0 and increases with each message. The broker drops any message whose sequence ID is not higher than the last one it recorded for that producer.
+
+# Motivation
+
+In the earliest design, all the chunks of a single chunk message share the same sequence ID, so chunk messages cannot work when deduplication is enabled. For example, consider a chunk message consisting of chunk-1 and chunk-2. When the broker receives chunk-1, it updates the last sequence ID to the sequence ID of chunk-1. When the broker then receives chunk-2, which carries the same sequence ID, deduplication drops it.
+I opened a [PR](https://github.com/apache/pulsar/pull/20948) to resolve this case. It allows the chunks of a single chunk message to use the same sequence ID and filters duplicated chunks within a single chunk message on the consumer side.
+This resolves message duplication end to end, but duplicate chunks can still exist in the topic.
+
+# Goals
+
+## In Scope
+Duplicate chunks are filtered effectively on the broker side, so that chunk messages work normally when deduplication is enabled and the topic contains no duplicate chunks.

Review Comment:
   **Background**: There are two properties in the metadata of the cursor:
   - `properties<String, Long>`: used to maintain the last sequence ID of the messages sent by each producer<sup>[1]</sup>.
     - PIP: https://github.com/apache/pulsar/wiki/PIP-6:-Guaranteed-Message-Deduplication
     - PR: https://github.com/apache/pulsar/pull/744
   - `cursorProperties<String, String>`: used to maintain the subscription properties.
     - PIP: https://github.com/apache/pulsar/issues/12269
     - PR: https://github.com/apache/pulsar/issues/15750

   **[1]**: the structure of `properties`:
   ```yaml
   properties: 
     - "producer_name_1" : {{last_persist_sequence_1}}
     - "producer_name_2" : {{last_persist_sequence_2}}
   ```
   
   ----
   
   In this PIP, you want to change `properties<String, Long>` to `properties<String, String>`, right? Could you also explain this change here?
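
As context for why the snapshot would need two values per producer, here is a minimal sketch of a broker-side check that looks at both the sequence ID and the chunk ID. It is an assumption-laden illustration, not the logic from the PR: `ChunkAwareDedup` and `accept` are hypothetical names, a chunk ID of `-1` is used here to mean "not a chunk message", and chunk IDs are assumed to increase within one message.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a chunk-aware deduplication check; not Pulsar's implementation. */
final class ChunkAwareDedup {
    private final Map<String, Long> lastSequenceId = new HashMap<>();
    private final Map<String, Integer> lastChunkId = new HashMap<>();

    /**
     * Returns true and records the IDs if the message is new, false if it is a duplicate.
     * Pass chunkId = -1 for a message that is not chunked.
     */
    boolean accept(String producerName, long sequenceId, int chunkId) {
        long lastSeq = lastSequenceId.getOrDefault(producerName, -1L);
        int lastChunk = lastChunkId.getOrDefault(producerName, -1);

        boolean duplicate;
        if (sequenceId != lastSeq) {
            // Different message: the usual sequence ID rule applies.
            duplicate = sequenceId < lastSeq;
        } else {
            // Same sequence ID: only a chunk with a higher chunk ID is new.
            duplicate = chunkId <= lastChunk;
        }
        if (duplicate) {
            return false;
        }
        lastSequenceId.put(producerName, sequenceId);
        lastChunkId.put(producerName, chunkId);
        return true;
    }
}
```

Both maps in this sketch are per-producer state, which is why a single `Long` per producer in the cursor snapshot would not be enough to restore it.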


