You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/03/06 21:36:43 UTC
(pulsar) branch branch-3.0 updated: [fix][broker][branch-3.0] Avoid consumers receiving acknowledged messages from compacted topic after reconnection (#21187) (#22142)
This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5b62d4b221f [fix][broker][branch-3.0] Avoid consumers receiving acknowledged messages from compacted topic after reconnection (#21187) (#22142)
5b62d4b221f is described below
commit 5b62d4b221fb0686c93911c13998fe3ad2689689
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Mar 7 05:36:36 2024 +0800
[fix][broker][branch-3.0] Avoid consumers receiving acknowledged messages from compacted topic after reconnection (#21187) (#22142)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 4 +
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 +-
.../PersistentDispatcherSingleActiveConsumer.java | 12 +-
.../broker/service/persistent/PersistentTopic.java | 5 +-
.../pulsar/compaction/CompactedTopicImpl.java | 6 +-
.../broker/service/ReplicatorSubscriptionTest.java | 2 +
.../pulsar/broker/transaction/TransactionTest.java | 1 +
.../org/apache/pulsar/client/impl/ReaderTest.java | 28 ++++
.../apache/pulsar/compaction/CompactionTest.java | 164 ++++++++++++++++++++-
9 files changed, 218 insertions(+), 14 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index d1ffdf6d2d7..bc6a1e9a782 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -517,6 +517,10 @@ public interface ManagedCursor {
*/
void rewind();
+ default void rewind(boolean readCompacted) {
+ rewind();
+ }
+
/**
* Move the cursor to a different read position.
*
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 10e72f709fe..7fd93dacf49 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -677,7 +677,7 @@ public class ManagedCursorImpl implements ManagedCursor {
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
- if (!ledger.ledgerExists(position.getLedgerId())) {
+ if (position.getEntryId() == -1L && !ledger.ledgerExists(position.getLedgerId())) {
Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
if (nextExistingLedger == null) {
log.info("[{}] [{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
@@ -2518,9 +2518,15 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void rewind() {
+ rewind(false);
+ }
+
+ @Override
+ public void rewind(boolean readCompacted) {
lock.writeLock().lock();
try {
- PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
+ PositionImpl newReadPosition =
+ readCompacted ? markDeletePosition.getNext() : ledger.getNextValidPosition(markDeletePosition);
PositionImpl oldReadPosition = readPosition;
log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index eacc568f0a4..bf6482bda01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -105,9 +105,9 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries without delay", name);
}
- cursor.rewind();
-
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
+
notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
return;
@@ -125,9 +125,9 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
}
- cursor.rewind();
-
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
+
notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
readOnActiveConsumerTask = null;
@@ -198,7 +198,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
log.debug("[{}] rewind because no available consumer found", name);
}
entries.forEach(Entry::release);
- cursor.rewind();
+ cursor.rewind(currentConsumer != null ? currentConsumer.readCompacted() : readConsumer.readCompacted());
if (currentConsumer != null) {
notifyActiveConsumerChanged(currentConsumer);
readMoreEntries(currentConsumer);
@@ -293,7 +293,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
}
cursor.cancelPendingReadRequest();
havePendingRead = false;
- cursor.rewind();
+ cursor.rewind(consumer.readCompacted());
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e694e063774..24a223fe82a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -977,7 +977,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
- InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated,
+ InitialPosition initialPosition,
+ long startMessageRollbackDurationSec,
+ boolean replicated,
Map<String, String> subscriptionProperties) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
@@ -987,7 +989,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
-
ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties,
new OpenCursorCallback() {
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 22520654135..b9066530998 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -96,7 +96,11 @@ public class CompactedTopicImpl implements CompactedTopic {
ReadEntriesCallback callback, Consumer consumer) {
synchronized (this) {
PositionImpl cursorPosition;
- if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){
+ boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())
+ && (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
+ || cursor.getMarkDeletedPosition() == null
+ || cursor.getMarkDeletedPosition().getEntryId() == -1L);
+ if (readFromEarliest){
cursorPosition = PositionImpl.EARLIEST;
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index fe519827be7..4cc3a9ada7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -868,6 +869,7 @@ public class ReplicatorSubscriptionTest extends ReplicatorTestBase {
.topic(topicName)
.subscriptionName("sub2")
.subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c947ba27069..1ad5ea14e0a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1877,6 +1877,7 @@ public class TransactionTest extends TransactionTestBase {
.topic(topic)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 951f99af1a4..2f91d792581 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -733,4 +733,32 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(partitionedTopic);
}
+ @Test
+ public void testReaderReconnectedFromNextEntry() throws Exception {
+ final String topic = "persistent://my-property/my-ns/testReaderReconnectedFromNextEntry";
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+ .startMessageId(MessageId.earliest).create();
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+ // Send 3 and consume 1.
+ producer.send("1");
+ producer.send("2");
+ producer.send("3");
+ Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg1.getValue(), "1");
+
+ // Trigger reader reconnect.
+ admin.topics().unload(topic);
+
+ // For a non-durable subscription (the reader), we are going to restart from the next entry.
+ Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg2.getValue(), "2");
+ Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg3.getValue(), "3");
+
+ // cleanup.
+ reader.close();
+ producer.close();
+ admin.topics().delete(topic, false);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 545dd97675c..d3de2187d95 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -1878,6 +1878,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
//Give some time to consume
@@ -1915,6 +1916,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
@@ -2175,9 +2177,11 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
});
@Cleanup
- Consumer<String> consumer =
- pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
- .subscribe();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionName("sub-2")
+ .readCompacted(true)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
List<String> result = new ArrayList<>();
while (true) {
@@ -2191,4 +2195,158 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(result, List.of("V3", "V4", "V5"));
}
+
+ @Test
+ public void testAcknowledgeWithReconnection() throws Exception {
+ final String topicName = "persistent://my-property/use/my-ns/testAcknowledge" + UUID.randomUUID();
+ final String subName = "my-sub";
+ @Cleanup
+ PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+ expected.add(String.valueOf(i));
+ }
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ // trim the topic
+ admin.topics().unload(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
+ assertEquals(internalStats.numberOfEntries, 0);
+ });
+
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
+ .topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .isAckReceiptEnabled(true)
+ .subscribe();
+
+ List<String> results = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ results.add(message.getValue());
+ consumer.acknowledge(message);
+ }
+
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
+ 5));
+
+ // Make consumer reconnect to broker
+ admin.topics().unload(topicName);
+
+ // Wait for consumer to reconnect and clear incomingMessages
+ consumer.pause();
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(consumer.numMessagesInQueue(), 0);
+ });
+ consumer.resume();
+
+ for (int i = 0; i < 5; i++) {
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ results.add(message.getValue());
+ consumer.acknowledge(message);
+ }
+
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
+ 0));
+
+ Assert.assertEquals(results, expected);
+
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+
+ // Make consumer reconnect to broker
+ admin.topics().unload(topicName);
+
+ producer.newMessage().key("K").value("V").send();
+ Message<String> message2 = consumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertEquals(message2.getValue(), "V");
+ consumer.acknowledge(message2);
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
+ Assert.assertEquals(internalStats.lastConfirmedEntry,
+ internalStats.cursors.get(subName).markDeletePosition);
+ });
+
+ consumer.close();
+ producer.close();
+ }
+
+ @Test
+ public void testEarliestSubsAfterRollover() throws Exception {
+ final String topicName = "persistent://my-property/use/my-ns/testEarliestSubsAfterRollover" + UUID.randomUUID();
+ final String subName = "my-sub";
+ @Cleanup
+ PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false).topic(topicName).create();
+
+ List<String> expected = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
+ expected.add(String.valueOf(i));
+ }
+ producer.flush();
+
+ admin.topics().triggerCompaction(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(admin.topics().compactionStatus(topicName).status,
+ LongRunningProcessStatus.Status.SUCCESS);
+ });
+
+ // trim the topic
+ admin.topics().unload(topicName);
+
+ Awaitility.await().untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
+ assertEquals(internalStats.numberOfEntries, 0);
+ });
+
+ // Make ml.getFirstPosition() return the first position of the new ledger
+ producer.newMessage().key("K").value("V").send();
+ expected.add("V");
+
+ @Cleanup
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
+ .topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .isAckReceiptEnabled(true)
+ .subscribe();
+
+ List<String> results = new ArrayList<>();
+ while (true) {
+ Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+
+ results.add(message.getValue());
+ consumer.acknowledge(message);
+ }
+
+ Assert.assertEquals(results, expected);
+ }
}