Posted to commits@pulsar.apache.org by "Technoboy- (via GitHub)" <gi...@apache.org> on 2023/08/09 02:48:01 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request, #20963: [improve][client] Add backoff for `seek`

Technoboy- opened a new pull request, #20963:
URL: https://github.com/apache/pulsar/pull/20963

   ### Motivation
   Currently, `seek` is the only consumer-side operation that doesn't support backoff; see:
   https://github.com/apache/pulsar/blob/2ab184e49a036a1dd10dc537bef4ab034a5ad5e0/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2151-L2166
   
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codecov-commenter commented on pull request #20963: [improve][client] Add backoff for `seek`

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #20963:
URL: https://github.com/apache/pulsar/pull/20963#issuecomment-1678365950

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/20963?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#20963](https://app.codecov.io/gh/apache/pulsar/pull/20963?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (1f7bdf4) into [master](https://app.codecov.io/gh/apache/pulsar/commit/2ab184e49a036a1dd10dc537bef4ab034a5ad5e0?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (2ab184e) will **decrease** coverage by `3.80%`.
   > Report is 10 commits behind head on master.
   > The diff coverage is `53.96%`.
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/20963/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/20963?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #20963      +/-   ##
   ============================================
   - Coverage     37.11%   33.32%   -3.80%     
   - Complexity    12250    12310      +60     
   ============================================
     Files          1698     1621      -77     
     Lines        129813   129555     -258     
     Branches      14156    14047     -109     
   ============================================
   - Hits          48179    43168    -5011     
   - Misses        75318    80807    +5489     
   + Partials       6316     5580     -736     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | inttests | `24.06% <41.26%> (-0.34%)` | :arrow_down: |
   | systests | `?` | |
   | unittests | `32.03% <53.96%> (-0.19%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files Changed](https://app.codecov.io/gh/apache/pulsar/pull/20963?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/pulsar/client/impl/ConsumerImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/20963?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnN1bWVySW1wbC5qYXZh) | `55.90% <53.96%> (+0.02%)` | :arrow_up: |
   
   ... and [413 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/20963/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   




[GitHub] [pulsar] Technoboy- commented on pull request #20963: [improve][client] Add backoff for `seek`

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on PR #20963:
URL: https://github.com/apache/pulsar/pull/20963#issuecomment-1678425671

   > Is it possible to add a test to cover the new changes?
   
   Added a test.




[GitHub] [pulsar] Technoboy- closed pull request #20963: [improve][client] Add backoff for `seek`

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- closed pull request #20963: [improve][client] Add backoff for `seek`
URL: https://github.com/apache/pulsar/pull/20963




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #20963: [improve][client] Add backoff for `seek`

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #20963:
URL: https://github.com/apache/pulsar/pull/20963#discussion_r1294181070


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2148,100 +2148,108 @@ public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
                 new PulsarClientException("Only support seek by messageId or timestamp"));
     }
 
-    private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return Optional.of(FutureUtil
-                    .failedFuture(new PulsarClientException.AlreadyClosedException(
-                            String.format("The consumer %s was already closed when seeking the subscription %s of the"
-                                    + " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy))));
-        }
-
-        if (!isConnected()) {
-            return Optional.of(FutureUtil.failedFuture(new PulsarClientException(
-                    String.format("The client is not connected to the broker when seeking the subscription %s of the "
-                            + "topic %s to %s", subscription, topicName.toString(), seekBy))));
-        }
+    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
+        AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(100, TimeUnit.MILLISECONDS)
+                .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                .create();
 
-        return Optional.empty();
+        CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+        seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
+        return seekFuture;
     }
 
-    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
-        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+    private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
+                                   final Backoff backoff, final AtomicLong remainingTime,
+                                   CompletableFuture<Void> seekFuture) {
         ClientCnx cnx = cnx();
+        if (isConnected() && cnx != null) {
+            if (!duringSeek.compareAndSet(false, true)) {
+                final String message = String.format(
+                        "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
+                        topic, subscription, seekBy);
+                log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
+                        topic, subscription, seekBy);
+                seekFuture.completeExceptionally(new IllegalStateException(message));
+                return;
+            }
+            MessageIdAdv originSeekMessageId = seekMessageId;
+            seekMessageId = (MessageIdAdv) seekId;
+            log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
 
-        if (!duringSeek.compareAndSet(false, true)) {
-            final String message = String.format(
-                    "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
-                    topic, subscription, seekBy);
-            log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
-                    topic, subscription, seekBy);
-            seekFuture.completeExceptionally(new IllegalStateException(message));
-            return seekFuture;
-        }
-
-        MessageIdAdv originSeekMessageId = seekMessageId;
-        seekMessageId = (MessageIdAdv) seekId;
-        log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
-
-        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-            log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
-            acknowledgmentsGroupingTracker.flushAndClean();
+            cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
+                log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
+                acknowledgmentsGroupingTracker.flushAndClean();
 
-            lastDequeuedMessageId = MessageId.earliest;
+                lastDequeuedMessageId = MessageId.earliest;
 
-            clearIncomingMessages();
-            seekFuture.complete(null);
-        }).exceptionally(e -> {
-            // re-set duringSeek and seekMessageId if seek failed
-            seekMessageId = originSeekMessageId;
-            duringSeek.set(false);
-            log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+                clearIncomingMessages();
+                seekFuture.complete(null);
+            }).exceptionally(e -> {

Review Comment:
   Ah, the server side will respond with `MetadataError/UnknownError`, which we can't retry. I see that the other methods here don't handle exceptions by retrying either.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #20963: [improve][client] Add backoff for `seek`

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #20963:
URL: https://github.com/apache/pulsar/pull/20963#discussion_r1294124706


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2148,100 +2148,108 @@ public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
                 new PulsarClientException("Only support seek by messageId or timestamp"));
     }
 
-    private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return Optional.of(FutureUtil
-                    .failedFuture(new PulsarClientException.AlreadyClosedException(
-                            String.format("The consumer %s was already closed when seeking the subscription %s of the"
-                                    + " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy))));
-        }
-
-        if (!isConnected()) {
-            return Optional.of(FutureUtil.failedFuture(new PulsarClientException(
-                    String.format("The client is not connected to the broker when seeking the subscription %s of the "
-                            + "topic %s to %s", subscription, topicName.toString(), seekBy))));
-        }
+    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
+        AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(100, TimeUnit.MILLISECONDS)
+                .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                .create();
 
-        return Optional.empty();
+        CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+        seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
+        return seekFuture;
     }
 
-    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
-        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+    private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
+                                   final Backoff backoff, final AtomicLong remainingTime,
+                                   CompletableFuture<Void> seekFuture) {
         ClientCnx cnx = cnx();
+        if (isConnected() && cnx != null) {
+            if (!duringSeek.compareAndSet(false, true)) {
+                final String message = String.format(
+                        "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
+                        topic, subscription, seekBy);
+                log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
+                        topic, subscription, seekBy);
+                seekFuture.completeExceptionally(new IllegalStateException(message));
+                return;
+            }
+            MessageIdAdv originSeekMessageId = seekMessageId;
+            seekMessageId = (MessageIdAdv) seekId;
+            log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
 
-        if (!duringSeek.compareAndSet(false, true)) {
-            final String message = String.format(
-                    "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
-                    topic, subscription, seekBy);
-            log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
-                    topic, subscription, seekBy);
-            seekFuture.completeExceptionally(new IllegalStateException(message));
-            return seekFuture;
-        }
-
-        MessageIdAdv originSeekMessageId = seekMessageId;
-        seekMessageId = (MessageIdAdv) seekId;
-        log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
-
-        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-            log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
-            acknowledgmentsGroupingTracker.flushAndClean();
+            cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
+                log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
+                acknowledgmentsGroupingTracker.flushAndClean();
 
-            lastDequeuedMessageId = MessageId.earliest;
+                lastDequeuedMessageId = MessageId.earliest;
 
-            clearIncomingMessages();
-            seekFuture.complete(null);
-        }).exceptionally(e -> {
-            // re-set duringSeek and seekMessageId if seek failed
-            seekMessageId = originSeekMessageId;
-            duringSeek.set(false);
-            log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+                clearIncomingMessages();
+                seekFuture.complete(null);
+            }).exceptionally(e -> {

Review Comment:
   If the exception is retriable, I think we should also perform a backoff.
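
The suggestion above (back off and retry only when the failure is retriable) can be illustrated with a minimal, self-contained sketch. This is not the code from the PR and does not use Pulsar's `Backoff`/`BackoffBuilder`; `SeekRetrySketch`, `SimpleBackoff`, `retryWithBackoff`, and the `isRetriable` predicate are hypothetical names introduced only for illustration, and the plain doubling without jitter is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch: retry an async operation with exponential backoff,
// but only for failures the caller classifies as retriable.
final class SeekRetrySketch {

    // Simplified stand-in for a backoff policy (assumption: plain doubling, no jitter).
    static final class SimpleBackoff {
        private final long maxMs;
        private long nextMs;

        SimpleBackoff(long initialMs, long maxMs) {
            this.nextMs = initialMs;
            this.maxMs = maxMs;
        }

        // Returns the delay to wait now, then doubles the next delay up to maxMs.
        long next() {
            long current = Math.min(nextMs, maxMs);
            nextMs = Math.min(current * 2, maxMs);
            return current;
        }
    }

    static <T> CompletableFuture<T> retryWithBackoff(
            Supplier<CompletableFuture<T>> operation,
            Predicate<Throwable> isRetriable,
            SimpleBackoff backoff,
            long remainingMs,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, isRetriable, backoff, remainingMs, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            Predicate<Throwable> isRetriable,
            SimpleBackoff backoff,
            long remainingMs,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // Dependent stages wrap failures in CompletionException; unwrap before classifying.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            long delayMs = backoff.next();
            // Give up on non-retriable errors or when the time budget is exhausted.
            if (!isRetriable.test(cause) || remainingMs - delayMs <= 0) {
                result.completeExceptionally(cause);
                return;
            }
            scheduler.schedule(
                    () -> attempt(operation, isRetriable, backoff, remainingMs - delayMs, scheduler, result),
                    delayMs, TimeUnit.MILLISECONDS);
        });
    }
}
```

In this sketch, a caller would pass a supplier that sends the seek request and a predicate that rejects errors that cannot be retried, so those fail immediately instead of consuming the time budget.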





[GitHub] [pulsar] Technoboy- merged pull request #20963: [improve][client] Add backoff for `seek`

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- merged PR #20963:
URL: https://github.com/apache/pulsar/pull/20963

