You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 14:27:19 UTC

[pulsar] 03/10: [pulsar-client] Refactor seek to reuse common logic. (#9670)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 321be3c4ab5135c38e4a9844014db628743949c6
Author: Xiaoguang Sun <su...@users.noreply.github.com>
AuthorDate: Tue Feb 23 21:44:13 2021 +0800

    [pulsar-client] Refactor seek to reuse common logic. (#9670)
    
    ### Motivation
    Refactor seek to reuse common logic, so that adding expression-based seek later on will be easier.
    
    (cherry picked from commit ed4e971b22a0c2c96bee0946cb2be447ca397e43)
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 119 ++++++++-------------
 1 file changed, 46 insertions(+), 73 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 35c6618..3527c3c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1895,34 +1895,34 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
     }
 
-    @Override
-    public CompletableFuture<Void> seekAsync(long timestamp) {
+    private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
         if (getState() == State.Closing || getState() == State.Closed) {
-            return FutureUtil
-                .failedFuture(new PulsarClientException.AlreadyClosedException(
-                    String.format("The consumer %s was already closed when seeking the subscription %s of the topic " +
-                        "%s to the timestamp %d", consumerName, subscription, topicName.toString(), timestamp)));
+            return Optional.of(FutureUtil
+                    .failedFuture(new PulsarClientException.AlreadyClosedException(
+                            String.format("The consumer %s was already closed when seeking the subscription %s of the"
+                                    + " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy))));
         }
 
         if (!isConnected()) {
-            return FutureUtil.failedFuture(new PulsarClientException(
-                String.format("The client is not connected to the broker when seeking the subscription %s of the " +
-                    "topic %s to the timestamp %d", subscription, topicName.toString(), timestamp)));
+            return Optional.of(FutureUtil.failedFuture(new PulsarClientException(
+                    String.format("The client is not connected to the broker when seeking the subscription %s of the "
+                            + "topic %s to %s", subscription, topicName.toString(), seekBy))));
         }
 
-        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+        return Optional.empty();
+    }
 
-        long requestId = client.newRequestId();
-        ByteBuf seek = Commands.newSeek(consumerId, requestId, timestamp);
+    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
+        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
         ClientCnx cnx = cnx();
 
-        log.info("[{}][{}] Seek subscription to publish time {}", topic, subscription, timestamp);
+        log.info("[{}][{}] Seek subscription to {}", topic, subscription, seekBy);
 
         cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-            log.info("[{}][{}] Successfully reset subscription to publish time {}", topic, subscription, timestamp);
+            log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
             acknowledgmentsGroupingTracker.flushAndClean();
 
-            seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest);
+            seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
             duringSeek.set(true);
             lastDequeuedMessageId = MessageId.earliest;
 
@@ -1933,72 +1933,45 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
             seekFuture.completeExceptionally(
                 PulsarClientException.wrap(e.getCause(),
-                    String.format("Failed to seek the subscription %s of the topic %s to the timestamp %d",
-                        subscription, topicName.toString(), timestamp)));
+                    String.format("Failed to seek the subscription %s of the topic %s to %s",
+                        subscription, topicName.toString(), seekBy)));
             return null;
         });
         return seekFuture;
     }
 
     @Override
-    public CompletableFuture<Void> seekAsync(MessageId messageId) {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return FutureUtil
-                .failedFuture(new PulsarClientException.AlreadyClosedException(
-                    String.format("The consumer %s was already closed when seeking the subscription %s of the topic " +
-                            "%s to the message %s", consumerName, subscription, topicName.toString(),
-                        messageId.toString())));
-        }
-
-        if (!isConnected()) {
-            return FutureUtil.failedFuture(new PulsarClientException(
-                    String.format("The client is not connected to the broker when seeking the subscription %s of the " +
-                            "topic %s to the message %s", subscription, topicName.toString(), messageId.toString())));
-        }
-
-        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
-
-        long requestId = client.newRequestId();
-        ByteBuf seek = null;
-        if (messageId instanceof BatchMessageIdImpl) {
-            BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
-            // Initialize ack set
-            BitSetRecyclable ackSet = BitSetRecyclable.create();
-            ackSet.set(0, msgId.getBatchSize());
-            ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
-            long[] ackSetArr = ackSet.toLongArray();
-            ackSet.recycle();
-
-            seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
-        } else {
-            MessageIdImpl msgId = (MessageIdImpl) messageId;
-            seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
-        }
-
-        ClientCnx cnx = cnx();
-
-        log.info("[{}][{}] Seek subscription to message id {}", topic, subscription, messageId);
-
-        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-            log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId);
-            acknowledgmentsGroupingTracker.flushAndClean();
-
-            seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId);
-            duringSeek.set(true);
-            lastDequeuedMessageId = MessageId.earliest;
+    public CompletableFuture<Void> seekAsync(long timestamp) {
+        String seekBy = String.format("the timestamp %d", timestamp);
+        return seekAsyncCheckState(seekBy).orElseGet(() -> {
+            long requestId = client.newRequestId();
+            return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
+                MessageId.earliest, seekBy);
+        });
+    }
 
-            incomingMessages.clear();
-            resetIncomingMessageSize();
-            seekFuture.complete(null);
-        }).exceptionally(e -> {
-            log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
-            seekFuture.completeExceptionally(
-                PulsarClientException.wrap(e.getCause(),
-                    String.format("Failed to seek the subscription %s of the topic %s to the message %s",
-                        subscription, topicName.toString(), messageId.toString())));
-            return null;
+    @Override
+    public CompletableFuture<Void> seekAsync(MessageId messageId) {
+        String seekBy = String.format("the message %s", messageId.toString());
+        return seekAsyncCheckState(seekBy).orElseGet(() -> {
+            long requestId = client.newRequestId();
+            ByteBuf seek = null;
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
+                // Initialize ack set
+                BitSetRecyclable ackSet = BitSetRecyclable.create();
+                ackSet.set(0, msgId.getBatchSize());
+                ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+                long[] ackSetArr = ackSet.toLongArray();
+                ackSet.recycle();
+
+                seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+            } else {
+                MessageIdImpl msgId = (MessageIdImpl) messageId;
+                seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
+            }
+            return seekAsyncInternal(requestId, seek, messageId, seekBy);
         });
-        return seekFuture;
     }
 
     public boolean hasMessageAvailable() throws PulsarClientException {