Posted to commits@pulsar.apache.org by "aloyszhang (via GitHub)" <gi...@apache.org> on 2023/02/13 06:15:43 UTC

[GitHub] [pulsar] aloyszhang opened a new pull request, #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

aloyszhang opened a new pull request, #19498:
URL: https://github.com/apache/pulsar/pull/19498

   ### Motivation
   
   For `MultiTopicsConsumerImpl`, after seeking to a timestamp, the consumer may receive duplicated messages.
   
   ### Modifications
   
   Clear the incoming message queue of `MultiTopicsConsumerImpl` after seeking to a timestamp succeeds.
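The following is a minimal sketch of the ordering this modification relies on. It uses hypothetical stand-ins (`SeekThenClearSketch`, `innerSeekAsync`) rather than Pulsar's real classes. The aggregate seek future completes only after every per-topic seek succeeds, and only then is the consumer-level queue cleared. It models that ordering only and does not coordinate with receive tasks that are still in flight when the clear runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model of "clear the aggregate queue after all seeks succeed".
// Not Pulsar's implementation.
final class SeekThenClearSketch {

    // Stand-in for the multi-topics consumer's own incoming queue.
    final ConcurrentLinkedQueue<String> incoming = new ConcurrentLinkedQueue<>();

    // Stand-in for one inner per-topic consumer's seekAsync(timestamp).
    static CompletableFuture<Void> innerSeekAsync(String topic, long timestamp) {
        return CompletableFuture.runAsync(() -> {
            // pretend to reset the cursor of `topic` to `timestamp`
        });
    }

    CompletableFuture<Void> seekAsync(List<String> topics, long timestamp) {
        List<CompletableFuture<Void>> futures = new ArrayList<>(topics.size());
        for (String topic : topics) {
            futures.add(innerSeekAsync(topic, timestamp));
        }
        // Only when every inner seek has succeeded, drop messages buffered before the seek.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenRun(incoming::clear);
    }

    public static void main(String[] args) {
        SeekThenClearSketch consumer = new SeekThenClearSketch();
        consumer.incoming.add("t1:msg-before-seek");
        consumer.seekAsync(List.of("t1", "t2"), 0L).join();
        System.out.println(consumer.incoming.isEmpty()); // true
    }
}
```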
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   
   
   ### Matching PR in forked repository
   
   PR in forked repository: <!-- ENTER URL HERE -->
   
   <!--
   After opening this PR, the build in apache/pulsar will fail and instructions will
   be provided for opening a PR in the PR author's forked repository.
   
   apache/pulsar pull requests should be first tested in your own fork since the 
   apache/pulsar CI based on GitHub Actions has constrained resources and quota.
   GitHub Actions provides separate quota for pull requests that are executed in 
   a forked repository.
   
   The tests will be run in the forked repository until all PR review comments have
   been handled, the tests pass and the PR is approved by a reviewer.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "aloyszhang (via GitHub)" <gi...@apache.org>.
aloyszhang commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105249065


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -790,7 +790,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
     public CompletableFuture<Void> seekAsync(long timestamp) {
         List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
         consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
-        return FutureUtil.waitForAll(futures);
+        return FutureUtil.waitForAll(futures).thenApply(f -> {
+                    clearIncomingMessages();

Review Comment:
   You're right, there is a chance of data loss.
   Changing this PR to draft for now.





[GitHub] [pulsar] AnonHxy commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "AnonHxy (via GitHub)" <gi...@apache.org>.
AnonHxy commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105230975


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -790,7 +790,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
     public CompletableFuture<Void> seekAsync(long timestamp) {
         List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
         consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
-        return FutureUtil.waitForAll(futures);
+        return FutureUtil.waitForAll(futures).thenApply(f -> {
+                    clearIncomingMessages();

Review Comment:
   It looks like line 2148 already clears the incoming message queue; I wonder why we need to do it again here.
   https://github.com/apache/pulsar/blob/af1b6e16ad9ffc0f5fad532e71c25e3a33e389c5/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2133-L2149





[GitHub] [pulsar] aloyszhang commented on pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "aloyszhang (via GitHub)" <gi...@apache.org>.
aloyszhang commented on PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#issuecomment-1430816848

   PTAL @poorbarcode @AnonHxy 




[GitHub] [pulsar] lhotari commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105544626


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -716,31 +735,46 @@ protected void completeOpBatchReceive(OpBatchReceive<T> op) {
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
+    private void setDuringSeek() throws PulsarClientException {
+        if (!duringSeek.compareAndSet(false, true)) {
+            throw new SeekConflictException("Another seek is on going");
+        }
+    }
+
     @Override
     public void seek(MessageId messageId) throws PulsarClientException {
+        setDuringSeek();
         try {
             seekAsync(messageId).get();
         } catch (Exception e) {
+            duringSeek.set(false);
             throw PulsarClientException.unwrap(e);
         }
+        duringSeek.set(false);
     }
 
     @Override
     public void seek(long timestamp) throws PulsarClientException {
+        setDuringSeek();
         try {
             seekAsync(timestamp).get();
         } catch (Exception e) {
+            duringSeek.set(false);

Review Comment:
   use a finally block instead
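The following sketches the `finally` pattern suggested here. It uses simplified hypothetical types: `SeekGuardSketch`, a local `SeekConflictException`, and a plain cause-unwrapping step that stands in for `PulsarClientException.unwrap`. `setDuringSeek()` stays outside the `try`. That way, a caller that loses the `compareAndSet` race never resets a flag owned by another seek.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the try/finally guard; names mirror the diff, types are simplified.
final class SeekGuardSketch {

    // Hypothetical stand-in for the PR's SeekConflictException.
    static final class SeekConflictException extends Exception {
        SeekConflictException(String message) {
            super(message);
        }
    }

    private final AtomicBoolean duringSeek = new AtomicBoolean(false);

    private void setDuringSeek() throws SeekConflictException {
        if (!duringSeek.compareAndSet(false, true)) {
            throw new SeekConflictException("Another seek is ongoing");
        }
    }

    boolean isDuringSeek() {
        return duringSeek.get();
    }

    // Blocking seek: the guard is released in exactly one place, whether get() succeeds or throws.
    void seek(CompletableFuture<Void> seekFuture) throws Exception {
        setDuringSeek(); // if this throws, we never owned the flag, so there is nothing to reset
        try {
            seekFuture.get();
        } catch (ExecutionException e) {
            // Rough stand-in for PulsarClientException.unwrap(e): surface the underlying cause.
            Throwable cause = e.getCause();
            throw (cause instanceof Exception) ? (Exception) cause : e;
        } finally {
            duringSeek.set(false);
        }
    }

    public static void main(String[] args) throws Exception {
        SeekGuardSketch consumer = new SeekGuardSketch();
        try {
            consumer.seek(CompletableFuture.failedFuture(new IllegalStateException("seek failed")));
        } catch (IllegalStateException expected) {
            // The failure still reaches the caller ...
        }
        // ... and the guard was released, so a retry is not rejected as a conflict.
        System.out.println(consumer.isDuringSeek()); // false
    }
}
```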



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -716,31 +735,46 @@ protected void completeOpBatchReceive(OpBatchReceive<T> op) {
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
+    private void setDuringSeek() throws PulsarClientException {
+        if (!duringSeek.compareAndSet(false, true)) {
+            throw new SeekConflictException("Another seek is on going");
+        }
+    }
+
     @Override
     public void seek(MessageId messageId) throws PulsarClientException {
+        setDuringSeek();
         try {
             seekAsync(messageId).get();
         } catch (Exception e) {
+            duringSeek.set(false);
             throw PulsarClientException.unwrap(e);
         }
+        duringSeek.set(false);
     }
 
     @Override
     public void seek(long timestamp) throws PulsarClientException {
+        setDuringSeek();
         try {
             seekAsync(timestamp).get();
         } catch (Exception e) {
+            duringSeek.set(false);
             throw PulsarClientException.unwrap(e);
         }
+        duringSeek.set(false);
     }
 
     @Override
     public void seek(Function<String, Object> function) throws PulsarClientException {
+        setDuringSeek();
         try {
             seekAsync(function).get();
         } catch (Exception e) {
+            duringSeek.set(false);
             throw PulsarClientException.unwrap(e);
         }
+        duringSeek.set(false);

Review Comment:
   use a finally block instead





[GitHub] [pulsar] lhotari commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105541732


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -234,7 +239,21 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
         }
     }
 
+    private void waitForSeekComplete() {
+        while (duringSeek.get()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] is during seek processing", topic, subscription);
+            }
+            try {
+                Thread.sleep(seekCompleteCheckTicketMs);

Review Comment:
   Thread.sleep shouldn't be run on any Netty threads. I think this solution has to be revisited. Which type of thread pool is this code running on?
   Bookkeeper code includes Thread.sleep, but we should avoid that in Pulsar code base.
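One non-blocking alternative to polling with `Thread.sleep` is to keep the future of the in-flight seek and chain the follow-up work onto it, so no thread is parked while the seek runs. This is a sketch under assumptions: the `NonBlockingSeekGate` class and its methods are hypothetical and are not Pulsar API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; not Pulsar's implementation.
final class NonBlockingSeekGate {

    // Future of the most recent seek; an already-completed future means "no seek in flight".
    private final AtomicReference<CompletableFuture<Void>> pendingSeek =
            new AtomicReference<>(CompletableFuture.<Void>completedFuture(null));

    /** Registers a new in-flight seek; returns false if another seek is still running. */
    boolean beginSeek(CompletableFuture<Void> seek) {
        CompletableFuture<Void> current = pendingSeek.get();
        if (!current.isDone()) {
            return false;
        }
        return pendingSeek.compareAndSet(current, seek);
    }

    /** Runs `action` once the current seek finishes (successfully or not) without blocking the caller. */
    CompletableFuture<Void> afterSeek(Runnable action) {
        // handle(...) swallows a failed seek so that the follow-up still runs.
        return pendingSeek.get().handle((ignored, error) -> null).thenRun(action);
    }

    public static void main(String[] args) {
        NonBlockingSeekGate gate = new NonBlockingSeekGate();
        CompletableFuture<Void> seek = new CompletableFuture<>();
        System.out.println(gate.beginSeek(seek)); // true
        StringBuilder log = new StringBuilder();
        gate.afterSeek(() -> log.append("receive"));
        System.out.println(log.length()); // 0: still waiting, and no thread is blocked
        seek.complete(null);
        System.out.println(log); // receive
    }
}
```

Non-async dependents of a `CompletableFuture` run on whichever thread completes it. In a real client, that could be an I/O thread, so the chained action should itself be non-blocking. `thenRunAsync` with a dedicated executor is the usual way to move that work elsewhere.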





[GitHub] [pulsar] github-actions[bot] commented on pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#issuecomment-1427407660

   @aloyszhang Please add the following content to your PR description and select a checkbox:
   ```
   - [ ] `doc` <!-- Your PR contains doc changes -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   ```




Re: [PR] [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo… [pulsar]

Posted by "aloyszhang (via GitHub)" <gi...@apache.org>.
aloyszhang closed pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…
URL: https://github.com/apache/pulsar/pull/19498





[GitHub] [pulsar] aloyszhang commented on pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "aloyszhang (via GitHub)" <gi...@apache.org>.
aloyszhang commented on PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#issuecomment-1430747609

   cc @lhotari 





[GitHub] [pulsar] aloyszhang commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "aloyszhang (via GitHub)" <gi...@apache.org>.
aloyszhang commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105233965


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -790,7 +790,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
     public CompletableFuture<Void> seekAsync(long timestamp) {
         List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
         consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
-        return FutureUtil.waitForAll(futures);
+        return FutureUtil.waitForAll(futures).thenApply(f -> {
+                    clearIncomingMessages();

Review Comment:
   This only clears the incoming message queue of `ConsumerImpl`. `MultiTopicsConsumerImpl` also has its own incoming message queue, and messages that have already been moved into it from `ConsumerImpl` may lead to duplicated messages.
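The two-level buffering described in this comment can be illustrated with a toy model. The class `TwoLevelQueueSketch` is hypothetical, and plain `ArrayDeque`s stand in for the real queues. Clearing only the inner queue on seek leaves already-drained messages in the outer queue. Those messages are then delivered again alongside the redelivered ones.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical toy model: one inner consumer queue drained into the multi-topics consumer's queue.
final class TwoLevelQueueSketch {
    final Queue<String> innerQueue = new ArrayDeque<>(); // stand-in for one ConsumerImpl's queue
    final Queue<String> outerQueue = new ArrayDeque<>(); // stand-in for the multi-topics consumer's queue

    // Messages arriving from the broker into the inner consumer.
    void fetch(String... msgs) {
        Collections.addAll(innerQueue, msgs);
    }

    // Move everything the inner consumer has buffered up to the outer queue.
    void drainInnerToOuter() {
        while (!innerQueue.isEmpty()) {
            outerQueue.add(innerQueue.poll());
        }
    }

    // What the application receives next, in order.
    List<String> receiveAll() {
        List<String> out = new ArrayList<>(outerQueue);
        outerQueue.clear();
        return out;
    }

    public static void main(String[] args) {
        TwoLevelQueueSketch c = new TwoLevelQueueSketch();
        c.fetch("m1", "m2");
        c.drainInnerToOuter();   // m1, m2 are now buffered in the outer queue
        c.innerQueue.clear();    // seek back to before m1 clears only the inner queue ...
        c.fetch("m1", "m2");     // ... and the broker redelivers from the seek position
        c.drainInnerToOuter();
        System.out.println(c.receiveAll()); // [m1, m2, m1, m2]
    }
}
```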





[GitHub] [pulsar] aloyszhang commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "aloyszhang (via GitHub)" <gi...@apache.org>.
aloyszhang commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105233281


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -790,7 +790,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
     public CompletableFuture<Void> seekAsync(long timestamp) {
         List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
         consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
-        return FutureUtil.waitForAll(futures);
+        return FutureUtil.waitForAll(futures).thenApply(f -> {
+                    clearIncomingMessages();

Review Comment:
   I'll check this.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1107343111


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -790,7 +790,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
     public CompletableFuture<Void> seekAsync(long timestamp) {
         List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
         consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
-        return FutureUtil.waitForAll(futures);
+        return FutureUtil.waitForAll(futures).thenApply(f -> {
+                    clearIncomingMessages();

Review Comment:
   Let's call a task that runs the line below a `receiving-task`.
   
   https://github.com/apache/pulsar/blob/f9af4245e0b05c382656fc674fdaeda26487258c/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L244
   
   Is there a scenario like this?
   
   | time | `receiving-task` | `seek task` |
   | --- | --- | --- |
   | 1 | `receiving-task` starts and has not finished yet | |
   | 2 | | seek |
   | 3 | | clear the `incoming queue` after seeking |
   | 4 | `receiving-task` finishes and pushes messages to the `incoming queue` | |
   | 5 | | receive messages |
   
   Then the consumer receives duplicated messages.
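The timeline in the table can be replayed deterministically with a toy model. `SeekRaceReplay` is hypothetical, and the message ids `m1` and `m2` are made up. The model assumes the seek moves the cursor back before `m1`, so the broker redelivers both messages. Whether the stale batch is pushed before or after the clear then decides whether duplicates appear.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical deterministic replay of the receiving-task / seek-task timeline.
final class SeekRaceReplay {

    static List<String> replay(boolean receivingTaskFinishesFirst) {
        List<String> incomingQueue = new ArrayList<>();
        // time 1: receiving-task has already read m1, m2 (pre-seek position) but not pushed them yet.
        List<String> inFlight = List.of("m1", "m2");
        // time 2: seek to a timestamp earlier than m1 (the cursor moves back).
        if (receivingTaskFinishesFirst) {
            incomingQueue.addAll(inFlight); // stale batch lands before the clear and is dropped
        }
        incomingQueue.clear(); // time 3: clear the incoming queue after seeking
        if (!receivingTaskFinishesFirst) {
            incomingQueue.addAll(inFlight); // time 4: stale batch lands after the clear and survives
        }
        incomingQueue.addAll(List.of("m1", "m2")); // redelivery from the new cursor position
        return incomingQueue; // time 5: what the application receives
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // [m1, m2, m1, m2]
        System.out.println(replay(true));  // [m1, m2]
    }
}
```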





[GitHub] [pulsar] lhotari commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105544031


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -716,31 +735,46 @@ protected void completeOpBatchReceive(OpBatchReceive<T> op) {
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
+    private void setDuringSeek() throws PulsarClientException {
+        if (!duringSeek.compareAndSet(false, true)) {
+            throw new SeekConflictException("Another seek is on going");
+        }
+    }
+
     @Override
     public void seek(MessageId messageId) throws PulsarClientException {
+        setDuringSeek();
         try {
             seekAsync(messageId).get();
         } catch (Exception e) {
+            duringSeek.set(false);
             throw PulsarClientException.unwrap(e);
         }
+        duringSeek.set(false);

Review Comment:
   use a finally block instead





[GitHub] [pulsar] aloyszhang commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "aloyszhang (via GitHub)" <gi...@apache.org>.
aloyszhang commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105589040


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -716,31 +735,46 @@ protected void completeOpBatchReceive(OpBatchReceive<T> op) {
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
+    private void setDuringSeek() throws PulsarClientException {
+        if (!duringSeek.compareAndSet(false, true)) {
+            throw new SeekConflictException("Another seek is on going");
+        }
+    }
+
     @Override
     public void seek(MessageId messageId) throws PulsarClientException {
+        setDuringSeek();
         try {
             seekAsync(messageId).get();
         } catch (Exception e) {
+            duringSeek.set(false);
             throw PulsarClientException.unwrap(e);
         }
+        duringSeek.set(false);

Review Comment:
   sure





[GitHub] [pulsar] github-actions[bot] commented on pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#issuecomment-1474583536

   The PR has had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] github-actions[bot] commented on pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#issuecomment-1543177159

   The PR has had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] poorbarcode commented on pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#issuecomment-1501538346

   Since we will start the RC of `3.0.0` on `2023-04-11`, I will change the label/milestone of PRs that have not been merged.
   - PRs of type `feature` are deferred to `3.1.0`
   - PRs of type `fix` are deferred to `3.0.1`
   
   So this PR is moved to `3.0.1`.




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1104497781


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -790,7 +790,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
     public CompletableFuture<Void> seekAsync(long timestamp) {
         List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
         consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
-        return FutureUtil.waitForAll(futures);
+        return FutureUtil.waitForAll(futures).thenApply(f -> {
+                    clearIncomingMessages();

Review Comment:
   Is there a scenario like this?
   - inner-consumer-1 finishes seeking
   - inner-consumer-2 finishes seeking
   - messages are received from inner-consumer-1 and pushed to the incoming-queue
   - the incoming-queue is cleared
   
   Then some messages are lost.
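The data-loss ordering above can be replayed with a toy model. `SeekClearLossReplay` is hypothetical, and the message ids are made up. It models only the client-side buffer and does not model any later redelivery by the broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical deterministic replay: a late clear drops messages read after the seek.
final class SeekClearLossReplay {

    static List<String> replay(boolean clearRunsBeforeFirstPush) {
        List<String> incomingQueue = new ArrayList<>();
        // inner-consumer-1 and inner-consumer-2 have both finished seeking.
        // These messages were read by inner-consumer-1 from its NEW (post-seek) position.
        List<String> fromConsumer1 = List.of("t1-m1", "t1-m2");
        if (clearRunsBeforeFirstPush) {
            incomingQueue.clear(); // safe: nothing valid has been buffered yet
        }
        incomingQueue.addAll(fromConsumer1);
        if (!clearRunsBeforeFirstPush) {
            incomingQueue.clear(); // late clear drops valid post-seek messages
        }
        return incomingQueue;
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // []
        System.out.println(replay(true));  // [t1-m1, t1-m2]
    }
}
```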





[GitHub] [pulsar] aloyszhang commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "aloyszhang (via GitHub)" <gi...@apache.org>.
aloyszhang commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105526835


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -790,7 +790,10 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
     public CompletableFuture<Void> seekAsync(long timestamp) {
         List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
         consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
-        return FutureUtil.waitForAll(futures);
+        return FutureUtil.waitForAll(futures).thenApply(f -> {
+                    clearIncomingMessages();

Review Comment:
   Updated, PTAL again.





[GitHub] [pulsar] lhotari commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105548965


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java:
##########
@@ -628,6 +628,15 @@
      */
     ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
 
+    /**
+     * Sets the interval of checking whether seek is complete interval.
+     *
+     * @param interval
+     *            the interval of check ticket.
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> seekCompleteCheckTicketMs(int interval);

Review Comment:
   The name `seekCompleteCheckTicketMs` is very confusing; a better name would be something like `seekCompletionPollIntervalMs`. I wonder whether the `TimeUnit` approach should be used, to be consistent with other methods that configure time intervals.
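A `TimeUnit`-based setter in the style of `autoUpdatePartitionsInterval(int interval, TimeUnit unit)` might look like the sketch below. The method name follows the suggestion in this comment, and the default value is arbitrary. Neither is an existing Pulsar API, and the class is a hypothetical stand-in for the builder.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical builder fragment; not part of Pulsar's ConsumerBuilder.
final class SeekPollingConfigSketch {

    private long seekCompletionPollIntervalMs = 100; // hypothetical default

    // Accepts any unit and normalizes to milliseconds internally.
    SeekPollingConfigSketch seekCompletionPollInterval(long interval, TimeUnit unit) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.seekCompletionPollIntervalMs = unit.toMillis(interval);
        return this;
    }

    long seekCompletionPollIntervalMs() {
        return seekCompletionPollIntervalMs;
    }

    public static void main(String[] args) {
        SeekPollingConfigSketch builder = new SeekPollingConfigSketch()
                .seekCompletionPollInterval(2, TimeUnit.SECONDS);
        System.out.println(builder.seekCompletionPollIntervalMs()); // 2000
    }
}
```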





[GitHub] [pulsar] aloyszhang commented on a diff in pull request #19498: [fix][client] fix receive dumplicated messages after seek to a timestamp in MultiTo…

Posted by "aloyszhang (via GitHub)" <gi...@apache.org>.
aloyszhang commented on code in PR #19498:
URL: https://github.com/apache/pulsar/pull/19498#discussion_r1105567556


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java:
##########
@@ -234,7 +239,21 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
         }
     }
 
+    private void waitForSeekComplete() {
+        while (duringSeek.get()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] is during seek processing", topic, subscription);
+            }
+            try {
+                Thread.sleep(seekCompleteCheckTicketMs);

Review Comment:
   It's not a Netty thread. 
   > Bookkeeper code includes Thread.sleep, but we should avoid that in Pulsar code base.
   
   I will change it to schedule the check later instead.
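The "schedule later" idea can be sketched under assumptions; the `ScheduledSeekWait` helper and its parameters are hypothetical. Instead of sleeping in a loop, each check that finds a seek still in progress schedules the next check on a `ScheduledExecutorService` and returns. Callers get a `CompletableFuture` that completes once the flag clears.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper: re-check on a scheduler instead of Thread.sleep in a loop.
final class ScheduledSeekWait {

    /** Completes once `seekInProgress` reports false, re-checking every `intervalMs`. */
    static CompletableFuture<Void> whenSeekComplete(BooleanSupplier seekInProgress,
                                                    ScheduledExecutorService scheduler,
                                                    long intervalMs) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        check(seekInProgress, scheduler, intervalMs, done);
        return done;
    }

    private static void check(BooleanSupplier seekInProgress, ScheduledExecutorService scheduler,
                              long intervalMs, CompletableFuture<Void> done) {
        if (!seekInProgress.getAsBoolean()) {
            done.complete(null);
        } else {
            // Schedule the next check and return immediately; no thread is parked in between.
            scheduler.schedule(() -> check(seekInProgress, scheduler, intervalMs, done),
                    intervalMs, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean duringSeek = new AtomicBoolean(true);
        CompletableFuture<Void> ready = whenSeekComplete(duringSeek::get, scheduler, 10);
        scheduler.schedule(() -> duringSeek.set(false), 50, TimeUnit.MILLISECONDS);
        ready.get(1, TimeUnit.SECONDS);
        System.out.println("seek finished, resume receiving");
        scheduler.shutdown();
    }
}
```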


