You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/31 06:14:23 UTC

[pulsar] 01/05: When accumulating acks, update the batch index in batchDeletedIndexes and check whether it is greater than the batch index of the previous ack (#18042)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 043cb9fdee098f962c9940995fc57826d2db27d6
Author: LinChen <15...@qq.com>
AuthorDate: Fri Oct 21 18:22:02 2022 +0800

    When accumulating acks, update the batch index in batchDeletedIndexes and check whether it is greater than the batch index of the previous ack (#18042)
    
    Co-authored-by: leolinchen <le...@tencent.com>
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 58 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 6de68580826..ed861e6830f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1828,7 +1828,27 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
             if (newPosition.ackSet != null) {
-                batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet));
+                AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>();
+                BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(newPosition.ackSet);
+                // In order to prevent the batch index recorded in batchDeletedIndexes from rolling back,
+                // only update batchDeletedIndexes when the submitted batch index is greater
+                // than the recorded index.
+                batchDeletedIndexes.compute(newPosition,
+                        (k, v) -> {
+                            if (v == null) {
+                                return givenBitSet;
+                            }
+                            if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) {
+                                bitSetRecyclable.set(v);
+                                return givenBitSet;
+                            } else {
+                                bitSetRecyclable.set(givenBitSet);
+                                return v;
+                            }
+                        });
+                if (bitSetRecyclable.get() != null) {
+                    bitSetRecyclable.get().recycle();
+                }
                 newPosition = ledger.getPreviousPosition(newPosition);
             }
             Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST, newPosition);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 052f6ac2d54..0e66e76d5c3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -93,6 +93,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.api.proto.IntRange;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
@@ -3346,6 +3348,37 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         assertEquals(c1.getReadPosition(), positions[markDelete + 1]);
     }
 
+    @Test
+    public void testBatchIndexMarkdelete() throws ManagedLedgerException, InterruptedException {
+        ManagedLedger ledger = factory.open("test_batch_index_delete");
+        ManagedCursor cursor = ledger.openCursor("c1");
+
+        final int totalEntries = 100;
+        final Position[] positions = new Position[totalEntries];
+        for (int i = 0; i < totalEntries; i++) {
+            // add entry
+            positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+        assertEquals(cursor.getNumberOfEntries(), totalEntries);
+        markDeleteBatchIndex(cursor, positions[0], 10, 3);
+        List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+        Assert.assertEquals(1, deletedIndexes.size());
+        Assert.assertEquals(0, deletedIndexes.get(0).getStart());
+        Assert.assertEquals(3, deletedIndexes.get(0).getEnd());
+
+        markDeleteBatchIndex(cursor, positions[0], 10, 4);
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+        Assert.assertEquals(1, deletedIndexes.size());
+        Assert.assertEquals(0, deletedIndexes.get(0).getStart());
+        Assert.assertEquals(4, deletedIndexes.get(0).getEnd());
+
+        markDeleteBatchIndex(cursor, positions[0], 10, 2);
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+        Assert.assertEquals(1, deletedIndexes.size());
+        Assert.assertEquals(0, deletedIndexes.get(0).getStart());
+        Assert.assertEquals(4, deletedIndexes.get(0).getEnd());
+    }
+
     @Test
     public void testBatchIndexDelete() throws ManagedLedgerException, InterruptedException {
         ManagedLedger ledger = factory.open("test_batch_index_delete");
@@ -3477,6 +3510,31 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         pos.ackSet = null;
     }
 
+    private void markDeleteBatchIndex(ManagedCursor cursor, Position position, int batchSize, int batchIndex
+    ) throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        PositionImpl pos = (PositionImpl) position;
+        BitSetRecyclable bitSet = new BitSetRecyclable();
+        bitSet.set(0, batchSize);
+        bitSet.clear(0, batchIndex + 1);
+
+        pos.ackSet = bitSet.toLongArray();
+
+        cursor.asyncMarkDelete(pos, new MarkDeleteCallback() {
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                latch.countDown();
+            }
+        }, null);
+        latch.await();
+        pos.ackSet = null;
+    }
+
     private List<IntRange> getAckedIndexRange(long[] bitSetLongArray, int batchSize) {
         if (bitSetLongArray == null) {
             return null;