You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/10/08 14:19:18 UTC
[pulsar] branch master updated: [fix] [ml] fix wrong msg backlog of non-durable cursor after trim ledgers (#21250)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ca779827581 [fix] [ml] fix wrong msg backlog of non-durable cursor after trim ledgers (#21250)
ca779827581 is described below
commit ca77982758170993aa52c0f7f45bbf9ad72e368a
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sun Oct 8 22:19:12 2023 +0800
[fix] [ml] fix wrong msg backlog of non-durable cursor after trim ledgers (#21250)
### Background
- But after trimming ledgers, `ml.lastConfirmedPosition` relies on a deleted ledger when the current ledger of ML is empty.
- Cursor prevents setting `markDeletedPosition` to a value larger than `ml.lastConfirmedPosition`, but there are no entries to read<sup>[1]</sup>.
- The code comment in the method `advanceCursors` says: do not make `cursor.markDeletedPosition` larger than `ml.lastConfirmedPosition`<sup>[2]</sup>
### Issue
If there is no durable cursor, the `markDeletedPosition` might be set to `{current_ledger, -1}`, and `async mark delete` will be prevented by the `rule-2` above. So the `backlog`, `readPosition`, and `markDeletedPosition` of the cursor will be in an incorrect position after trimming the ledger. You can reproduce it with the test `testTrimLedgerIfNoDurableCursor`.
### Modifications
Do not make `cursor.markDeletedPosition` larger than `ml.lastConfirmedPosition` when advancing non-durable cursors.
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 28 +++--
.../client/api/NonDurableSubscriptionTest.java | 115 ++++++++++++++++++++-
2 files changed, 133 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d51b48bdda5..e011bf3e6d7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2850,15 +2850,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return;
}
- // need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
- // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return
- // incorrect results
- Long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
- if (firstNonDeletedLedger == null) {
- throw new LedgerNotExistException("First non deleted Ledger is not found");
+ // Just ack messages like a consumer. Normally, consumers will not confirm a position that does not exist, so
+ // find the latest existing position to ack.
+ PositionImpl highestPositionToDelete = calculateLastEntryInLedgerList(ledgersToDelete);
+ if (highestPositionToDelete == null) {
+ log.warn("[{}] The ledgers to be trim are all empty, skip to advance non-durable cursors: {}",
+ name, ledgersToDelete);
+ return;
}
- PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
-
cursors.forEach(cursor -> {
// move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
// to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be
@@ -2882,6 +2881,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
});
}
+ /**
+ * @return null if all ledgers is empty.
+ */
+ private PositionImpl calculateLastEntryInLedgerList(List<LedgerInfo> ledgersToDelete) {
+ for (int i = ledgersToDelete.size() - 1; i >= 0; i--) {
+ LedgerInfo ledgerInfo = ledgersToDelete.get(i);
+ if (ledgerInfo != null && ledgerInfo.hasEntries() && ledgerInfo.getEntries() > 0) {
+ return PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+ }
+ }
+ return null;
+ }
+
/**
* Delete this ManagedLedger completely from the system.
*
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index b5d00ac012a..20407295ccb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -33,6 +33,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
@@ -55,7 +56,7 @@ import org.testng.annotations.Test;
@Test(groups = "broker-api")
@Slf4j
-public class NonDurableSubscriptionTest extends ProducerConsumerBase {
+public class NonDurableSubscriptionTest extends ProducerConsumerBase {
private final AtomicInteger numFlow = new AtomicInteger(0);
@@ -316,7 +317,7 @@ public class NonDurableSubscriptionTest extends ProducerConsumerBase {
}
@Test
- public void testTrimLedgerIfNoDurableCursor() throws Exception {
+ public void testHasMessageAvailableIfIncomingQueueNotEmpty() throws Exception {
final String nonDurableCursor = "non-durable-cursor";
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
@@ -557,4 +558,114 @@ public class NonDurableSubscriptionTest extends ProducerConsumerBase {
producer.close();
admin.topics().delete(topicName, false);
}
+
+ @Test
+ public void testTrimLedgerIfNoDurableCursor() throws Exception {
+ final String nonDurableCursor = "non-durable-cursor";
+ final String durableCursor = "durable-cursor";
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
+ .subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName).receiverQueueSize(1)
+ .subscriptionName(durableCursor).subscribe();
+ consumer.close();
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+ producer.send("1");
+ producer.send("2");
+ producer.send("3");
+ producer.send("4");
+ MessageIdImpl msgIdInDeletedLedger5 = (MessageIdImpl) producer.send("5");
+
+ Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg1.getValue(), "1");
+ Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg2.getValue(), "2");
+ Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg3.getValue(), "3");
+
+ // Unsubscribe durable cursor.
+ // Trigger a trim ledgers task, and verify trim ledgers successful.
+ admin.topics().unload(topicName);
+ Thread.sleep(3 * 1000);
+ admin.topics().deleteSubscription(topicName, durableCursor);
+ // Trim ledgers after release durable cursor.
+ trimLedgers(topicName);
+ List<ManagedLedgerInternalStats.LedgerInfo> ledgers = admin.topics().getInternalStats(topicName).ledgers;
+ assertEquals(ledgers.size(), 1);
+ assertNotEquals(ledgers.get(0).ledgerId, msgIdInDeletedLedger5.getLedgerId());
+
+ // Verify backlog and markDeletePosition is correct.
+ Awaitility.await().untilAsserted(() -> {
+ SubscriptionStats subscriptionStats = admin.topics().getStats(topicName, true, true, true)
+ .getSubscriptions().get(nonDurableCursor);
+ log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
+ assertEquals(subscriptionStats.getMsgBacklog(), 0);
+ ManagedLedgerInternalStats.CursorStats cursorStats =
+ admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+ String[] ledgerIdAndEntryId = cursorStats.markDeletePosition.split(":");
+ PositionImpl actMarkDeletedPos =
+ PositionImpl.get(Long.valueOf(ledgerIdAndEntryId[0]), Long.valueOf(ledgerIdAndEntryId[1]));
+ PositionImpl expectedMarkDeletedPos =
+ PositionImpl.get(msgIdInDeletedLedger5.getLedgerId(), msgIdInDeletedLedger5.getEntryId());
+ log.info("Expected mark deleted position: {}", expectedMarkDeletedPos);
+ log.info("Actual mark deleted position: {}", cursorStats.markDeletePosition);
+ Assert.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >= 0);
+ });
+
+ // Clear the incoming queue of the reader for next test.
+ while (true) {
+ Message<String> msg = reader.readNext(2, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ log.info("clear msg: {}", msg.getValue());
+ }
+
+ // The following tests are designed to verify the api "getNumberOfEntries" and "consumedEntries" still work
+ // after changes.See the code-description added with the PR https://github.com/apache/pulsar/pull/10667.
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(nonDurableCursor);
+
+ // Verify "getNumberOfEntries" if there is no entries to consume.
+ assertEquals(0, cursor.getNumberOfEntries());
+ assertEquals(0, ml.getNumberOfEntries());
+
+ // Verify "getNumberOfEntries" if there is 1 entry to consume.
+ producer.send("6");
+ producer.send("7");
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(2, ml.getNumberOfEntries());
+ // Since there is one message has been pulled into the incoming queue of reader. There is only one messages
+ // waiting to cursor read.
+ assertEquals(1, cursor.getNumberOfEntries());
+ });
+
+ // Verify "consumedEntries" is correct.
+ ManagedLedgerInternalStats.CursorStats cursorStats =
+ admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+ // "messagesConsumedCounter" should be 0 after unload the topic.
+ // Note: "topic_internal_stat.cursor.messagesConsumedCounter" means how many messages were acked on this
+ // cursor. The similar one "topic_stats.lastConsumedTimestamp" means the last time of sending messages to
+ // the consumer.
+ assertEquals(0, cursorStats.messagesConsumedCounter);
+ Message<String> msg6 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg6.getValue(), "6");
+ Message<String> msg7 = reader.readNext(2, TimeUnit.SECONDS);
+ assertEquals(msg7.getValue(), "7");
+ Awaitility.await().untilAsserted(() -> {
+ // "messagesConsumedCounter" should be 2 after consumed 2 message.
+ ManagedLedgerInternalStats.CursorStats cStat =
+ admin.topics().getInternalStats(topicName).cursors.get(nonDurableCursor);
+ assertEquals(2, cStat.messagesConsumedCounter);
+ });
+
+ // cleanup.
+ reader.close();
+ producer.close();
+ admin.topics().delete(topicName, false);
+ }
}