You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:56:55 UTC

[pulsar] 01/27: Fix the batch index ack persistent issue. (#9504)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 900ba224c426cc83b61f6625a9e3e54f6ea2d7ba
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Feb 7 11:18:28 2021 +0800

    Fix the batch index ack persistent issue. (#9504)
    
    Fix the batch index ack persistent issue. Currently, the `batchDeletedIndexInfoBuilder` is reused for generating the batch index ack data, but its delete set is not cleared before the next delete set is added, so entries from a previous position leak into the following ones.
    
    Clear the delete set before adding the new delete set.
    
    Test added. Without this fix, the test fails.
    
    (cherry picked from commit 7916666cd579438a5bc11e749e73d6e549d7e756)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java       |  1 +
 .../bookkeeper/mledger/impl/ManagedCursorTest.java       | 16 ++++++++++++++--
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index e9b96be..861b5ef 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2471,6 +2471,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             for (long l : array) {
                 deleteSet.add(l);
             }
+            batchDeletedIndexInfoBuilder.clearDeleteSet();
             batchDeletedIndexInfoBuilder.addAllDeleteSet(deleteSet);
             result.add(batchDeletedIndexInfoBuilder.build());
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 256457c..8bf0e6d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -3153,7 +3153,10 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
     @Test
     public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerException, InterruptedException {
-        ManagedLedger ledger = factory.open("test_batch_indexes_deletion_persistent");
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        // Make sure the cursor metadata updated by the cursor ledger ID.
+        managedLedgerConfig.setMaxUnackedRangesToPersistInZk(-1);
+        ManagedLedger ledger = factory.open("test_batch_indexes_deletion_persistent", managedLedgerConfig);
         ManagedCursor cursor = ledger.openCursor("c1");
 
         final int totalEntries = 100;
@@ -3163,6 +3166,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
             positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
         }
         assertEquals(cursor.getNumberOfEntries(), totalEntries);
+        deleteBatchIndex(cursor, positions[6], 10, Lists.newArrayList(IntRange.newBuilder().setStart(1).setEnd(3).build()));
         deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(IntRange.newBuilder().setStart(3).setEnd(6).build()));
         deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
         deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
@@ -3170,8 +3174,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
         deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(IntRange.newBuilder().setStart(0).setEnd(9).build()));
 
-        ledger = factory.open("test_batch_indexes_deletion_persistent");
+        cursor.close();
+        ledger.close();
+        ledger = factory.open("test_batch_indexes_deletion_persistent", managedLedgerConfig);
         cursor = ledger.openCursor("c1");
+
         List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10);
         Assert.assertEquals(deletedIndexes.size(), 1);
         Assert.assertEquals(deletedIndexes.get(0).getStart(), 3);
@@ -3181,6 +3188,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10);
         Assert.assertNull(deletedIndexes);
         Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[5]);
+
+        deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[6]), 10);
+        Assert.assertEquals(deletedIndexes.size(), 1);
+        Assert.assertEquals(deletedIndexes.get(0).getStart(), 1);
+        Assert.assertEquals(deletedIndexes.get(0).getEnd(), 3);
     }
 
     private void deleteBatchIndex(ManagedCursor cursor, Position position, int batchSize,