You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/08/21 07:10:56 UTC
[pulsar] branch master updated: [improve][client] Add backoff for `seek` (#20963)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ee91edc3e08 [improve][client] Add backoff for `seek` (#20963)
ee91edc3e08 is described below
commit ee91edc3e08c87db1e5662a553ff62af0c1886e5
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Aug 21 15:10:49 2023 +0800
[improve][client] Add backoff for `seek` (#20963)
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 154 +++++++++++----------
.../pulsar/client/impl/ConsumerImplTest.java | 13 +-
2 files changed, 93 insertions(+), 74 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a929fe9aa6b..551950ebc58 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2148,100 +2148,108 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
new PulsarClientException("Only support seek by messageId or timestamp"));
}
- private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
- if (getState() == State.Closing || getState() == State.Closed) {
- return Optional.of(FutureUtil
- .failedFuture(new PulsarClientException.AlreadyClosedException(
- String.format("The consumer %s was already closed when seeking the subscription %s of the"
- + " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy))));
- }
-
- if (!isConnected()) {
- return Optional.of(FutureUtil.failedFuture(new PulsarClientException(
- String.format("The client is not connected to the broker when seeking the subscription %s of the "
- + "topic %s to %s", subscription, topicName.toString(), seekBy))));
- }
+ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
+ AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+ .create();
- return Optional.empty();
+ CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+ seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
+ return seekFuture;
}
- private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
- final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
+ final Backoff backoff, final AtomicLong remainingTime,
+ CompletableFuture<Void> seekFuture) {
ClientCnx cnx = cnx();
+ if (isConnected() && cnx != null) {
+ if (!duringSeek.compareAndSet(false, true)) {
+ final String message = String.format(
+ "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
+ topic, subscription, seekBy);
+ log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
+ topic, subscription, seekBy);
+ seekFuture.completeExceptionally(new IllegalStateException(message));
+ return;
+ }
+ MessageIdAdv originSeekMessageId = seekMessageId;
+ seekMessageId = (MessageIdAdv) seekId;
+ log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
- if (!duringSeek.compareAndSet(false, true)) {
- final String message = String.format(
- "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
- topic, subscription, seekBy);
- log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
- topic, subscription, seekBy);
- seekFuture.completeExceptionally(new IllegalStateException(message));
- return seekFuture;
- }
-
- MessageIdAdv originSeekMessageId = seekMessageId;
- seekMessageId = (MessageIdAdv) seekId;
- log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
-
- cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
- log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
- acknowledgmentsGroupingTracker.flushAndClean();
+ cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
+ log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
+ acknowledgmentsGroupingTracker.flushAndClean();
- lastDequeuedMessageId = MessageId.earliest;
+ lastDequeuedMessageId = MessageId.earliest;
- clearIncomingMessages();
- seekFuture.complete(null);
- }).exceptionally(e -> {
- // re-set duringSeek and seekMessageId if seek failed
- seekMessageId = originSeekMessageId;
- duringSeek.set(false);
- log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+ clearIncomingMessages();
+ seekFuture.complete(null);
+ }).exceptionally(e -> {
+ // re-set duringSeek and seekMessageId if seek failed
+ seekMessageId = originSeekMessageId;
+ duringSeek.set(false);
+ log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+
+ seekFuture.completeExceptionally(
+ PulsarClientException.wrap(e.getCause(),
+ String.format("Failed to seek the subscription %s of the topic %s to %s",
+ subscription, topicName.toString(), seekBy)));
+ return null;
+ });
+ } else {
+ long nextDelay = Math.min(backoff.next(), remainingTime.get());
+ if (nextDelay <= 0) {
+ seekFuture.completeExceptionally(
+ new PulsarClientException.TimeoutException(
+ String.format("The subscription %s of the topic %s could not seek "
+ + "withing configured timeout", subscription, topicName.toString())));
+ return;
+ }
- seekFuture.completeExceptionally(
- PulsarClientException.wrap(e.getCause(),
- String.format("Failed to seek the subscription %s of the topic %s to %s",
- subscription, topicName.toString(), seekBy)));
- return null;
- });
- return seekFuture;
+ ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
+ log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms",
+ topic, getHandlerName(), nextDelay);
+ remainingTime.addAndGet(-nextDelay);
+ seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture);
+ }, nextDelay, TimeUnit.MILLISECONDS);
+ }
}
@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
String seekBy = String.format("the timestamp %d", timestamp);
- return seekAsyncCheckState(seekBy).orElseGet(() -> {
- long requestId = client.newRequestId();
- return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
+ long requestId = client.newRequestId();
+ return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
MessageId.earliest, seekBy);
- });
}
@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
String seekBy = String.format("the message %s", messageId.toString());
- return seekAsyncCheckState(seekBy).orElseGet(() -> {
- long requestId = client.newRequestId();
- final MessageIdAdv msgId = (MessageIdAdv) messageId;
- final MessageIdAdv firstChunkMsgId = msgId.getFirstChunkMessageId();
- final ByteBuf seek;
- if (msgId.getFirstChunkMessageId() != null) {
- seek = Commands.newSeek(consumerId, requestId, firstChunkMsgId.getLedgerId(),
- firstChunkMsgId.getEntryId(), new long[0]);
+ long requestId = client.newRequestId();
+ final MessageIdAdv msgId = (MessageIdAdv) messageId;
+ final MessageIdAdv firstChunkMsgId = msgId.getFirstChunkMessageId();
+ final ByteBuf seek;
+ if (msgId.getFirstChunkMessageId() != null) {
+ seek = Commands.newSeek(consumerId, requestId, firstChunkMsgId.getLedgerId(),
+ firstChunkMsgId.getEntryId(), new long[0]);
+ } else {
+ final long[] ackSetArr;
+ if (MessageIdAdvUtils.isBatch(msgId)) {
+ final BitSetRecyclable ackSet = BitSetRecyclable.create();
+ ackSet.set(0, msgId.getBatchSize());
+ ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+ ackSetArr = ackSet.toLongArray();
+ ackSet.recycle();
} else {
- final long[] ackSetArr;
- if (MessageIdAdvUtils.isBatch(msgId)) {
- final BitSetRecyclable ackSet = BitSetRecyclable.create();
- ackSet.set(0, msgId.getBatchSize());
- ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
- ackSetArr = ackSet.toLongArray();
- ackSet.recycle();
- } else {
- ackSetArr = new long[0];
- }
- seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+ ackSetArr = new long[0];
}
- return seekAsyncInternal(requestId, seek, messageId, seekBy);
- });
+ seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+ }
+ return seekAsyncInternal(requestId, seek, messageId, seekBy);
}
public boolean hasMessageAvailable() throws PulsarClientException {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 5a223d5da15..070919c57a4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -34,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -46,6 +48,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -263,13 +266,21 @@ public class ConsumerImplTest {
assertThat(consumer.getPriorityLevel()).isEqualTo(1);
}
- @Test(invocationTimeOut = 1000)
+ @Test
public void testSeekAsyncInternal() {
// given
ClientCnx cnx = mock(ClientCnx.class);
CompletableFuture<ProducerResponse> clientReq = new CompletableFuture<>();
when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())).thenReturn(clientReq);
+ ScheduledExecutorProvider provider = mock(ScheduledExecutorProvider.class);
+ ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+ when(provider.getExecutor()).thenReturn(scheduledExecutorService);
+ when(consumer.getClient().getScheduledExecutorProvider()).thenReturn(provider);
+
+ CompletableFuture<Void> result = consumer.seekAsync(1L);
+ verify(scheduledExecutorService, atLeast(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
consumer.setClientCnx(cnx);
consumer.setState(HandlerState.State.Ready);