Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/22 09:15:08 UTC

[GitHub] [pulsar] nodece opened a new pull request, #16171: [fix][client] Fix duplicate messages caused by seek

nodece opened a new pull request, #16171:
URL: https://github.com/apache/pulsar/pull/16171

   Signed-off-by: Zixuan Liu <no...@gmail.com>
   
   Master Issue: #<xyz>
   
   ### Motivation
   
   When a consumer subscribes to a partitioned topic, performs a seek, and then consumes messages, it sometimes receives duplicate messages.
   
   The root cause is that if a receive task is still pending in the `pendingReceives` queue when a seek operation is performed, that task can be completed with old messages from before the seek.
   
   ### Modifications
   
   - Add a seek check to the message-receiving logic: while a seek operation is in progress, skip putting the received message into the `incomingMessages` queue.
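   
   A minimal, self-contained model of that check, assuming hypothetical `ToyConsumer` and `ToyMultiTopicsConsumer` classes that stand in for `ConsumerImpl` and `MultiTopicsConsumerImpl` (this is not the Pulsar client code). A receive that completes while `isDuringSeek()` is true is dropped and the receive is re-armed, so a stale message that arrives mid-seek never reaches `incomingMessages`:
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicBoolean;
   
   // Hypothetical stand-in for ConsumerImpl: only the pieces needed for the sketch.
   class ToyConsumer {
       final AtomicBoolean duringSeek = new AtomicBoolean(false);
       // Futures handed out by receiveAsync() that have not been completed yet.
       final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();
   
       boolean isDuringSeek() {
           return duringSeek.get();
       }
   
       CompletableFuture<String> receiveAsync() {
           CompletableFuture<String> f = new CompletableFuture<>();
           pendingReceives.add(f);
           return f;
       }
   
       // Simulates the connection completing the oldest pending receive.
       void deliver(String msg) {
           CompletableFuture<String> f = pendingReceives.poll();
           if (f != null) {
               f.complete(msg);
           }
       }
   }
   
   // Hypothetical stand-in for the MultiTopicsConsumerImpl receive loop.
   class ToyMultiTopicsConsumer {
       final List<String> incomingMessages = new ArrayList<>();
       final boolean checkSeek;
   
       ToyMultiTopicsConsumer(boolean checkSeek) {
           this.checkSeek = checkSeek;
       }
   
       void receiveMessageFromConsumer(ToyConsumer consumer) {
           // thenAccept (not thenAcceptAsync) keeps the sketch single-threaded and deterministic.
           consumer.receiveAsync().thenAccept(msg -> {
               if (checkSeek && consumer.isDuringSeek()) {
                   // Drop the stale message and re-arm the receive.
                   receiveMessageFromConsumer(consumer);
                   return;
               }
               incomingMessages.add(msg);
               receiveMessageFromConsumer(consumer);
           });
       }
   }
   
   class SeekGuardDemo {
       // Returns what ends up in incomingMessages with or without the seek check.
       static List<String> run(boolean checkSeek) {
           ToyConsumer consumer = new ToyConsumer();
           ToyMultiTopicsConsumer multi = new ToyMultiTopicsConsumer(checkSeek);
           multi.receiveMessageFromConsumer(consumer);
   
           consumer.deliver("m1");          // normal delivery before the seek
           consumer.duringSeek.set(true);   // seek back to m1 starts
           consumer.deliver("m2");          // old in-flight message completes a pending receive
           consumer.duringSeek.set(false);  // seek completes
           consumer.deliver("m1");          // delivery restarts from the seek position
           consumer.deliver("m2");
           return multi.incomingMessages;
       }
   }
   ```
   
   `run(false)` yields `[m1, m2, m1, m2]`, where the `m2` delivered during the seek is a duplicate, while `run(true)` yields `[m1, m1, m2]`.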
   
   ### Verifying this change
   
   `org.apache.pulsar.broker.service.SubscriptionSeekTest#testSeekByFunctionAndMultiTopic` covers this change.
   
   ### Documentation
   
   - [x] `doc-not-needed` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1168351385

   Yes, this PR fixes the race condition when `seek` is called after `subscribe`, but I think there is another bug: `subscribe` doesn't look like a synchronous operation. Without this PR, the test can still pass if it sleeps for a while before calling `seek`.
   
   ```java
           org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
                   .newConsumer(Schema.STRING).startMessageIdInclusive()
                   .topics(Arrays.asList(topicName, topicName2)).subscriptionName("my-sub").subscribe();
   
           Thread.sleep(2000);
   
           consumer.seek((partitionedTopic) -> {
   ```
   
   I think the root cause is that `MultiTopicsConsumerImpl#subscribeFuture()` completes too early.
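   
   To illustrate the ordering this implies, here is a sketch assuming a hypothetical `subscribeAll` helper that aggregates one future per internal consumer with `CompletableFuture.allOf` (this is not the actual `MultiTopicsConsumerImpl#subscribeFuture()` implementation). A `seek` chained on the aggregate future cannot run until every internal subscription has completed:
   
   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CopyOnWriteArrayList;
   
   class SubscribeFutureSketch {
       // Hypothetical helper: completes only after every internal
       // (per-topic or per-partition) subscription has completed.
       static CompletableFuture<Void> subscribeAll(List<CompletableFuture<Void>> perTopic) {
           return CompletableFuture.allOf(perTopic.toArray(new CompletableFuture<?>[0]));
       }
   
       // Records the order of events when seek is chained on the aggregate future.
       static List<String> demo() {
           List<String> events = new CopyOnWriteArrayList<>();
           CompletableFuture<Void> topicA = new CompletableFuture<>();
           CompletableFuture<Void> topicB = new CompletableFuture<>();
   
           subscribeAll(List.of(topicA, topicB)).thenRun(() -> events.add("seek"));
   
           events.add("A subscribed");
           topicA.complete(null);  // seek must not run yet: topicB is still pending
           events.add("B subscribed");
           topicB.complete(null);  // the aggregate completes here and the chained seek runs
           return events;
       }
   }
   ```
   
   Here `demo()` returns `[A subscribed, B subscribed, seek]`. If the aggregate future instead completed before `topicB`, the same chained `seek` could run while an internal consumer is still subscribing, which would be consistent with a `Thread.sleep` before `seek` hiding the problem.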




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#discussion_r995275721


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -252,6 +252,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
             messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
         }
         messagesFuture.thenAcceptAsync(messages -> {
+            if (consumer.isDuringSeek()) {
+                receiveMessageFromConsumer(consumer, batchReceive);

Review Comment:
   OK, I guess the fix only covers the case where the MultiTopicConsumer polls messages from the internal consumer, but it can't fix the issue that users face. After a message has been polled from the internal consumer into the MultiTopicConsumer's queue, the user may still get duplicated messages during the seek operation, right?





[GitHub] [pulsar] nodece commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1164141101

   > > org.apache.pulsar.broker.service.SubscriptionSeekTest#testSeekByFunctionAndMultiTopic cover this changes.
   > 
   > Why didn't it fail before?
   
   @BewareMyPower I can't explain it yet: the issue doesn't reproduce on the master branch, but it does reproduce with #15568.




[GitHub] [pulsar] nodece commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1168489733

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] nodece commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1163850785

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#discussion_r991859379


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -252,6 +252,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
             messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
         }
         messagesFuture.thenAcceptAsync(messages -> {
+            if (consumer.isDuringSeek()) {
+                receiveMessageFromConsumer(consumer, batchReceive);

Review Comment:
   When `incomingMessages` is not empty, there are two possibilities:
   1. It loops until there are no messages left in `incomingMessages`.
   2. If `ConsumerImpl.seek` fails, the messages that have already been popped from `incomingMessages` cannot be consumed until `redeliver` is executed.
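   
   For possibility 2, one option (an assumption for illustration, not something this PR implements) is to keep the popped messages in a local buffer and push them back to the front of the queue if the seek fails. The hypothetical `SeekWithRestore` class below models only the local queue, single-threaded; `brokerSeek` stands in for the request sent to the broker:
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Supplier;
   
   // Hypothetical, single-threaded model of a consumer's local queue during seek; not Pulsar code.
   class SeekWithRestore {
       final Deque<String> incomingMessages = new ArrayDeque<>();
   
       CompletableFuture<Void> seek(Supplier<CompletableFuture<Void>> brokerSeek) {
           // Pop everything that was buffered before the seek, preserving order.
           List<String> popped = new ArrayList<>(incomingMessages);
           incomingMessages.clear();
           return brokerSeek.get().whenComplete((ignored, error) -> {
               if (error != null) {
                   // Seek failed, so the subscription position did not move: put the popped
                   // messages back in front instead of waiting for a redeliver.
                   for (int i = popped.size() - 1; i >= 0; i--) {
                       incomingMessages.addFirst(popped.get(i));
                   }
               }
           });
       }
   }
   ```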





[GitHub] [pulsar] github-actions[bot] commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1226691840

   The PR has had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] nodece commented on a diff in pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#discussion_r995294949


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -252,6 +252,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
             messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
         }
         messagesFuture.thenAcceptAsync(messages -> {
+            if (consumer.isDuringSeek()) {
+                receiveMessageFromConsumer(consumer, batchReceive);

Review Comment:
   > Ok, I guess the fix can only fix the case that the MultiTopicConsumer poll messages from the internal consumer.
   
   Right.
   
   > the user will still have chance to get duplicated messages during the seek operation?
   
   I think we need to work out the details of the seek semantics first:
   
   1. When should `incomingMessages` be cleared: before or after the seek?
   2. During the seek, can the client continue consuming from `incomingMessages`?
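   
   On question 1, one alternative worth comparing (a generation counter, which is not what this PR does) makes the before-or-after choice less fragile: every receive records the seek generation it was issued under, a seek bumps the generation and clears the buffer, and any result carrying an older generation is discarded. The `GenerationGuard` class and its methods are hypothetical:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicLong;
   
   // Hypothetical generation-counter guard; not part of the Pulsar client.
   class GenerationGuard {
       private final AtomicLong seekGeneration = new AtomicLong();
       final List<String> incomingMessages = new ArrayList<>();
   
       // Called when a receive is issued; the caller keeps the returned token.
       long beginReceive() {
           return seekGeneration.get();
       }
   
       // Called when a seek starts: bump the generation and clear the buffered queue.
       synchronized void onSeek() {
           seekGeneration.incrementAndGet();
           incomingMessages.clear();
       }
   
       // Called when a receive completes; results issued under an older generation are dropped.
       synchronized boolean onReceived(long token, String msg) {
           if (token != seekGeneration.get()) {
               return false;
           }
           incomingMessages.add(msg);
           return true;
       }
   }
   ```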
   
    
   
   
   





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#discussion_r908087403


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -248,6 +248,10 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
 
     private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
         consumer.receiveAsync().thenAcceptAsync(message -> {
+            if (consumer.isDuringSeek()) {
+                receiveMessageFromConsumer(consumer);

Review Comment:
   Will this introduce a busy loop here?
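   
   If re-calling `receiveMessageFromConsumer` right away can spin while the seek is still running, one way to avoid it is to park the loop on the seek's own future and resume when that future completes. A sketch, assuming a hypothetical `SeekAwareReceiver` that exposes the in-flight seek as `seekFuture` (the real `ConsumerImpl` is not assumed to have this field):
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Hypothetical receiver; models only the scheduling of the receive loop.
   class SeekAwareReceiver {
       // The most recent seek; an already-completed future means "no seek in progress".
       volatile CompletableFuture<Void> seekFuture = CompletableFuture.completedFuture(null);
       // Counts how many real receives would have been issued.
       final AtomicInteger receiveAttempts = new AtomicInteger();
   
       void receiveLoop() {
           CompletableFuture<Void> seek = seekFuture;
           if (!seek.isDone()) {
               // Instead of re-issuing the receive immediately (a potential busy loop),
               // resume once the seek finishes, whether it succeeded or failed.
               seek.whenComplete((ignored, error) -> receiveLoop());
               return;
           }
           receiveAttempts.incrementAndGet();
           // A real consumer would call receiveAsync() here and re-enter
           // receiveLoop() from its completion callback.
       }
   }
   ```
   
   While the seek is pending, `receiveLoop()` issues no receives at all; completing `seekFuture` triggers exactly one resumed iteration.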





[GitHub] [pulsar] BewareMyPower commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1164249765

   Sure.




[GitHub] [pulsar] nodece commented on a diff in pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#discussion_r908224571


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -248,6 +248,10 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
 
     private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
         consumer.receiveAsync().thenAcceptAsync(message -> {
+            if (consumer.isDuringSeek()) {
+                receiveMessageFromConsumer(consumer);

Review Comment:
   Increased the timeout to 4 in the test.





[GitHub] [pulsar] nodece commented on a diff in pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#discussion_r994136417


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -252,6 +252,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
             messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
         }
         messagesFuture.thenAcceptAsync(messages -> {
+            if (consumer.isDuringSeek()) {
+                receiveMessageFromConsumer(consumer, batchReceive);

Review Comment:
   > 2\. If `ConsumerImpl.seek` fails, the messages have popped from `incomingMessages` cannot be consumed until `redeliver` executes
   
   Good catch! We need to consider this.
   
   > It looks like not only from the MultiTopicConsumer.
   
   Yes. 
   
   >  If it is from the user side, using a consumer to receive the messages and another thread try to seek the subscription to another position, they will also receive the duplicated messages right?
   
   For `MultiTopicConsumer`, that's right, because `MultiTopicConsumer` runs a loop that keeps pulling messages from its internal consumers.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#discussion_r994093124


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -252,6 +252,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
             messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
         }
         messagesFuture.thenAcceptAsync(messages -> {
+            if (consumer.isDuringSeek()) {
+                receiveMessageFromConsumer(consumer, batchReceive);

Review Comment:
   I also have a question about this part.
   
   It looks like this isn't limited to the MultiTopicConsumer. On the user side, if one thread uses a consumer to receive messages while another thread seeks the subscription to a different position, they will also receive duplicated messages, right?





[GitHub] [pulsar] BewareMyPower commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1164144735

   Do you mean this test fails after applying the changes from #15568?




[GitHub] [pulsar] nodece commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1164240392

   > Did you mean after applying the changes of #15568 will this test fail?
   
   Yes. Could you take a look at this issue?




[GitHub] [pulsar] nodece commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by "nodece (via GitHub)" <gi...@apache.org>.
nodece commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1483881115

   Since no reviewer has picked this up, I'm closing this PR.




[GitHub] [pulsar] hicolour commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by "hicolour (via GitHub)" <gi...@apache.org>.
hicolour commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1531524571

   Hey @nodece, why was this PR closed? Does it mean that this PR and https://github.com/apache/pulsar/pull/15568 will not be delivered to the mainstream?




[GitHub] [pulsar] BewareMyPower commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1164115040

   > org.apache.pulsar.broker.service.SubscriptionSeekTest#testSeekByFunctionAndMultiTopic cover this changes.
   
   Why didn't it fail before?




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#discussion_r908153909


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -248,6 +248,10 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
 
     private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
         consumer.receiveAsync().thenAcceptAsync(message -> {
+            if (consumer.isDuringSeek()) {
+                receiveMessageFromConsumer(consumer);

Review Comment:
   Maybe not. But after adding the following change to simulate high latency in `clearReceiverQueue`:
   
   ```java
       private BatchMessageIdImpl clearReceiverQueue() {
           /* ... */
   
           try {
               Thread.sleep(500);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
   ```
   
   The test failed again.





[GitHub] [pulsar] nodece closed pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by "nodece (via GitHub)" <gi...@apache.org>.
nodece closed pull request #16171: [fix][client] Fix duplicate messages caused by seek
URL: https://github.com/apache/pulsar/pull/16171




[GitHub] [pulsar] nodece commented on pull request #16171: [fix][client] Fix duplicate messages caused by seek

Posted by "nodece (via GitHub)" <gi...@apache.org>.
nodece commented on PR #16171:
URL: https://github.com/apache/pulsar/pull/16171#issuecomment-1533379391

   > Hey @nodece, why this PR was closed? 
   
   Currently, no reviewer is pushing this PR forward, so I decided to close it. If anyone is willing to push it forward, I will reopen it.
   
   > Does it it mean that this PR and #15568 will not be delivered to the maistream ?
   
   Right!

