You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/27 22:39:29 UTC

(pulsar) 01/05: [improve][client] Add backoff for `seek` (#20963)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9ea7f6014b51635fc337acd0e264cf444cdd3a2a
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Aug 21 15:10:49 2023 +0800

    [improve][client] Add backoff for `seek` (#20963)
    
    (cherry picked from commit ee91edc3e08c87db1e5662a553ff62af0c1886e5)
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 154 +++++++++++----------
 .../pulsar/client/impl/ConsumerImplTest.java       |  13 +-
 2 files changed, 93 insertions(+), 74 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index cfb5f309684..717355abbef 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2225,100 +2225,108 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 new PulsarClientException("Only support seek by messageId or timestamp"));
     }
 
-    private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return Optional.of(FutureUtil
-                    .failedFuture(new PulsarClientException.AlreadyClosedException(
-                            String.format("The consumer %s was already closed when seeking the subscription %s of the"
-                                    + " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy))));
-        }
-
-        if (!isConnected()) {
-            return Optional.of(FutureUtil.failedFuture(new PulsarClientException(
-                    String.format("The client is not connected to the broker when seeking the subscription %s of the "
-                            + "topic %s to %s", subscription, topicName.toString(), seekBy))));
-        }
+    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
+        AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(100, TimeUnit.MILLISECONDS)
+                .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                .create();
 
-        return Optional.empty();
+        CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+        seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, opTimeoutMs, seekFuture);
+        return seekFuture;
     }
 
-    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
-        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+    private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy,
+                                   final Backoff backoff, final AtomicLong remainingTime,
+                                   CompletableFuture<Void> seekFuture) {
         ClientCnx cnx = cnx();
+        if (isConnected() && cnx != null) {
+            if (!duringSeek.compareAndSet(false, true)) {
+                final String message = String.format(
+                        "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
+                        topic, subscription, seekBy);
+                log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
+                        topic, subscription, seekBy);
+                seekFuture.completeExceptionally(new IllegalStateException(message));
+                return;
+            }
+            MessageIdAdv originSeekMessageId = seekMessageId;
+            seekMessageId = (MessageIdAdv) seekId;
+            log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
 
-        if (!duringSeek.compareAndSet(false, true)) {
-            final String message = String.format(
-                    "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
-                    topic, subscription, seekBy);
-            log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
-                    topic, subscription, seekBy);
-            seekFuture.completeExceptionally(new IllegalStateException(message));
-            return seekFuture;
-        }
-
-        MessageIdAdv originSeekMessageId = seekMessageId;
-        seekMessageId = (MessageIdAdv) seekId;
-        log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
-
-        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-            log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
-            acknowledgmentsGroupingTracker.flushAndClean();
+            cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
+                log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
+                acknowledgmentsGroupingTracker.flushAndClean();
 
-            lastDequeuedMessageId = MessageId.earliest;
+                lastDequeuedMessageId = MessageId.earliest;
 
