You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/14 05:26:29 UTC
[pulsar] branch branch-2.9 updated: Clean up individually deleted messages before the mark-delete position (#14261)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 1cea990 Clean up individually deleted messages before the mark-delete position (#14261)
1cea990 is described below
commit 1cea9909d695bf0c8a838f3ae600500d31371e95
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Feb 13 21:22:38 2022 -0800
Clean up individually deleted messages before the mark-delete position (#14261)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 17 +++++++++++++-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 26 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2f9578c..d37ecda 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1580,7 +1580,9 @@ public class ManagedCursorImpl implements ManagedCursor {
*/
PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
- throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
+ throw new IllegalArgumentException(
+ "Mark deleting an already mark-deleted position. Current mark-delete: " + markDeletePosition
+ + " -- attempted mark delete: " + newMarkDeletePosition);
}
PositionImpl oldMarkDeletePosition = markDeletePosition;
@@ -2000,6 +2002,19 @@ public class ManagedCursorImpl implements ManagedCursor {
// mark-delete to the upper bound of the first range segment
Range<PositionImpl> range = individualDeletedMessages.firstRange();
+ // If the upper bound is before the mark-delete position, we need to move ahead as these
+ // individualDeletedMessages are now irrelevant
+ if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) {
+ individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(),
+ markDeletePosition.getEntryId());
+ range = individualDeletedMessages.firstRange();
+ }
+
+ if (range == null) {
+ // The set was completely cleaned up now
+ return;
+ }
+
// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
.getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 2f80bc8..a00a817 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -3508,6 +3509,31 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
@Test
+ public void testConsistencyOfIndividualMessages() throws Exception {
+ ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages");
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger1.openCursor("c");
+
+ PositionImpl p1 = (PositionImpl) ledger1.addEntry(new byte[1024]);
+ c1.markDelete(p1);
+
+ // Artificially add the range (0, 10] of ledger id (p1.getLedgerId() - 1), which is before the current mark-delete position
+ LongPairRangeSet<PositionImpl> idm = c1.getIndividuallyDeletedMessagesSet();
+ idm.addOpenClosed(p1.getLedgerId() - 1, 0, p1.getLedgerId() - 1, 10);
+
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ positions.add(ledger1.addEntry(new byte[1024]));
+ }
+
+ for (int i = 0; i < 20; i++) {
+ c1.delete(positions.get(i));
+ }
+
+ assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0);
+ assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() -1));
+ }
+
+ @Test
public void testCursorCheckReadPositionChanged() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
ManagedCursor c1 = ledger.openCursor("c1");