You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:00:59 UTC
[pulsar] 07/09: Fix seek at batchIndex level receive duplicated
messages (#11826)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6763969dac1b9133c9b80130a00fa0f01051b8c4
Author: Aloys <lo...@gmail.com>
AuthorDate: Thu Sep 2 15:46:35 2021 +0800
Fix seek at batchIndex level receive duplicated messages (#11826)
Fixes #11825
### Motivation
If AcknowledgmentAtBatchIndexLevelEnabled is set, a consumer may receive duplicated messages after seeking to a batch index.
This pull request tries to resolve this problem.
### Modifications
Set `duringSeek` (and `seekMessageId`) before the seek response is received, and restore both if the seek fails.
### Verifying this change
This change added tests and can be verified as follows:
SubscriptionSeekTest.testSeekForBatchMessageAndSpecifiedBatchIndex
(cherry picked from commit fe4cd09c297f7ab2a892a6c81bb0998e7ee737f2)
---
.../broker/service/SubscriptionSeekTest.java | 83 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 11 ++-
2 files changed, 91 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 22e7135..cd584ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
@@ -182,6 +183,88 @@ public class SubscriptionSeekTest extends BrokerTestBase {
assertEquals(receiveId, messageId);
}
}
+
+ /** Checks that seeking to a message inside a batch entry makes that exact message the next one received. */
+ @Test
+ public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception {
+ final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
+ String subscriptionName = "my-subscription-batch";
+ // Batch up to 3 messages per entry so the three sends below can share a single entry.
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(true)
+ .batchingMaxMessages(3)
+ // use a publish delay long enough that the batch is only flushed once it holds all 3 messages
+ .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+ .topic(topicName).create();
+
+ List<MessageId> messageIds = new ArrayList<>();
+ List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
+
+ List<String> messages = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ String message = "my-message-" + i;
+ messages.add(message);
+ CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
+ futureMessageIds.add(messageIdCompletableFuture);
+ }
+ // Block until each send completes and record the message id it was assigned.
+ for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+ MessageId messageId = futureMessageId.get();
+ messageIds.add(messageId);
+ }
+
+ producer.close();
+
+ assertTrue(messageIds.get(0) instanceof BatchMessageIdImpl);
+ assertTrue(messageIds.get(1) instanceof BatchMessageIdImpl);
+ assertTrue(messageIds.get(2) instanceof BatchMessageIdImpl);
+
+ BatchMessageIdImpl batchMsgId0 = (BatchMessageIdImpl) messageIds.get(0);
+ BatchMessageIdImpl batchMsgId1 = (BatchMessageIdImpl) messageIds.get(1);
+ BatchMessageIdImpl msgIdToSeekFirst = (BatchMessageIdImpl) messageIds.get(2);
+ // Precondition for the rest of the test: all three ids carry the same entry id.
+ assertEquals(batchMsgId0.getEntryId(), batchMsgId1.getEntryId());
+ assertEquals(batchMsgId1.getEntryId(), msgIdToSeekFirst.getEntryId());
+
+ PulsarClient newPulsarClient = PulsarClient.builder()
+ // use a tiny starting backoff so the client re-connects quickly (NOTE(review): presumably after each seek; confirm)
+ .startingBackoffInterval(1, TimeUnit.MICROSECONDS)
+ .serviceUrl(lookupUrl.toString())
+ .build();
+
+ org.apache.pulsar.client.api.Consumer<String> consumer = newPulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .startMessageIdInclusive()
+ .subscribe();
+
+ PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+ assertNotNull(topicRef);
+ assertEquals(topicRef.getSubscriptions().size(), 1);
+ // Seek to the third message of the batch; the next message received must be exactly that one.
+ consumer.seek(msgIdToSeekFirst);
+ MessageId msgId = consumer.receive().getMessageId();
+ assertTrue(msgId instanceof BatchMessageIdImpl);
+ BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
+ assertEquals(batchMsgId, msgIdToSeekFirst);
+
+ // Seeking to earliest must replay the first message; seeking to latest must leave nothing to receive within 1s.
+ consumer.seek(MessageId.earliest);
+ Message<String> receiveBeforEarliest = consumer.receive();
+ assertEquals(receiveBeforEarliest.getValue(), messages.get(0));
+ consumer.seek(MessageId.latest);
+ Message<String> receiveAfterLatest = consumer.receive(1, TimeUnit.SECONDS);
+ assertNull(receiveAfterLatest);
+ // Seeking to each message id in turn must make that id the next one received.
+ for (MessageId messageId : messageIds) {
+ consumer.seek(messageId);
+ MessageId receiveId = consumer.receive().getMessageId();
+ assertEquals(receiveId, messageId);
+ }
+
+ newPulsarClient.close();
+ }
+
@Test
public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + UUID.randomUUID().toString();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b7fa039..7a20ff4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1802,20 +1802,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();
- log.info("[{}][{}] Seek subscription to {}", topic, subscription, seekBy);
+ BatchMessageIdImpl originSeekMessageId = seekMessageId;
+ seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
+ duringSeek.set(true);
+ log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
acknowledgmentsGroupingTracker.flushAndClean();
- seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
- duringSeek.set(true);
lastDequeuedMessageId = MessageId.earliest;
clearIncomingMessages();
seekFuture.complete(null);
}).exceptionally(e -> {
+ // re-set duringSeek and seekMessageId if seek failed
+ seekMessageId = originSeekMessageId;
+ duringSeek.set(false);
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
+
seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",