Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/18 12:30:03 UTC

[GitHub] [pulsar] nicoloboschi opened a new pull request, #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

nicoloboschi opened a new pull request, #17163:
URL: https://github.com/apache/pulsar/pull/17163

   ### Motivation
   
   A test in the flaky-test suite fails consistently: 
   https://github.com/apache/pulsar/issues/16795
   
   While investigating, I found a case in which the dispatcher can replay the same message to the consumer twice. This can happen because the replay is asynchronous and runs while entries are being read.
   
   I believe that it may be related to recent changes to the dispatcher: 
   - https://github.com/apache/pulsar/pull/16603
   - https://github.com/apache/pulsar/pull/16812
   - https://github.com/apache/pulsar/pull/16968
   - https://github.com/apache/pulsar/pull/17143
   
   ### Modifications
   * The replay may still be scheduled twice, but messages are now filtered out if they are no longer present in the dispatcher's (per-consumer) replay-messages queue (`PersistentDispatcherMultipleConsumers#redeliveryMessages`).
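A minimal sketch of the filtering idea, assuming a simplified model in which the redelivery queue is a thread-safe set of `(ledgerId, entryId)` positions. `RedeliveryFilterSketch`, `Position`, `addToReplay` and `filterReplayEntries` are hypothetical names for illustration, not Pulsar's API, and the sketch does not model entry recycling or reference counting. The point it shows is that removal doubles as the membership check: only the caller whose `remove` succeeds keeps the entry, so a second replay of the same position is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class RedeliveryFilterSketch {

    // Hypothetical stand-in for a managed-ledger position (ledgerId, entryId).
    record Position(long ledgerId, long entryId) { }

    // Hypothetical stand-in for the dispatcher's redeliveryMessages structure.
    private final Set<Position> redeliveryMessages = ConcurrentHashMap.newKeySet();

    void addToReplay(Position p) {
        redeliveryMessages.add(p);
    }

    // Keep only replayed entries that are still pending redelivery.
    // Set.remove returns true for exactly one caller, so the same position
    // survives the filter at most once even if two replays were scheduled.
    List<Position> filterReplayEntries(List<Position> replayed) {
        List<Position> toSend = new ArrayList<>();
        for (Position p : replayed) {
            if (redeliveryMessages.remove(p)) {
                toSend.add(p);
            }
        }
        return toSend;
    }

    public static void main(String[] args) {
        RedeliveryFilterSketch d = new RedeliveryFilterSketch();
        Position p = new Position(3L, 7L);
        d.addToReplay(p);
        // Two replay reads of the same position complete one after the other.
        System.out.println(d.filterReplayEntries(List.of(p)).size()); // 1
        System.out.println(d.filterReplayEntries(List.of(p)).size()); // 0
    }
}
```

Entries that were never queued for redelivery (or were already sent) are dropped by the same check, which is exactly the behaviour the reviewers below ask to scrutinize.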
   
   The evidence that this fix works is that the test `SimpleProducerConsumerTest.testSharedSamePriorityConsumer` no longer fails.
   
   Fix #16795
   
   - [x] `doc-not-needed` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1220163043

   Hi, @nicoloboschi 
   > The proof that this fix works is that now the test SimpleProducerConsumerTest.testSharedSamePriorityConsumer doesn't fail anymore.
   
   I think it should be `SimpleProducerConsumerTestStreamingDispatcherTest.testSharedSamePriorityConsumer`
   
   By the way, I'd like to collect the tests that need to be moved out of the flaky-test group, because at the moment we may be losing some test coverage.
   See #17145




[GitHub] [pulsar] eolivelli commented on a diff in pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#discussion_r949084305


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -653,9 +655,12 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
 
             // remove positions first from replay list first : sendMessages recycles entries
             if (readType == ReadType.Replay) {
-                entriesForThisConsumer.forEach(entry -> {
-                    redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
-                });
+                // we need to replay messages still present on the redeliveryMessages queue.
+                // It may happen that more than one replay is scheduled for the same message but it
+                // must be actually replayed only once.
+                entriesForThisConsumer = entriesForThisConsumer.stream()

Review Comment:
   This may be a symptom that something else is broken:
   we have read messages that should not have been read.
   
   I wonder if we are replaying the messages in the wrong order.
   Maybe we can accept that, because order does not matter here, as we are replaying messages.
   
   We should also double-check `PersistentStickyKeyDispatcherMultipleConsumers`, which has a different implementation of `trySendMessagesToConsumers`.





[GitHub] [pulsar] nicoloboschi commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher)

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1220769937

   @mattisonchao, nice catch! You're right, that is the missing check.
   That also explains why only the streaming test fails.
   
   Thanks, I've updated the pull request with your suggestion.




[GitHub] [pulsar] mattisonchao commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1220745626

   Hi, @nicoloboschi 
   > 3.a In the next read cycle (readMoreEntries) the replay read is triggered asynchronously.
   > 3.b At the same time a consumer is ready to receive a new message and then consumerFlow is triggered. And, again,
   
   I'm still wondering why invoking `readMoreEntries` concurrently leads to race conditions. I found that `PersistentStreamingDispatcherMultipleConsumers#readMoreEntries` may be missing a check of the important `sendInProgress` state. After adding this state check, the test never failed.
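A simplified sketch of the guard being described, assuming a single `sendInProgress` flag that is set while entries are handed to consumers. `GuardedReaderSketch`, `beginSend`, `endSend` and the read counter are hypothetical, and this is not the actual code of `PersistentStreamingDispatcherMultipleConsumers` (whose `readMoreEntries` returns `void`). In this sketch a skipped read is simply dropped; nothing re-triggers it later.

```java
class GuardedReaderSketch {

    // Hypothetical flag: true while entries are being sent to consumers.
    private boolean sendInProgress = false;
    // Counts how many reads were actually issued, for illustration only.
    private int readsIssued = 0;

    synchronized void beginSend() {
        sendInProgress = true;
    }

    synchronized void endSend() {
        sendInProgress = false;
    }

    // Skip issuing a new read while a send is in progress, so a replay read
    // cannot overlap with the delivery of the same entries.
    // Returns false when the read was skipped.
    public synchronized boolean readMoreEntries() {
        if (sendInProgress) {
            return false;
        }
        readsIssued++;
        return true;
    }

    synchronized int readsIssued() {
        return readsIssued;
    }

    public static void main(String[] args) {
        GuardedReaderSketch d = new GuardedReaderSketch();
        d.beginSend();
        System.out.println(d.readMoreEntries()); // false: skipped during send
        d.endSend();
        System.out.println(d.readMoreEntries()); // true
    }
}
```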
   
    




[GitHub] [pulsar] nicoloboschi commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1219603474

   @eolivelli, thanks for the review. `PersistentStickyKeyDispatcherMultipleConsumers` had the same issue. I fixed it and added a test.




[GitHub] [pulsar] eolivelli merged pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher)

Posted by GitBox <gi...@apache.org>.
eolivelli merged PR #17163:
URL: https://github.com/apache/pulsar/pull/17163




[GitHub] [pulsar] eolivelli commented on a diff in pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher)

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#discussion_r950563314


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java:
##########
@@ -155,6 +155,11 @@ protected void cancelPendingRead() {
 
     @Override
     public synchronized void readMoreEntries() {
+        if (sendInProgress) {

Review Comment:
   This will have a big performance impact.
   Also, the subscription may become stuck:
   if you skip this round, you have to reschedule the execution.
   
   I can't double-check it right now, but I am not sure that this is the right fix.
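The rescheduling concern can be sketched as follows, assuming a simplified dispatcher in which a read requested during a send is remembered and re-issued when the send completes, instead of being dropped. `DeferredReadSketch`, `readPending`, `sendStarted` and `sendCompleted` are hypothetical names; the sketch is not taken from the PR, and a real broker would more likely re-trigger the read through its executor than through a direct call.

```java
class DeferredReadSketch {

    private boolean sendInProgress = false;
    // Set when a read was requested while a send was running.
    private boolean readPending = false;
    private int readsIssued = 0;

    public synchronized void readMoreEntries() {
        if (sendInProgress) {
            // Do not drop the request: remember it so it is re-run later.
            readPending = true;
            return;
        }
        readsIssued++;
    }

    synchronized void sendStarted() {
        sendInProgress = true;
    }

    // When the send finishes, re-run any read that was skipped meanwhile,
    // so the subscription cannot stall waiting for a read nobody issues.
    synchronized void sendCompleted() {
        sendInProgress = false;
        if (readPending) {
            readPending = false;
            readMoreEntries(); // synchronized methods are reentrant
        }
    }

    synchronized int readsIssued() {
        return readsIssued;
    }

    public static void main(String[] args) {
        DeferredReadSketch d = new DeferredReadSketch();
        d.sendStarted();
        d.readMoreEntries();                 // deferred, not issued yet
        System.out.println(d.readsIssued()); // 0
        d.sendCompleted();                   // pending read is re-run
        System.out.println(d.readsIssued()); // 1
    }
}
```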





[GitHub] [pulsar] codelipenghui commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1220436886

   Thanks @mattisonchao
   
   I think we should either pinpoint the race condition or revert `dispatcherDispatchMessagesInSubscriptionThread` for the 2.11.0 release. It is a potential risk, and there may be other issues that just haven't broken a test yet.




[GitHub] [pulsar] eolivelli commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher)

Posted by GitBox <gi...@apache.org>.
eolivelli commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1221247037

   @Technoboy- this change must go into 2.11 as well




[GitHub] [pulsar] nicoloboschi commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1220465436

   Thanks for the reviews, @codelipenghui and @mattisonchao.
   I did additional testing and I still think this solution is correct.
   
   Basically, the case I'm seeing is the following:
   1. After reading an entry normally, the dispatcher calls `sendMessagesToConsumers`.
   2. The consumers are busy (the test does verify the busy-consumer case), so no consumer has permits available. The message is added to the `redeliveryMessages` structure.
   3.a In the next read cycle (`readMoreEntries`), the replay read is triggered asynchronously.
   3.b At the same time, a consumer becomes ready to receive a new message, so `consumerFlow` is triggered, which again calls `readMoreEntries`.
   4. Since the cursor read and the replay read point to the same position, the entry is read twice. (That could be another unwanted issue and may impact performance.)
   5. After the read completes, the send is triggered. For a replay, the send method doesn't check whether the message is still in the `redeliveryMessages` structure, so the message is sent twice (sequentially).
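The five steps can be replayed deterministically in a toy model. This is a sketch under assumptions: a single entry (id `42`), no permits on the first attempt, and two in-flight reads that both complete as replays of that entry. `ReplayRaceSketch` and its methods are hypothetical and ignore threads entirely; the point is only to show why step 5 produces a duplicate without the membership check and a single delivery with it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReplayRaceSketch {

    private final Set<Long> redeliveryMessages = new HashSet<>();
    private final List<Long> delivered = new ArrayList<>();
    private final boolean checkBeforeReplay;

    ReplayRaceSketch(boolean checkBeforeReplay) {
        this.checkBeforeReplay = checkBeforeReplay;
    }

    // Steps 1-2: normal read, but no consumer has permits, so the entry
    // is parked in redeliveryMessages.
    void readWithNoPermits(long entryId) {
        redeliveryMessages.add(entryId);
    }

    // Step 5: a replay read completed and the entry is about to be sent.
    void sendReplayed(long entryId) {
        if (checkBeforeReplay) {
            // Fixed behaviour: send only if still pending redelivery.
            if (!redeliveryMessages.remove(entryId)) {
                return;
            }
        } else {
            // Old behaviour: remove unconditionally, then send.
            redeliveryMessages.remove(entryId);
        }
        delivered.add(entryId);
    }

    static List<Long> run(boolean checkBeforeReplay) {
        ReplayRaceSketch d = new ReplayRaceSketch(checkBeforeReplay);
        d.readWithNoPermits(42L);
        // Steps 3a/3b/4: two reads are in flight for the same position,
        // and both complete (sequentially) with entry 42.
        d.sendReplayed(42L);
        d.sendReplayed(42L);
        return d.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [42, 42]: duplicate delivery
        System.out.println(run(true));  // [42]
    }
}
```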
   
   
   
   
   After the recent work (see the pull requests mentioned in the description), `sendMessagesToConsumers` and `readMoreEntries` are no longer executed under the same lock, and that's why this situation couldn't happen before.
   
   Point 4 could be a problem, and it is a new behaviour that will be introduced in 2.11. With this fix we prefer consistency (no duplicates sent to the consumers) over avoiding duplicate reads. We still read the same entry twice; perhaps we could optimize this edge case in another pull request. IMO, this whole effort of using separate threads is still worth it.
   
   




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher)

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#discussion_r950629777


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java:
##########
@@ -155,6 +155,11 @@ protected void cancelPendingRead() {
 
     @Override
     public synchronized void readMoreEntries() {
+        if (sendInProgress) {

Review Comment:
   Hi @eolivelli 
   Can you clarify this problem?
   It looks like this logic already exists for `PersistentDispatcherMultipleConsumers`.





[GitHub] [pulsar] mattisonchao commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1220139888

   I think we have to know the root cause.




[GitHub] [pulsar] codelipenghui commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1220144682

   > I think we have to know the root cause.
   
   +1, we should handle this PR with extra care. It filters out some messages,
   which may cause messages not to be delivered correctly to the client under some conditions.




[GitHub] [pulsar] mattisonchao commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1220178118

   After testing, I think a multi-threading race condition causes it.
   I tried disabling the `dispatcherDispatchMessagesInSubscriptionThread` configuration
   and ran the test 30 times; every run passed.
   
   ![image](https://user-images.githubusercontent.com/74767115/185530522-bcbf8260-2656-47cf-aa91-c0198cb7975b.png)
   
   




[GitHub] [pulsar] mattisonchao commented on pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher)

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#issuecomment-1220772938

   I think we should also check the related logic to avoid other problems, because this kind of logic is **very** important.




[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #17163: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #17163:
URL: https://github.com/apache/pulsar/pull/17163#discussion_r949253131


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -653,9 +655,12 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
 
             // remove positions first from replay list first : sendMessages recycles entries
             if (readType == ReadType.Replay) {
-                entriesForThisConsumer.forEach(entry -> {
-                    redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
-                });
+                // we need to replay messages still present on the redeliveryMessages queue.
+                // It may happen that more than one replay is scheduled for the same message but it
+                // must be actually replayed only once.
+                entriesForThisConsumer = entriesForThisConsumer.stream()

Review Comment:
   I think it's natural behavior now that `readMoreEntries` is async.
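To illustrate that the membership check still holds when the reads really are asynchronous, here is a sketch in which several `CompletableFuture` tasks, standing in for async reads of the same position, race to deliver it. The names are hypothetical; the key assumption is that the redelivery structure is a concurrent set, so `remove` returns `true` for exactly one of the racing callbacks.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncReplaySketch {

    static int deliveriesAfterRacingReads(int concurrentReads) {
        Set<Long> redeliveryMessages = ConcurrentHashMap.newKeySet();
        AtomicInteger deliveries = new AtomicInteger();
        long position = 7L;
        redeliveryMessages.add(position);

        ExecutorService pool = Executors.newFixedThreadPool(concurrentReads);
        try {
            CompletableFuture<?>[] reads = new CompletableFuture<?>[concurrentReads];
            for (int i = 0; i < concurrentReads; i++) {
                // Each async "read" completes with the same position and then
                // attempts to send it; only the winner of remove() delivers.
                reads[i] = CompletableFuture
                        .supplyAsync(() -> position, pool)
                        .thenAccept(p -> {
                            if (redeliveryMessages.remove(p)) {
                                deliveries.incrementAndGet();
                            }
                        });
            }
            CompletableFuture.allOf(reads).join();
        } finally {
            pool.shutdown();
        }
        return deliveries.get();
    }

    public static void main(String[] args) {
        System.out.println(deliveriesAfterRacingReads(2)); // 1
    }
}
```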
   
   