-            clearIncomingMessages();
-            seekFuture.complete(null);
-        }).exceptionally(e -> {
-            // re-set duringSeek and seekMessageId if seek failed
-            seekMessageId = originSeekMessageId;
-            duringSeek.set(false);
-            log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+                clearIncomingMessages();
+                seekFuture.complete(null);
+            }).exceptionally(e -> {
+                // re-set duringSeek and seekMessageId if seek failed
+                seekMessageId = originSeekMessageId;
+                duringSeek.set(false);
+                log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+
+                seekFuture.completeExceptionally(
+                        PulsarClientException.wrap(e.getCause(),
+                                String.format("Failed to seek the subscription %s of the topic %s to %s",
+                                        subscription, topicName.toString(), seekBy)));
+                return null;
+            });
+        } else {
+            long nextDelay = Math.min(backoff.next(), remainingTime.get());
+            if (nextDelay <= 0) {
+                seekFuture.completeExceptionally(
+                        new PulsarClientException.TimeoutException(
+                                String.format("The subscription %s of the topic %s could not seek "
+                                        + "withing configured timeout", subscription, topicName.toString())));
+                return;
+            }
 
-            seekFuture.completeExceptionally(
-                PulsarClientException.wrap(e.getCause(),
-                    String.format("Failed to seek the subscription %s of the topic %s to %s",
-                        subscription, topicName.toString(), seekBy)));
-            return null;
-        });
-        return seekFuture;
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
+                log.warn("[{}] [{}] Could not get connection while seek -- Will try again in {} ms",
+                        topic, getHandlerName(), nextDelay);
+                remainingTime.addAndGet(-nextDelay);
+                seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, remainingTime, seekFuture);
+            }, nextDelay, TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override
     public CompletableFuture<Void> seekAsync(long timestamp) {
         String seekBy = String.format("the timestamp %d", timestamp);
-        return seekAsyncCheckState(seekBy).orElseGet(() -> {
-            long requestId = client.newRequestId();
-            return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
+        long requestId = client.newRequestId();
+        return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
                 MessageId.earliest, seekBy);
-        });
     }
 
     @Override
     public CompletableFuture<Void> seekAsync(MessageId messageId) {
         String seekBy = String.format("the message %s", messageId.toString());
-        return seekAsyncCheckState(seekBy).orElseGet(() -> {
-            long requestId = client.newRequestId();
-            final MessageIdAdv msgId = (MessageIdAdv) messageId;
-            final MessageIdAdv firstChunkMsgId = msgId.getFirstChunkMessageId();
-            final ByteBuf seek;
-            if (msgId.getFirstChunkMessageId() != null) {
-                seek = Commands.newSeek(consumerId, requestId, firstChunkMsgId.getLedgerId(),
-                        firstChunkMsgId.getEntryId(), new long[0]);
+        long requestId = client.newRequestId();
+        final MessageIdAdv msgId = (MessageIdAdv) messageId;
+        final MessageIdAdv firstChunkMsgId = msgId.getFirstChunkMessageId();
+        final ByteBuf seek;
+        if (msgId.getFirstChunkMessageId() != null) {
+            seek = Commands.newSeek(consumerId, requestId, firstChunkMsgId.getLedgerId(),
+                    firstChunkMsgId.getEntryId(), new long[0]);
+        } else {
+            final long[] ackSetArr;
+            if (MessageIdAdvUtils.isBatch(msgId)) {
+                final BitSetRecyclable ackSet = BitSetRecyclable.create();
+                ackSet.set(0, msgId.getBatchSize());
+                ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+                ackSetArr = ackSet.toLongArray();
+                ackSet.recycle();
             } else {
-                final long[] ackSetArr;
-                if (MessageIdAdvUtils.isBatch(msgId)) {
-                    final BitSetRecyclable ackSet = BitSetRecyclable.create();
-                    ackSet.set(0, msgId.getBatchSize());
-                    ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
-                    ackSetArr = ackSet.toLongArray();
-                    ackSet.recycle();
-                } else {
-                    ackSetArr = new long[0];
-                }
-                seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+                ackSetArr = new long[0];
             }
-            return seekAsyncInternal(requestId, seek, messageId, seekBy);
-        });
+            seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+        }
+        return seekAsyncInternal(requestId, seek, messageId, seekBy);
     }
 
     public boolean hasMessageAvailable() throws PulsarClientException {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 5a223d5da15..070919c57a4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -34,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
@@ -46,6 +48,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -263,13 +266,21 @@ public class ConsumerImplTest {
         assertThat(consumer.getPriorityLevel()).isEqualTo(1);
     }
 
-    @Test(invocationTimeOut = 1000)
+    @Test
     public void testSeekAsyncInternal() {
         // given
         ClientCnx cnx = mock(ClientCnx.class);
         CompletableFuture<ProducerResponse> clientReq = new CompletableFuture<>();
         when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())).thenReturn(clientReq);
 
+        ScheduledExecutorProvider provider = mock(ScheduledExecutorProvider.class);
+        ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
+        when(provider.getExecutor()).thenReturn(scheduledExecutorService);
+        when(consumer.getClient().getScheduledExecutorProvider()).thenReturn(provider);
+
+        CompletableFuture<Void> result = consumer.seekAsync(1L);
+        verify(scheduledExecutorService, atLeast(1)).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
         consumer.setClientCnx(cnx);
         consumer.setState(HandlerState.State.Ready);