You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/01 09:56:01 UTC

[pulsar] 02/02: [Broker] Fix NonDurableCursorImpl initialPosition by startCursorPosition greater than lastConfirmedEntry problem. (#10095)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 925fb1da99a0a9daea894b9c2b1d722d54dabcab
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Apr 1 16:18:00 2021 +0800

    [Broker] Fix NonDurableCursorImpl initialPosition by startCursorPosition greater than lastConfirmedEntry problem. (#10095)
    
    ## Motivation
    In order to fix reader set ```startMessageId``` greater than ```lastConfirmedEntry``` probelm.
    
    When the reader set ```startMessageId``` greater than ```lastConfirmedEntry```, generate ```NonDurableCursorImpl``` will carry a ```MarkdeletePosition``` greater than lastConfirmedEntry.
    When use ```getNumberOfEntriesInBacklog``` will throw
    ![image](https://user-images.githubusercontent.com/39078850/113100735-5f4d8980-922e-11eb-8758-7d3cfb1435f3.png)
    
    
    ## implement
    When ```startMessageId``` greater than ```lastConfirmedEntry``` use ```lastConfirmedEntry``` as ```markdeletePosition```
    ### Verifying this change
    Add the tests for it
    
    Does this pull request potentially affect one of the following parts:
    If yes was chosen, please highlight the changes
    
    Dependencies (does it add or upgrade a dependency): (no)
    The public API: (no)
    The schema: (no)
    The default values of configurations: (no)
    The wire protocol: (no)
    The rest endpoints: (no)
    The admin cli options: (no)
    Anything that affects deployment: (no)
    
    
    (cherry picked from commit 7d84e72a08e3baca8cb5fdbfc4bbfc3a1ae3ec92)
---
 .../org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java   | 2 +-
 .../org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java   | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 6d1bdfb..167bcec 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -40,7 +40,7 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
         // Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
         // store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
         // both ledgerId and entryId to be Long.max()
-        if (startCursorPosition == null || startCursorPosition.getLedgerId() == PositionImpl.latest.getLedgerId()) {
+        if (startCursorPosition == null || startCursorPosition.compareTo(ledger.lastConfirmedEntry) > 0) {
             // Start from last entry
             switch (initialPosition) {
                 case Latest:
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 7a15035..3d806c9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -323,6 +323,13 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
     }
 
     @Test(timeOut = 20000)
+    void markDeleteGreaterThanLastConfirmedEntry() throws Exception {
+        ManagedLedger ml1 = factory.open("my_test_ledger");
+        ManagedCursor mc1 = ml1.newNonDurableCursor(PositionImpl.get(Long.MAX_VALUE - 1, Long.MAX_VALUE - 1));
+        assertEquals(mc1.getMarkDeletedPosition(), ml1.getLastConfirmedEntry());
+    }
+
+    @Test(timeOut = 20000)
     void testResetCursor() throws Exception {
         ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
                 new ManagedLedgerConfig().setMaxEntriesPerLedger(10));