You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/26 12:16:28 UTC

[pulsar] 01/04: [Broker] Fix the backlog issue with --precise-backlog=true (#10966)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2f9b6721c579300e6b1db3f0c062e8dbe38d1925
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Jun 22 07:31:53 2021 +0800

    [Broker] Fix the backlog issue with --precise-backlog=true (#10966)
    
    Fix the backlog issue with --precise-backlog=true.
    Currently, when `managedLedger` finishes creating a new `ledger`, if `markDelete` is at the LAC of the `previousLedger`, the previousLedger is deleted from `managedLedger`. When computing the backlog we call `getNumberOfEntries` with `Range.closed` and subtract 1; if the previousLedger no longer exists, this returns the wrong number.
    
    ![image](https://user-images.githubusercontent.com/39078850/122502847-fce47800-d029-11eb-81b3-abc9e595d93e.png)
    
    Use `Range.openClosed()` in `getBacklog`.
    Add tests for it.
    
    (cherry picked from commit e3a97ee4caa80522663f5900b07ae331927de68e)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  4 +--
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 30 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f732a75..8f671c5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -858,13 +858,13 @@ public class ManagedCursorImpl implements ManagedCursor {
                     messagesConsumedCounter, markDeletePosition, readPosition);
         }
         if (isPrecise) {
-            return getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
+            return getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
         }
 
         long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
         if (backlog < 0) {
             // In some case the counters get incorrect values, fall back to the precise backlog count
-            backlog = getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
+            backlog = getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
         }
 
         return backlog;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4f6e1a0..cfa7f0b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -45,6 +45,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
@@ -79,6 +81,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -3421,5 +3424,32 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         dirtyFactory.shutdown();
     }
 
+    @Test
+    public void testCursorGetBacklog() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("get-backlog", managedLedgerConfig);
+        ManagedCursor managedCursor = ledger.openCursor("test");
+
+        PositionImpl position = (PositionImpl) ledger.addEntry("test".getBytes(Encoding));
+        ledger.addEntry("test".getBytes(Encoding));
+        PositionImpl position1 = (PositionImpl) ledger.addEntry("test".getBytes(Encoding));
+        ledger.addEntry("test".getBytes(Encoding));
+
+        Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(true), 4);
+        Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
+        Field field = ManagedLedgerImpl.class.getDeclaredField("ledgers");
+        field.setAccessible(true);
+
+        ((ConcurrentSkipListMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>) field.get(ledger)).remove(position.ledgerId);
+        field = ManagedCursorImpl.class.getDeclaredField("markDeletePosition");
+        field.setAccessible(true);
+        field.set(managedCursor, PositionImpl.get(position1.ledgerId, -1));
+
+
+        Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(true), 2);
+        Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
+    }
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }