You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/07 13:17:14 UTC

[pulsar] branch master updated: Issue 8677: Cannot get lastMessageId for an empty topic due to message retention (#8725)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5054642  Issue 8677: Cannot get lastMessageId for an empty topic due to message retention (#8725)
5054642 is described below

commit 5054642966adc97808ca8e97a4e22170e5964b0b
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Mon Dec 7 14:16:43 2020 +0100

    Issue 8677: Cannot get lastMessageId for an empty topic due to message retention (#8725)
    
    When we are trimming the ledgers we are saving the `currentLedger`, but as soon as you restart the broker the currentLedger does not contain the lastMessageId (because it is a fresh new ledger).
    
    Changes:
    - add test case on pulsar-broker that reproduces the issue reported by the user
    - log a message when we are trimming the ledger at lastAddConfirmedEntry
    - add test case that prevents changes in the future on ManagedLedgerImpl
    - fix a minor issue in PersistentTopic#getLastMessageId: a return keyword was missing and we continued with a call to ManagedLedger; the CompletableFuture was already 'completed', so the final result is not changed (but we are saving resources)
    
    
    Fixes #8677
---
 .gitignore                                         |  3 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  7 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 27 ++++++++
 .../broker/service/persistent/PersistentTopic.java | 15 +++-
 .../broker/service/ConsumedLedgersTrimTest.java    | 80 ++++++++++++++++++++++
 5 files changed, 129 insertions(+), 3 deletions(-)

diff --git a/.gitignore b/.gitignore
index 297f31d..176d1d2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,6 +30,9 @@ pulsar-functions/worker/src/test/resources/
 *.iml
 *.iws
 
+# NetBeans
+nb-configuration.xml
+
 # Mac
 **/.DS_Store
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e512485..ba50d2b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2072,7 +2072,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId);
             }
-
             // skip ledger if retention constraint met
             for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
                 boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
@@ -2123,8 +2122,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
             advanceNonDurableCursors(ledgersToDelete);
 
+            PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
             // Update metadata
             for (LedgerInfo ls : ledgersToDelete) {
+                if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
+                    // this info is relevant because the lastMessageId won't be available anymore
+                    log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be deleted", name,
+                            ls.getLedgerId(), currentLastConfirmedEntry);
+                }
                 ledgerCache.remove(ls.getLedgerId());
 
                 ledgers.remove(ls.getLedgerId());
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 56f215c..e6388c4 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1854,6 +1854,33 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ml.close();
     }
 
+    /**
+     * Set retention time = 0 and create a empty ledger,
+     * first position can't higher than last after trim ledgers.
+     * Even if we do not have subscriptions the ledger
+     * that contains the lastConfirmedEntry will be deleted anyway.
+     */
+    @Test
+    public void testRetention0WithEmptyLedgerWithoutCursors() throws Exception {
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionTime(0, TimeUnit.MINUTES);
+        config.setMaxEntriesPerLedger(1);
+
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config);
+        ml.addEntry("message1".getBytes());
+        ml.close();
+
+        // reopen ml
+        ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config);
+        ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
+
+        assertTrue(ml.getFirstPosition().ledgerId <= ml.lastConfirmedEntry.ledgerId);
+        assertFalse(ml.getLedgersInfo().containsKey(ml.lastConfirmedEntry.ledgerId),
+                "the ledger at lastConfirmedEntry has not been trimmed!");
+        ml.close();
+    }
+
     @Test
     public void testInfiniteRetention() throws Exception {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a88eb6f..4a5f4e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2192,12 +2192,23 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public CompletableFuture<MessageId> getLastMessageId() {
         CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
         PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry();
-        int partitionIndex = TopicName.getPartitionIndex(getName());
+        String name = getName();
+        int partitionIndex = TopicName.getPartitionIndex(name);
+        if (log.isDebugEnabled()) {
+            log.debug("getLastMessageId {}, partitionIndex{}, position {}", name, partitionIndex, position);
+        }
         if (position.getEntryId() == -1) {
             completableFuture
                     .complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex));
+            return completableFuture;
+        }
+        ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+        if (!ledgerImpl.ledgerExists(position.getLedgerId())) {
+            completableFuture
+                    .complete(MessageId.earliest);
+            return completableFuture;
         }
-        ((ManagedLedgerImpl) ledger).asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+        ledgerImpl.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
                 PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index 72c2eff..cc84de2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.broker.service;
 
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import java.util.concurrent.CompletableFuture;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -30,8 +33,14 @@ import org.junit.Test;
 import org.testng.Assert;
 
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.MessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConsumedLedgersTrimTest extends BrokerTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConsumedLedgersTrimTest.class);
+
     @Override
     protected void setup() throws Exception {
         //No-op
@@ -90,4 +99,75 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
         Thread.sleep(1500);
         Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
     }
+
+
+    @Test
+    public void TestConsumedLedgersTrimNoSubscriptions() throws Exception {
+        conf.setRetentionCheckIntervalInSeconds(1);
+        conf.setBrokerDeleteInactiveTopicsEnabled(false);
+        super.baseSetup();
+        final String topicName = "persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions";
+
+        // write some messages
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .producerName("producer-name")
+                .create();
+
+        // set retention parameters, the ledgers are to be deleted as soon as possible
+        // but the topic is not to be automatically deleted
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
+        managedLedgerConfig.setRetentionSizeInMB(-1);
+        managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
+        managedLedgerConfig.setMaxEntriesPerLedger(1000);
+        managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        MessageId initialMessageId = persistentTopic.getLastMessageId().get();
+        LOG.info("lastmessageid " + initialMessageId);
+
+        int msgNum = 7;
+        for (int i = 0; i < msgNum; i++) {
+            producer.send(new byte[1024 * 1024]);
+        }
+
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+        MessageId messageIdBeforeRestart = pulsar.getAdminClient().topics().getLastMessageId(topicName);
+        LOG.info("messageIdBeforeRestart " + messageIdBeforeRestart);
+        assertNotEquals(messageIdBeforeRestart, initialMessageId);
+
+        // restart the broker we have to start a new ledger
+        // the lastMessageId is still on the previous ledger
+        restartBroker();
+        // force load topic
+        pulsar.getAdminClient().topics().getStats(topicName);
+        MessageId messageIdAfterRestart = pulsar.getAdminClient().topics().getLastMessageId(topicName);
+        LOG.info("lastmessageid " + messageIdAfterRestart);
+        assertEquals(messageIdAfterRestart, messageIdBeforeRestart);
+
+        persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        managedLedgerConfig = persistentTopic.getManagedLedger().getConfig();
+        managedLedgerConfig.setRetentionSizeInMB(-1);
+        managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
+        managedLedgerConfig.setMaxEntriesPerLedger(1);
+        managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        // now we have two ledgers, the first is expired but is contains the lastMessageId
+        // the second is empty and should be kept as it is the current tail
+        Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2);
+
+        // force trimConsumedLedgers
+        Thread.sleep(3000);
+        CompletableFuture f = new CompletableFuture();
+        managedLedger.trimConsumedLedgersInBackground(f);
+        f.join();
+
+        // lastMessageId should be available even in this case, but is must
+        // refer to -1
+        MessageId messageIdAfterTrim = pulsar.getAdminClient().topics().getLastMessageId(topicName);
+        LOG.info("lastmessageid " + messageIdAfterTrim);
+        assertEquals(messageIdAfterTrim, MessageId.earliest);
+
+    }
 }