You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/14 05:26:29 UTC

[pulsar] branch branch-2.9 updated: Clean up individually deleted messages before the mark-delete position (#14261)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 1cea990  Clean up individually deleted messages before the mark-delete position (#14261)
1cea990 is described below

commit 1cea9909d695bf0c8a838f3ae600500d31371e95
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Feb 13 21:22:38 2022 -0800

    Clean up individually deleted messages before the mark-delete position (#14261)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 17 +++++++++++++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 26 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2f9578c..d37ecda 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1580,7 +1580,9 @@ public class ManagedCursorImpl implements ManagedCursor {
      */
     PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
         if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
-            throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
+            throw new IllegalArgumentException(
+                    "Mark deleting an already mark-deleted position. Current mark-delete: " + markDeletePosition
+                            + " -- attempted mark delete: " + newMarkDeletePosition);
         }
 
         PositionImpl oldMarkDeletePosition = markDeletePosition;
@@ -2000,6 +2002,19 @@ public class ManagedCursorImpl implements ManagedCursor {
             // mark-delete to the upper bound of the first range segment
             Range<PositionImpl> range = individualDeletedMessages.firstRange();
 
+            // If the upper bound is before the mark-delete position, we need to move ahead as these
+            // individualDeletedMessages are now irrelevant
+            if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) {
+                individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(),
+                        markDeletePosition.getEntryId());
+                range = individualDeletedMessages.firstRange();
+            }
+
+            if (range == null) {
+                // The set was completely cleaned up now
+                return;
+            }
+
             // If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
             if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
                     .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2f80bc8..a00a817 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -3508,6 +3509,31 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
     }
 
     @Test
+    public void testConsistencyOfIndividualMessages() throws Exception {
+        ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages");
+        ManagedCursorImpl c1 = (ManagedCursorImpl) ledger1.openCursor("c");
+
+        PositionImpl p1 = (PositionImpl) ledger1.addEntry(new byte[1024]);
+        c1.markDelete(p1);
+
+        // Artificially add a position that is before the current mark-delete position
+        LongPairRangeSet<PositionImpl> idm = c1.getIndividuallyDeletedMessagesSet();
+        idm.addOpenClosed(p1.getLedgerId() - 1, 0, p1.getLedgerId() - 1, 10);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            positions.add(ledger1.addEntry(new byte[1024]));
+        }
+
+        for (int i = 0; i < 20; i++) {
+            c1.delete(positions.get(i));
+        }
+
+        assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0);
+        assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() -1));
+    }
+
+    @Test
     public void testCursorCheckReadPositionChanged() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
         ManagedCursor c1 = ledger.openCursor("c1");