Posted to commits@pulsar.apache.org by "Technoboy- (via GitHub)" <gi...@apache.org> on 2023/08/09 02:48:01 UTC
[GitHub] [pulsar] Technoboy- opened a new pull request, #20963: [improve][client] Add backoff for `seek`
Technoboy- opened a new pull request, #20963:
URL: https://github.com/apache/pulsar/pull/20963
### Motivation
Currently, `seek` is the only consumer-side operation that doesn't support backoff; see:
https://github.com/apache/pulsar/blob/2ab184e49a036a1dd10dc537bef4ab034a5ad5e0/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2151-L2166
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] codecov-commenter commented on pull request #20963: [improve][client] Add backoff for `seek`
Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #20963:
URL: https://github.com/apache/pulsar/pull/20963#issuecomment-1678365950
## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/20963?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
> Merging [#20963](https://app.codecov.io/gh/apache/pulsar/pull/20963?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (1f7bdf4) into [master](https://app.codecov.io/gh/apache/pulsar/commit/2ab184e49a036a1dd10dc537bef4ab034a5ad5e0?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (2ab184e) will **decrease** coverage by `3.80%`.
> Report is 10 commits behind head on master.
> The diff coverage is `53.96%`.
```diff
@@             Coverage Diff              @@
##             master   #20963      +/-   ##
============================================
- Coverage     37.11%   33.32%   -3.80%
- Complexity    12250    12310      +60
============================================
  Files          1698     1621      -77
  Lines        129813   129555     -258
  Branches      14156    14047     -109
============================================
- Hits          48179    43168    -5011
- Misses        75318    80807    +5489
+ Partials       6316     5580     -736
```
| Flag | Coverage Δ | |
|---|---|---|
| inttests | `24.06% <41.26%> (-0.34%)` | :arrow_down: |
| systests | `?` | |
| unittests | `32.03% <53.96%> (-0.19%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Files Changed](https://app.codecov.io/gh/apache/pulsar/pull/20963?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [...va/org/apache/pulsar/client/impl/ConsumerImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/20963?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnN1bWVySW1wbC5qYXZh) | `55.90% <53.96%> (+0.02%)` | :arrow_up: |
... and [413 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/20963/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
[GitHub] [pulsar] Technoboy- commented on pull request #20963: [improve][client] Add backoff for `seek`
Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on PR #20963:
URL: https://github.com/apache/pulsar/pull/20963#issuecomment-1678425671
> Is it possible to add a test to cover the new changes?
Added test.
[GitHub] [pulsar] Technoboy- closed pull request #20963: [improve][client] Add backoff for `seek`
Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- closed pull request #20963: [improve][client] Add backoff for `seek`
URL: https://github.com/apache/pulsar/pull/20963
[GitHub] [pulsar] Technoboy- commented on a diff in pull request #20963: [improve][client] Add backoff for `seek`
Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #20963:
URL: https://github.com/apache/pulsar/pull/20963#discussion_r1294181070
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2148,100 +2148,108 @@ public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
                 new PulsarClientException("Only support seek by messageId or timestamp"));
     }
-    private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return Optional.of(FutureUtil
-                    .failedFuture(new PulsarClientException.AlreadyClosedException(
-                            String.format("The consumer %s was already closed when seeking the subscription %s of the"
-                                    + " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy))));
-        }
-
-        if (!isConnected()) {
-            return Optional.of(FutureUtil.failedFuture(new PulsarClientException(
-                    String.format("The client is not connected to the broker when seeking the subscription %s of the "
-                            + "topic %s to %s", subscription, topicName.toString(), seekBy))));
-        }
+    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
+        AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(100, TimeUnit.MILLISECONDS)
+                .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                .create();
-        return Optional.empty();
+        CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+        seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
+        return seekFuture;
     }
-    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
-        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+    private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
+                                   final Backoff backoff, final AtomicLong remainingTime,
+                                   CompletableFuture<Void> seekFuture) {
         ClientCnx cnx = cnx();
+        if (isConnected() && cnx != null) {
+            if (!duringSeek.compareAndSet(false, true)) {
+                final String message = String.format(
+                        "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
+                        topic, subscription, seekBy);
+                log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
+                        topic, subscription, seekBy);
+                seekFuture.completeExceptionally(new IllegalStateException(message));
+                return;
+            }
+            MessageIdAdv originSeekMessageId = seekMessageId;
+            seekMessageId = (MessageIdAdv) seekId;
+            log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
-        if (!duringSeek.compareAndSet(false, true)) {
-            final String message = String.format(
-                    "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
-                    topic, subscription, seekBy);
-            log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
-                    topic, subscription, seekBy);
-            seekFuture.completeExceptionally(new IllegalStateException(message));
-            return seekFuture;
-        }
-
-        MessageIdAdv originSeekMessageId = seekMessageId;
-        seekMessageId = (MessageIdAdv) seekId;
-        log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
-
-        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-            log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
-            acknowledgmentsGroupingTracker.flushAndClean();
+            cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
+                log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
+                acknowledgmentsGroupingTracker.flushAndClean();
-            lastDequeuedMessageId = MessageId.earliest;
+                lastDequeuedMessageId = MessageId.earliest;
-            clearIncomingMessages();
-            seekFuture.complete(null);
-        }).exceptionally(e -> {
-            // re-set duringSeek and seekMessageId if seek failed
-            seekMessageId = originSeekMessageId;
-            duringSeek.set(false);
-            log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+                clearIncomingMessages();
+                seekFuture.complete(null);
+            }).exceptionally(e -> {
Review Comment:
Ah, the server side will respond with `MetadataError`/`UnknownError`, which we can't retry. I see that the other methods here also don't retry on exceptions.
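For readers following the hunk above: it builds a backoff starting at 100 ms, capped at twice the operation timeout, and carries a remaining-time budget, but the quoted context stops before the branch taken when the consumer is not connected. The following is only a rough sketch of how such a retry could be scheduled, under the assumption that a disconnected consumer waits and tries again until the budget is spent. `SeekRetrySketch`, `attempt`, and every parameter name are hypothetical and are not taken from `ConsumerImpl`; plain doubling stands in for Pulsar's `Backoff`, whose exact delay sequence is not shown in this thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

// Hypothetical sketch; not the code merged in the PR.
class SeekRetrySketch {
    /**
     * Runs sendSeek once the connection check passes. While the check fails,
     * waits delayMs (doubling each time, capped at maxDelayMs) and tries again,
     * until the remainingMs budget is used up.
     */
    static void attempt(BooleanSupplier connected, Runnable sendSeek,
                        long delayMs, long maxDelayMs, AtomicLong remainingMs,
                        ScheduledExecutorService scheduler, CompletableFuture<Void> result) {
        if (connected.getAsBoolean()) {
            sendSeek.run();
            result.complete(null);
            return;
        }
        // Never wait longer than the budget that is left.
        long waitMs = Math.min(delayMs, remainingMs.get());
        if (waitMs <= 0) {
            result.completeExceptionally(
                    new TimeoutException("not connected before the time budget ran out"));
            return;
        }
        scheduler.schedule(() -> {
            remainingMs.addAndGet(-waitMs);
            attempt(connected, sendSeek, Math.min(delayMs * 2, maxDelayMs), maxDelayMs,
                    remainingMs, scheduler, result);
        }, waitMs, TimeUnit.MILLISECONDS);
    }
}
```

In this sketch only the connection check is retried. A failure reported after the request has been sent is not handled here, which matches the point above that server-side errors are not retried.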
[GitHub] [pulsar] codelipenghui commented on a diff in pull request #20963: [improve][client] Add backoff for `seek`
Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #20963:
URL: https://github.com/apache/pulsar/pull/20963#discussion_r1294124706
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2148,100 +2148,108 @@ public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
                 new PulsarClientException("Only support seek by messageId or timestamp"));
     }
-    private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return Optional.of(FutureUtil
-                    .failedFuture(new PulsarClientException.AlreadyClosedException(
-                            String.format("The consumer %s was already closed when seeking the subscription %s of the"
-                                    + " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy))));
-        }
-
-        if (!isConnected()) {
-            return Optional.of(FutureUtil.failedFuture(new PulsarClientException(
-                    String.format("The client is not connected to the broker when seeking the subscription %s of the "
-                            + "topic %s to %s", subscription, topicName.toString(), seekBy))));
-        }
+    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
+        AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(100, TimeUnit.MILLISECONDS)
+                .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                .create();
-        return Optional.empty();
+        CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+        seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
+        return seekFuture;
     }
-    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
-        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+    private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
+                                   final Backoff backoff, final AtomicLong remainingTime,
+                                   CompletableFuture<Void> seekFuture) {
         ClientCnx cnx = cnx();
+        if (isConnected() && cnx != null) {
+            if (!duringSeek.compareAndSet(false, true)) {
+                final String message = String.format(
+                        "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
+                        topic, subscription, seekBy);
+                log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
+                        topic, subscription, seekBy);
+                seekFuture.completeExceptionally(new IllegalStateException(message));
+                return;
+            }
+            MessageIdAdv originSeekMessageId = seekMessageId;
+            seekMessageId = (MessageIdAdv) seekId;
+            log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
-        if (!duringSeek.compareAndSet(false, true)) {
-            final String message = String.format(
-                    "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
-                    topic, subscription, seekBy);
-            log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
-                    topic, subscription, seekBy);
-            seekFuture.completeExceptionally(new IllegalStateException(message));
-            return seekFuture;
-        }
-
-        MessageIdAdv originSeekMessageId = seekMessageId;
-        seekMessageId = (MessageIdAdv) seekId;
-        log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
-
-        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-            log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
-            acknowledgmentsGroupingTracker.flushAndClean();
+            cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
+                log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
+                acknowledgmentsGroupingTracker.flushAndClean();
-            lastDequeuedMessageId = MessageId.earliest;
+                lastDequeuedMessageId = MessageId.earliest;
-            clearIncomingMessages();
-            seekFuture.complete(null);
-        }).exceptionally(e -> {
-            // re-set duringSeek and seekMessageId if seek failed
-            seekMessageId = originSeekMessageId;
-            duringSeek.set(false);
-            log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+                clearIncomingMessages();
+                seekFuture.complete(null);
+            }).exceptionally(e -> {
Review Comment:
If the exception is retriable, I think we should also perform a backoff.
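To make the suggestion concrete, here is a minimal sketch of retrying a failed request with backoff only when the failure is classified as retriable. It is an illustration under stated assumptions, not the PR's code: `RetriableSeekSketch`, `TransientException`, `isRetriable`, and `sendWithRetry` are hypothetical names, and which Pulsar errors would count as retriable is left open here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names exist in the Pulsar client.
class RetriableSeekSketch {
    /** Stand-in marker for a transient failure that is worth retrying. */
    static class TransientException extends RuntimeException {
        TransientException(String message) {
            super(message);
        }
    }

    /** Unwraps a CompletionException and checks for the transient marker. */
    static boolean isRetriable(Throwable error) {
        Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
        return cause instanceof TransientException;
    }

    /**
     * Issues the request. A retriable failure is retried after delayMs, with the
     * delay doubling up to maxDelayMs, for at most attemptsLeft attempts in total.
     * Any other failure is passed to the caller unchanged.
     */
    static void sendWithRetry(Supplier<CompletableFuture<Void>> request, long delayMs, long maxDelayMs,
                              int attemptsLeft, ScheduledExecutorService scheduler,
                              CompletableFuture<Void> result) {
        request.get().whenComplete((ignored, error) -> {
            if (error == null) {
                result.complete(null);
            } else if (isRetriable(error) && attemptsLeft > 1) {
                scheduler.schedule(() -> sendWithRetry(request, Math.min(delayMs * 2, maxDelayMs),
                        maxDelayMs, attemptsLeft - 1, scheduler, result), delayMs, TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(error);
            }
        });
    }
}
```

A non-retriable error fails the caller's future on the first attempt, so the backoff only delays failures that have a chance of succeeding later.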
[GitHub] [pulsar] Technoboy- merged pull request #20963: [improve][client] Add backoff for `seek`
Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- merged PR #20963:
URL: https://github.com/apache/pulsar/pull/20963