You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/03/27 20:34:36 UTC

(pulsar) 01/02: [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 20be6ca977e78c7defa14883becb84397003659d
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Mar 28 03:42:15 2024 +0800

    [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335)
    
    (cherry picked from commit f77fe5f099f7ecc334509db07bba477c4226cf19)
---
 .../persistent/PersistentMessageExpiryMonitor.java |  4 +-
 .../service/PersistentMessageFinderTest.java       | 51 +++++++++++++++++++---
 2 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index d82f3d93f8f..51ce2c1b8ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -116,8 +116,8 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
                             managedLedger.getLedgersInfo().lastKey(), true);
             MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
             for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) {
-                if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds,
-                        ledgerInfo.getTimestamp())) {
+                if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L
+                        || !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) {
                     break;
                 }
                 info = ledgerInfo;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index d56968c8f8e..194747f59fc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
-
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -46,7 +44,9 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.concurrent.atomic.AtomicReference;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -71,11 +72,10 @@ import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.core.MediaType;
-
 @Test(groups = "broker")
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
 
@@ -261,7 +261,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         ledger.addEntry(createMessageWrittenToLedger("msg2"));
         ledger.addEntry(createMessageWrittenToLedger("msg3"));
         Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message"));
-        
+
         long endTimestamp = System.currentTimeMillis() + 1000;
 
         Result result = new Result();
@@ -451,6 +451,43 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
         assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
     }
 
+    @Test
+    public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable {
+        final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
+        int maxTTLSeconds = 1;
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(5);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+        // simulate a client whose clock runs 10 days ahead, so publish timestamps lie in the future
+        long incorrectPublishTimestamp = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10);
+        for (int i = 0; i < 7; i++) {
+            ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp));
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 2); // 7 entries at max 5 per ledger -> 2 ledgers
+        PersistentMessageExpiryMonitor monitor =
+                new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
+        AsyncCallbacks.MarkDeleteCallback markDeleteCallback =
+                (AsyncCallbacks.MarkDeleteCallback) spy(
+                        FieldUtils.readDeclaredField(monitor, "markDeleteCallback", true));
+        FieldUtils.writeField(monitor, "markDeleteCallback", markDeleteCallback, true); // swap in the spy
+
+        AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
+        Mockito.doAnswer(invocation -> {
+            ManagedLedgerException argument = invocation.getArgument(0, ManagedLedgerException.class);
+            throwableAtomicReference.set(argument); // record the failure so the test can assert on it
+            return invocation.callRealMethod();
+        }).when(markDeleteCallback).markDeleteFailed(any(), any());
+
+        PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry();
+        c1.markDelete(position); // move the cursor to the last confirmed entry before expiry runs
+        Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds)); // wait out the TTL
+        monitor.expireMessages(maxTTLSeconds);
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
+
+        Assert.assertNull(throwableAtomicReference.get()); // nothing was reported via markDeleteFailed
+    }
+
     @Test
     void testMessageExpiryWithPosition() throws Exception {
         final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";