You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/31 06:14:23 UTC
[pulsar] 01/05: When accumulating acks, update the batch index in batchDeletedIndexes and check whether it is greater than the batch index of the previous ack (#18042)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 043cb9fdee098f962c9940995fc57826d2db27d6
Author: LinChen <15...@qq.com>
AuthorDate: Fri Oct 21 18:22:02 2022 +0800
When accumulating acks, update the batch index in batchDeletedIndexes and check whether it is greater than the batch index of the previous ack (#18042)
Co-authored-by: leolinchen <le...@tencent.com>
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 22 +++++++-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 58 ++++++++++++++++++++++
2 files changed, 79 insertions(+), 1 deletion(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 6de68580826..ed861e6830f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1828,7 +1828,27 @@ public class ManagedCursorImpl implements ManagedCursor {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
if (newPosition.ackSet != null) {
- batchDeletedIndexes.put(newPosition, BitSetRecyclable.create().resetWords(newPosition.ackSet));
+ AtomicReference<BitSetRecyclable> bitSetRecyclable = new AtomicReference<>();
+ BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(newPosition.ackSet);
+ // In order to prevent the batch index recorded in batchDeletedIndexes from rolling back,
+ // only update batchDeletedIndexes when the submitted batch index is greater
+ // than the recorded index.
+ batchDeletedIndexes.compute(newPosition,
+ (k, v) -> {
+ if (v == null) {
+ return givenBitSet;
+ }
+ if (givenBitSet.nextSetBit(0) > v.nextSetBit(0)) {
+ bitSetRecyclable.set(v);
+ return givenBitSet;
+ } else {
+ bitSetRecyclable.set(givenBitSet);
+ return v;
+ }
+ });
+ if (bitSetRecyclable.get() != null) {
+ bitSetRecyclable.get().recycle();
+ }
newPosition = ledger.getPreviousPosition(newPosition);
}
Map<PositionImpl, BitSetRecyclable> subMap = batchDeletedIndexes.subMap(PositionImpl.EARLIEST, newPosition);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 052f6ac2d54..0e66e76d5c3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -93,6 +93,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.common.api.proto.IntRange;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
@@ -3346,6 +3348,37 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
assertEquals(c1.getReadPosition(), positions[markDelete + 1]);
}
+ @Test
+ public void testBatchIndexMarkdelete() throws ManagedLedgerException, InterruptedException {
+ ManagedLedger ledger = factory.open("test_batch_index_delete");
+ ManagedCursor cursor = ledger.openCursor("c1");
+
+ final int totalEntries = 100;
+ final Position[] positions = new Position[totalEntries];
+ for (int i = 0; i < totalEntries; i++) {
+ // add entry
+ positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
+ }
+ assertEquals(cursor.getNumberOfEntries(), totalEntries);
+ markDeleteBatchIndex(cursor, positions[0], 10, 3);
+ List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+ Assert.assertEquals(1, deletedIndexes.size());
+ Assert.assertEquals(0, deletedIndexes.get(0).getStart());
+ Assert.assertEquals(3, deletedIndexes.get(0).getEnd());
+
+ markDeleteBatchIndex(cursor, positions[0], 10, 4);
+ deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+ Assert.assertEquals(1, deletedIndexes.size());
+ Assert.assertEquals(0, deletedIndexes.get(0).getStart());
+ Assert.assertEquals(4, deletedIndexes.get(0).getEnd());
+
+ markDeleteBatchIndex(cursor, positions[0], 10, 2);
+ deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[0]), 10);
+ Assert.assertEquals(1, deletedIndexes.size());
+ Assert.assertEquals(0, deletedIndexes.get(0).getStart());
+ Assert.assertEquals(4, deletedIndexes.get(0).getEnd());
+ }
+
@Test
public void testBatchIndexDelete() throws ManagedLedgerException, InterruptedException {
ManagedLedger ledger = factory.open("test_batch_index_delete");
@@ -3477,6 +3510,31 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
pos.ackSet = null;
}
+ private void markDeleteBatchIndex(ManagedCursor cursor, Position position, int batchSize, int batchIndex
+ ) throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ PositionImpl pos = (PositionImpl) position;
+ BitSetRecyclable bitSet = new BitSetRecyclable();
+ bitSet.set(0, batchSize);
+ bitSet.clear(0, batchIndex + 1);
+
+ pos.ackSet = bitSet.toLongArray();
+
+ cursor.asyncMarkDelete(pos, new MarkDeleteCallback() {
+ @Override
+ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void markDeleteComplete(Object ctx) {
+ latch.countDown();
+ }
+ }, null);
+ latch.await();
+ pos.ackSet = null;
+ }
+
private List<IntRange> getAckedIndexRange(long[] bitSetLongArray, int batchSize) {
if (bitSetLongArray == null) {
return null;