You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 14:27:19 UTC
[pulsar] 03/10: [pulsar-client] Refactor seek to reuse common logic. (#9670)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 321be3c4ab5135c38e4a9844014db628743949c6
Author: Xiaoguang Sun <su...@users.noreply.github.com>
AuthorDate: Tue Feb 23 21:44:13 2021 +0800
[pulsar-client] Refactor seek to reuse common logic. (#9670)
### Motivation
Refactor the seek methods to reuse common logic, so that adding expression-based seek later on will be easier.
(cherry picked from commit ed4e971b22a0c2c96bee0946cb2be447ca397e43)
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 119 ++++++++-------------
1 file changed, 46 insertions(+), 73 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 35c6618..3527c3c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1895,34 +1895,34 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
}
- @Override
- public CompletableFuture<Void> seekAsync(long timestamp) {
+ private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
if (getState() == State.Closing || getState() == State.Closed) {
- return FutureUtil
- .failedFuture(new PulsarClientException.AlreadyClosedException(
- String.format("The consumer %s was already closed when seeking the subscription %s of the topic " +
- "%s to the timestamp %d", consumerName, subscription, topicName.toString(), timestamp)));
+ return Optional.of(FutureUtil
+ .failedFuture(new PulsarClientException.AlreadyClosedException(
+ String.format("The consumer %s was already closed when seeking the subscription %s of the"
+ + " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy))));
}
if (!isConnected()) {
- return FutureUtil.failedFuture(new PulsarClientException(
- String.format("The client is not connected to the broker when seeking the subscription %s of the " +
- "topic %s to the timestamp %d", subscription, topicName.toString(), timestamp)));
+ return Optional.of(FutureUtil.failedFuture(new PulsarClientException(
+ String.format("The client is not connected to the broker when seeking the subscription %s of the "
+ + "topic %s to %s", subscription, topicName.toString(), seekBy))));
}
- final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+ return Optional.empty();
+ }
- long requestId = client.newRequestId();
- ByteBuf seek = Commands.newSeek(consumerId, requestId, timestamp);
+ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
+ final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();
- log.info("[{}][{}] Seek subscription to publish time {}", topic, subscription, timestamp);
+ log.info("[{}][{}] Seek subscription to {}", topic, subscription, seekBy);
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
- log.info("[{}][{}] Successfully reset subscription to publish time {}", topic, subscription, timestamp);
+ log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
acknowledgmentsGroupingTracker.flushAndClean();
- seekMessageId = new BatchMessageIdImpl((MessageIdImpl) MessageId.earliest);
+ seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
duringSeek.set(true);
lastDequeuedMessageId = MessageId.earliest;
@@ -1933,72 +1933,45 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
- String.format("Failed to seek the subscription %s of the topic %s to the timestamp %d",
- subscription, topicName.toString(), timestamp)));
+ String.format("Failed to seek the subscription %s of the topic %s to %s",
+ subscription, topicName.toString(), seekBy)));
return null;
});
return seekFuture;
}
@Override
- public CompletableFuture<Void> seekAsync(MessageId messageId) {
- if (getState() == State.Closing || getState() == State.Closed) {
- return FutureUtil
- .failedFuture(new PulsarClientException.AlreadyClosedException(
- String.format("The consumer %s was already closed when seeking the subscription %s of the topic " +
- "%s to the message %s", consumerName, subscription, topicName.toString(),
- messageId.toString())));
- }
-
- if (!isConnected()) {
- return FutureUtil.failedFuture(new PulsarClientException(
- String.format("The client is not connected to the broker when seeking the subscription %s of the " +
- "topic %s to the message %s", subscription, topicName.toString(), messageId.toString())));
- }
-
- final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
-
- long requestId = client.newRequestId();
- ByteBuf seek = null;
- if (messageId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
- // Initialize ack set
- BitSetRecyclable ackSet = BitSetRecyclable.create();
- ackSet.set(0, msgId.getBatchSize());
- ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
- long[] ackSetArr = ackSet.toLongArray();
- ackSet.recycle();
-
- seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
- } else {
- MessageIdImpl msgId = (MessageIdImpl) messageId;
- seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
- }
-
- ClientCnx cnx = cnx();
-
- log.info("[{}][{}] Seek subscription to message id {}", topic, subscription, messageId);
-
- cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
- log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId);
- acknowledgmentsGroupingTracker.flushAndClean();
-
- seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId);
- duringSeek.set(true);
- lastDequeuedMessageId = MessageId.earliest;
+ public CompletableFuture<Void> seekAsync(long timestamp) {
+ String seekBy = String.format("the timestamp %d", timestamp);
+ return seekAsyncCheckState(seekBy).orElseGet(() -> {
+ long requestId = client.newRequestId();
+ return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
+ MessageId.earliest, seekBy);
+ });
+ }
- incomingMessages.clear();
- resetIncomingMessageSize();
- seekFuture.complete(null);
- }).exceptionally(e -> {
- log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
- seekFuture.completeExceptionally(
- PulsarClientException.wrap(e.getCause(),
- String.format("Failed to seek the subscription %s of the topic %s to the message %s",
- subscription, topicName.toString(), messageId.toString())));
- return null;
+ @Override
+ public CompletableFuture<Void> seekAsync(MessageId messageId) {
+ String seekBy = String.format("the message %s", messageId.toString());
+ return seekAsyncCheckState(seekBy).orElseGet(() -> {
+ long requestId = client.newRequestId();
+ ByteBuf seek = null;
+ if (messageId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
+ // Initialize ack set
+ BitSetRecyclable ackSet = BitSetRecyclable.create();
+ ackSet.set(0, msgId.getBatchSize());
+ ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+ long[] ackSetArr = ackSet.toLongArray();
+ ackSet.recycle();
+
+ seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+ } else {
+ MessageIdImpl msgId = (MessageIdImpl) messageId;
+ seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
+ }
+ return seekAsyncInternal(requestId, seek, messageId, seekBy);
});
- return seekFuture;
}
public boolean hasMessageAvailable() throws PulsarClientException {