Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/28 00:07:42 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #18227: [fix][broker]fix Repeated messages of shared dispatcher

poorbarcode opened a new pull request, #18227:
URL: https://github.com/apache/pulsar/pull/18227

   Fixes #16795
   
   ### Motivation
   
   In `Shared` mode:
   1. When a consumer calls `redeliverUnacknowledgedMessages`, the unacknowledged messages are stored in the queue `messagesToRedeliver`.
   2. When the `Dispatcher` receives `cmd-FLOW` from a consumer, it first reads messages from the queue `messagesToRedeliver`.
   3. Messages are removed from the queue `messagesToRedeliver` only after they are actually sent to consumers.
   
   <strong>Key point:</strong> at step 2, if more than one consumer tries to get messages at the same time, the same message in the queue `messagesToRedeliver` can be sent to multiple consumers, resulting in repeated consumption.
   
   | step | `consumer-1` | `consumer-2` |
   | --- | --- | --- |
   | 1 | try to load messages |  |
   | 2 | peek message from the queue `messagesToRedeliver` | try to load messages |
   | 3 | replay messages | peek message from the queue `messagesToRedeliver` |
   | 4 | send message to `consumer-1` | replay messages |
   | 5 |  | send message to `consumer-2` |
   
   Although the above steps run in `synchronized` blocks, `consumer-2` gets the message before `consumer-1` reaches step 4, so the message is consumed twice.
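
The interleaving in the table above can be reproduced deterministically with a small sketch. This is not Pulsar code: the class `ReplayRaceSketch` and the methods `peekReplay`, `send`, and `runInterleaving` are hypothetical stand-ins, and entries are reduced to plain `Long` ids. It only illustrates why synchronizing each step separately does not prevent the duplicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for the dispatcher; names are illustrative, not Pulsar's API.
class ReplayRaceSketch {
    // Plays the role of the queue `messagesToRedeliver` (entry ids only).
    final TreeSet<Long> messagesToRedeliver = new TreeSet<>();
    // Records "consumer:entryId" for every delivery that happens.
    final List<String> deliveries = new ArrayList<>();

    // "Peek" step: read the pending ids without removing them.
    synchronized List<Long> peekReplay() {
        return new ArrayList<>(messagesToRedeliver);
    }

    // "Send" step: deliver the entries and only then remove them from the queue.
    synchronized void send(String consumer, List<Long> entries) {
        for (Long id : entries) {
            deliveries.add(consumer + ":" + id);
            messagesToRedeliver.remove(id);
        }
    }

    // Replays the table's ordering on a single thread so the outcome is deterministic.
    static List<String> runInterleaving() {
        ReplayRaceSketch dispatcher = new ReplayRaceSketch();
        dispatcher.messagesToRedeliver.add(1L);
        List<Long> forConsumer1 = dispatcher.peekReplay(); // consumer-1 peeks
        List<Long> forConsumer2 = dispatcher.peekReplay(); // consumer-2 peeks before consumer-1 sends
        dispatcher.send("consumer-1", forConsumer1);       // step 4
        dispatcher.send("consumer-2", forConsumer2);       // step 5: same entry delivered again
        return dispatcher.deliveries;
    }
}
```

Each method holds the lock while it runs, yet both consumers end up with entry `1`, because the second peek happens between the first peek and the first send.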
   
   ### Modifications
   
   Before step 5 of `consumer-2`, check that the messages are still in the queue `messagesToRedeliver`. If they are not, they have already been sent to `consumer-1` and are skipped.
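
A minimal sketch of this check, assuming entries are identified by a single `Long` id rather than a ledger id and entry id pair. The class `ReplayFilterSketch` and the method `filterStillPending` are hypothetical names used only for illustration; the actual change in this PR is a `removeIf` on the replayed entries inside the dispatcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper; illustrates the check only, not the dispatcher itself.
class ReplayFilterSketch {
    // Keeps only the replayed ids that are still pending redelivery.
    // Ids missing from `messagesToRedeliver` were already sent to another consumer.
    static List<Long> filterStillPending(List<Long> replayed, Set<Long> messagesToRedeliver) {
        List<Long> result = new ArrayList<>(replayed); // copy so the caller's list is untouched
        result.removeIf(id -> !messagesToRedeliver.contains(id));
        return result;
    }
}
```

For example, if `consumer-2` replayed entries `1` and `2` but only `2` is still in the queue, `filterStillPending(List.of(1L, 2L), Set.of(2L))` returns a list containing only `2`, so entry `1` is not sent a second time.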
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   - https://github.com/poorbarcode/pulsar/pull/32
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18227: [do not merge][fix][broker]fix Repeated messages of shared dispatcher

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18227:
URL: https://github.com/apache/pulsar/pull/18227#discussion_r1010766576


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -583,6 +583,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
         if (needTrimAckedMessages()) {
             cursor.trimDeletedEntries(entries);
         }
+        if (readType == ReadType.Replay) {
+            entries.removeIf(entry -> !redeliveryMessages.contains(entry.getLedgerId(), entry.getEntryId()));
+        }

Review Comment:
   I have modified the test `concurrentlyRedeliverAndCloseLastConsumer` so that the problem occurs with a probability greater than 50%.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #18227: [fix][broker]fix Repeated messages of shared dispatcher

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #18227:
URL: https://github.com/apache/pulsar/pull/18227#discussion_r1007538502


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -583,6 +583,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
         if (needTrimAckedMessages()) {
             cursor.trimDeletedEntries(entries);
         }
+        if (readType == ReadType.Replay) {
+            entries.removeIf(entry -> !redeliveryMessages.contains(entry.getLedgerId(), entry.getEntryId()));
+        }

Review Comment:
   I tried removing these lines to check the newly added test `concurrentlyReceiveDeliveriedMessages`.
   
   It always passes with `invocationCount = 100`.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18227: [do not merge][fix][broker]fix Repeated messages of shared dispatcher

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18227:
URL: https://github.com/apache/pulsar/pull/18227#discussion_r1010766576


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -583,6 +583,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
         if (needTrimAckedMessages()) {
             cursor.trimDeletedEntries(entries);
         }
+        if (readType == ReadType.Replay) {
+            entries.removeIf(entry -> !redeliveryMessages.contains(entry.getLedgerId(), entry.getEntryId()));
+        }

Review Comment:
   I have modified the test so that the problem occurs with a probability greater than 50%.





[GitHub] [pulsar] codelipenghui commented on pull request #18227: [fix][broker]fix Repeated messages of shared dispatcher

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #18227:
URL: https://github.com/apache/pulsar/pull/18227#issuecomment-1294345411

   @poorbarcode Interesting. I think the expected behavior is that, no matter how many consumers are under a subscription, the subscription performs entry read operations one at a time.




[GitHub] [pulsar] github-actions[bot] commented on pull request #18227: [fix][broker]fix Repeated messages of shared dispatcher

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #18227:
URL: https://github.com/apache/pulsar/pull/18227#issuecomment-1345434611

   The PR had no activity for 30 days and is marked with the Stale label.

