You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/07 13:17:14 UTC
[pulsar] branch master updated: Issue 8677: Cannot get
lastMessageId for an empty topic due to message retention (#8725)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5054642 Issue 8677: Cannot get lastMessageId for an empty topic due to message retention (#8725)
5054642 is described below
commit 5054642966adc97808ca8e97a4e22170e5964b0b
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Mon Dec 7 14:16:43 2020 +0100
Issue 8677: Cannot get lastMessageId for an empty topic due to message retention (#8725)
When we are trimming the ledgers we are saving the `currentLedger`, but as soon as you restart the broker the currentLedger does not contain the lastMessageId (because it is a fresh new ledger).
Changes:
- add a test case on pulsar-broker that reproduces the issue reported by the user
- log a message when we are trimming the ledger at lastAddConfirmedEntry
- add a test case that prevents changes in the future on ManagedLedgerImpl
- fix a minor issue in PersistentTopic#getLastMessageId: a return keyword was missing and we continued with a call to ManagedLedger; the CompletableFuture was already 'completed', so the final result is not changed (but we are saving resources)
Fixes #8677
---
.gitignore | 3 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 27 ++++++++
.../broker/service/persistent/PersistentTopic.java | 15 +++-
.../broker/service/ConsumedLedgersTrimTest.java | 80 ++++++++++++++++++++++
5 files changed, 129 insertions(+), 3 deletions(-)
diff --git a/.gitignore b/.gitignore
index 297f31d..176d1d2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,6 +30,9 @@ pulsar-functions/worker/src/test/resources/
*.iml
*.iws
+# NetBeans
+nb-configuration.xml
+
# Mac
**/.DS_Store
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e512485..ba50d2b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2072,7 +2072,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (log.isDebugEnabled()) {
log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId);
}
-
// skip ledger if retention constraint met
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
@@ -2123,8 +2122,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
advanceNonDurableCursors(ledgersToDelete);
+ PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
for (LedgerInfo ls : ledgersToDelete) {
+ if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
+ // this info is relevant because the lastMessageId won't be available anymore
+ log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be deleted", name,
+ ls.getLedgerId(), currentLastConfirmedEntry);
+ }
ledgerCache.remove(ls.getLedgerId());
ledgers.remove(ls.getLedgerId());
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 56f215c..e6388c4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1854,6 +1854,33 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ml.close();
}
+ /**
+ * Set retention time = 0 and create a empty ledger,
+ * first position can't higher than last after trim ledgers.
+ * Even if we do not have subscriptions the ledger
+ * that contains the lastConfirmedEntry will be deleted anyway.
+ */
+ @Test
+ public void testRetention0WithEmptyLedgerWithoutCursors() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setRetentionTime(0, TimeUnit.MINUTES);
+ config.setMaxEntriesPerLedger(1);
+
+ ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config);
+ ml.addEntry("message1".getBytes());
+ ml.close();
+
+ // reopen ml
+ ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config);
+ ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
+
+ assertTrue(ml.getFirstPosition().ledgerId <= ml.lastConfirmedEntry.ledgerId);
+ assertFalse(ml.getLedgersInfo().containsKey(ml.lastConfirmedEntry.ledgerId),
+ "the ledger at lastConfirmedEntry has not been trimmed!");
+ ml.close();
+ }
+
@Test
public void testInfiniteRetention() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a88eb6f..4a5f4e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2192,12 +2192,23 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
public CompletableFuture<MessageId> getLastMessageId() {
CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry();
- int partitionIndex = TopicName.getPartitionIndex(getName());
+ String name = getName();
+ int partitionIndex = TopicName.getPartitionIndex(name);
+ if (log.isDebugEnabled()) {
+ log.debug("getLastMessageId {}, partitionIndex{}, position {}", name, partitionIndex, position);
+ }
if (position.getEntryId() == -1) {
completableFuture
.complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex));
+ return completableFuture;
+ }
+ ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+ if (!ledgerImpl.ledgerExists(position.getLedgerId())) {
+ completableFuture
+ .complete(MessageId.earliest);
+ return completableFuture;
}
- ((ManagedLedgerImpl) ledger).asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+ ledgerImpl.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index 72c2eff..cc84de2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.broker.service;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -30,8 +33,14 @@ import org.junit.Test;
import org.testng.Assert;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.MessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ConsumedLedgersTrimTest extends BrokerTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConsumedLedgersTrimTest.class);
+
@Override
protected void setup() throws Exception {
//No-op
@@ -90,4 +99,75 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
Thread.sleep(1500);
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
}
+
+
+ @Test
+ public void TestConsumedLedgersTrimNoSubscriptions() throws Exception {
+ conf.setRetentionCheckIntervalInSeconds(1);
+ conf.setBrokerDeleteInactiveTopicsEnabled(false);
+ super.baseSetup();
+ final String topicName = "persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions";
+
+ // write some messages
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .producerName("producer-name")
+ .create();
+
+ // set retention parameters, the ledgers are to be deleted as soon as possible
+ // but the topic is not to be automatically deleted
+ PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
+ managedLedgerConfig.setRetentionSizeInMB(-1);
+ managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
+ managedLedgerConfig.setMaxEntriesPerLedger(1000);
+ managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+ MessageId initialMessageId = persistentTopic.getLastMessageId().get();
+ LOG.info("lastmessageid " + initialMessageId);
+
+ int msgNum = 7;
+ for (int i = 0; i < msgNum; i++) {
+ producer.send(new byte[1024 * 1024]);
+ }
+
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+ Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+ MessageId messageIdBeforeRestart = pulsar.getAdminClient().topics().getLastMessageId(topicName);
+ LOG.info("messageIdBeforeRestart " + messageIdBeforeRestart);
+ assertNotEquals(messageIdBeforeRestart, initialMessageId);
+
+ // restart the broker we have to start a new ledger
+ // the lastMessageId is still on the previous ledger
+ restartBroker();
+ // force load topic
+ pulsar.getAdminClient().topics().getStats(topicName);
+ MessageId messageIdAfterRestart = pulsar.getAdminClient().topics().getLastMessageId(topicName);
+ LOG.info("lastmessageid " + messageIdAfterRestart);
+ assertEquals(messageIdAfterRestart, messageIdBeforeRestart);
+
+ persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
+ managedLedgerConfig.setRetentionSizeInMB(-1);
+ managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
+ managedLedgerConfig.setMaxEntriesPerLedger(1);
+ managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+ managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+ // now we have two ledgers, the first is expired but is contains the lastMessageId
+ // the second is empty and should be kept as it is the current tail
+ Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2);
+
+ // force trimConsumedLedgers
+ Thread.sleep(3000);
+ CompletableFuture f = new CompletableFuture();
+ managedLedger.trimConsumedLedgersInBackground(f);
+ f.join();
+
+ // lastMessageId should be available even in this case, but is must
+ // refer to -1
+ MessageId messageIdAfterTrim = pulsar.getAdminClient().topics().getLastMessageId(topicName);
+ LOG.info("lastmessageid " + messageIdAfterTrim);
+ assertEquals(messageIdAfterTrim, MessageId.earliest);
+
+ }
}