Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/02 22:15:25 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl

poorbarcode opened a new pull request, #17443:
URL: https://github.com/apache/pulsar/pull/17443

   Fixes:
   - When using the multi-topics consumer (`MultiTopicsConsumerImpl`), messages can be consumed more than once.
   - Also fixes the flaky test #17147.
   
   ### Motivation
   
   TODO
   
   ### Modifications
   
   TODO
   
   ### Documentation
   
   - [ ] `doc-required` 
   
   - [x] `doc-not-needed` 
     
   - [ ] `doc` 
   
   - [ ] `doc-complete`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17443:
URL: https://github.com/apache/pulsar/pull/17443#issuecomment-1250908578

   @poorbarcode Thanks, I understand the problem now.
   
   There is a mailing-list discussion: https://lists.apache.org/thread/97o9t4ltkds5pfq41l9xbbd31t41qm8w
   I think we should find a general solution for message duplication.
   
   This thread is also a discussion about duplicated messages: https://lists.apache.org/thread/gnqwxo7w6n6g72ochvgpgv4s6r8mnwb7
   
   These are not exactly the same problem, but they are closely related.
   
   This PR introduces many queue operations to avoid duplicated messages, which seems a bit expensive.





[GitHub] [pulsar] poorbarcode commented on pull request #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17443:
URL: https://github.com/apache/pulsar/pull/17443#issuecomment-1241599359

   Hi @codelipenghui 
   
   I've added the description. Sorry I didn't fill in the `TODO` sections sooner.




[GitHub] [pulsar] poorbarcode commented on pull request #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17443:
URL: https://github.com/apache/pulsar/pull/17443#issuecomment-1251242141

   PIP-194 will fix this problem
   
   - https://github.com/apache/pulsar/issues/16757




[GitHub] [pulsar] MarvinCai commented on a diff in pull request #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
MarvinCai commented on code in PR #17443:
URL: https://github.com/apache/pulsar/pull/17443#discussion_r967621298


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -789,20 +804,31 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
                     new PulsarClientException("Illegal messageId, messageId can only be earliest/latest")
             );
         }
-        List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
-        consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(targetMessageId)));
-
-        unAckedMessageTracker.clear();
-        clearIncomingMessages();
-
-        return FutureUtil.waitForAll(futures);
+        return internalSeekAsync(consumer -> consumer.seekAsync(messageId));
     }
 
     @Override
     public CompletableFuture<Void> seekAsync(long timestamp) {
-        List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
-        consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
-        return FutureUtil.waitForAll(futures);
+        return internalSeekAsync(consumer -> consumer.seekAsync(timestamp));
+    }
+
+    private CompletableFuture<Void> internalSeekAsync(Function<Consumer, CompletableFuture<Void>> childSeekFunction) {
+        pause();
+        CompletableFuture<Void> res = FutureUtil.waitForAll(receiveMessageFutures).thenCompose(__ -> {

Review Comment:
   Do we need to wait for these ongoing receives to complete? The messages will be discarded anyway, so can we just cancel them?
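A minimal, self-contained sketch of the pause / drain / seek / resume ordering that `internalSeekAsync` in the diff appears to follow. Everything here is hypothetical: `ChildConsumer`, the field names, and the point at which the shared queue is cleared are assumptions for illustration (the hunk does not show the rest of the method body), not Pulsar's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

/** Hypothetical sketch of a multi-topic seek that waits for in-flight receives; not Pulsar code. */
public class SeekSketch {

    /** Hypothetical stand-in for a per-topic child consumer. */
    interface ChildConsumer {
        CompletableFuture<Void> seekAsync(long timestamp);
    }

    final List<ChildConsumer> consumers = new ArrayList<>();
    // Receives issued to child consumers that have not completed yet.
    final List<CompletableFuture<?>> receiveMessageFutures = new ArrayList<>();
    // Shared queue the application reads from.
    final ConcurrentLinkedQueue<String> incomingMessages = new ConcurrentLinkedQueue<>();
    volatile boolean paused = false;

    // In this sketch pause() only sets a flag; a real consumer would stop issuing new receives.
    void pause() { paused = true; }
    void resume() { paused = false; }

    CompletableFuture<Void> internalSeekAsync(
            Function<ChildConsumer, CompletableFuture<Void>> childSeekFunction) {
        pause();
        CompletableFuture<?>[] inFlight = receiveMessageFutures.toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(inFlight)
                // Every in-flight receive has landed and none starts while paused,
                // so no pre-seek message can be enqueued after this clear.
                .thenRun(incomingMessages::clear)
                .thenCompose(__ -> CompletableFuture.allOf(consumers.stream()
                        .map(childSeekFunction)
                        .toArray(CompletableFuture<?>[]::new)))
                // Resume on success or failure so the consumer is never left paused.
                .whenComplete((ignored, ex) -> resume());
    }
}
```

A timestamp seek would pass `c -> c.seekAsync(timestamp)`, mirroring the two `seekAsync` overloads in the diff. Waiting instead of cancelling keeps the ordering easy to reason about; the cost is the extra latency MarvinCai asks about.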





[GitHub] [pulsar] poorbarcode commented on pull request #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17443:
URL: https://github.com/apache/pulsar/pull/17443#issuecomment-1237914274

   This PR should be merged into the following branches (because this test was added in branch-2.11):
   - `branch-2.8`
   - `branch-2.9`
   - `branch-2.10`
   - `branch-2.11`
   - `master`
   
   Note: new features were added in version 2.8, so merging into 2.7 would cause conflicts.




[GitHub] [pulsar] codelipenghui commented on pull request #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17443:
URL: https://github.com/apache/pulsar/pull/17443#issuecomment-1241537749

   @poorbarcode Could you please provide more context about how the race condition happens?
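The PR description does not spell out the race. The toy replay below is an assumption inferred from the diff (the new code pauses and waits for `receiveMessageFutures` before seeking, whereas the old code called `clearIncomingMessages()` right after issuing the child seeks); it is not the author's analysis. It models one child topic holding `m0`, `m1`, `m2`, a receive for `m1` that has left the child but not yet reached the shared queue, and a seek to the earliest position. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, single-threaded replay of one interleaving around seek(earliest). */
public class SeekRaceSimulation {

    /** Returns what the application would read after seek(earliest). */
    static List<String> readAfterSeek(boolean waitForInFlightReceive) {
        List<String> topic = List.of("m0", "m1", "m2");
        Deque<String> parentQueue = new ArrayDeque<>();
        // Taken from the child before the seek, not yet enqueued on the parent.
        String inFlight = "m1";

        if (waitForInFlightReceive) {
            // Drain first: the in-flight receive lands, then the clear removes it.
            parentQueue.add(inFlight);
            parentQueue.clear();
        } else {
            // Old order: clear first; the pending receive lands afterwards.
            parentQueue.clear();
            parentQueue.add(inFlight);
        }
        // After seek(earliest), the child redelivers the topic from the beginning.
        parentQueue.addAll(topic);
        return new ArrayList<>(parentQueue);
    }
}
```

`readAfterSeek(false)` yields `[m1, m0, m1, m2]`, so `m1` is seen twice, while `readAfterSeek(true)` yields `[m0, m1, m2]`.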




[GitHub] [pulsar] poorbarcode closed pull request #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
poorbarcode closed pull request #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl
URL: https://github.com/apache/pulsar/pull/17443




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17443:
URL: https://github.com/apache/pulsar/pull/17443#discussion_r969114272


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -789,20 +804,31 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
                     new PulsarClientException("Illegal messageId, messageId can only be earliest/latest")
             );
         }
-        List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
-        consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(targetMessageId)));
-
-        unAckedMessageTracker.clear();
-        clearIncomingMessages();
-
-        return FutureUtil.waitForAll(futures);
+        return internalSeekAsync(consumer -> consumer.seekAsync(messageId));
     }
 
     @Override
     public CompletableFuture<Void> seekAsync(long timestamp) {
-        List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
-        consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
-        return FutureUtil.waitForAll(futures);
+        return internalSeekAsync(consumer -> consumer.seekAsync(timestamp));
+    }
+
+    private CompletableFuture<Void> internalSeekAsync(Function<Consumer, CompletableFuture<Void>> childSeekFunction) {
+        pause();
+        CompletableFuture<Void> res = FutureUtil.waitForAll(receiveMessageFutures).thenCompose(__ -> {

Review Comment:
   Good suggestion, but we have no way to `cancel` them. Also, the wait only happens in one scenario: calling `seek` several times in quick succession while `read` is also being called, which is very rare.
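As a general JDK illustration (not Pulsar code), `CompletableFuture.cancel` only completes the future with a `CancellationException`; per its Javadoc, `mayInterruptIfRunning` has no effect, so whatever would eventually complete the future is not stopped, and a late `complete(...)` is simply rejected. This is one possible reading of why cancelling in-flight receives is not a ready option here; the thread does not say which mechanism the author had in mind. The class and method names are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

/** Hypothetical demo of CompletableFuture cancellation semantics. */
public class CancelSemantics {

    /** Returns whether a producer's late complete() takes effect after the consumer cancelled. */
    static boolean lateCompleteAccepted() {
        CompletableFuture<String> receive = new CompletableFuture<>();
        receive.cancel(true); // the flag has no effect: nothing is interrupted
        // The producer side keeps running and may still try to deliver later.
        return receive.complete("message");
    }

    /** Returns whether join() on a cancelled future throws CancellationException. */
    static boolean joinThrowsCancellation() {
        CompletableFuture<String> receive = new CompletableFuture<>();
        receive.cancel(true);
        try {
            receive.join();
            return false;
        } catch (CancellationException e) {
            return true;
        }
    }
}
```

Both calls confirm that cancelling only changes the future's state: the late `complete` returns `false` and its value is dropped.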





[GitHub] [pulsar] poorbarcode commented on pull request #17443: [fix][client]Duplicate messages when use MultiTopicsConsumerImpl

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #17443:
URL: https://github.com/apache/pulsar/pull/17443#issuecomment-1237917591

   /pulsarbot rerun-failure-checks

