You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/05 13:25:52 UTC

[pulsar] branch branch-2.7 updated: Fix issues in advanceNonDurableCursors (#10667)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 1691e5b7228 Fix issues in advanceNonDurableCursors (#10667)
1691e5b7228 is described below

commit 1691e5b7228f20957567c6ed8ee114ecddbdbd91
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue May 25 09:28:24 2021 -0700

    Fix issues in advanceNonDurableCursors (#10667)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java    | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index db500e34b4a..7bdc5c9ef11 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2201,7 +2201,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 return;
             }
 
-            advanceNonDurableCursors(ledgersToDelete);
+            advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
             // Update metadata
@@ -2270,20 +2270,29 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     /**
      * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
+     * This method also addresses a corner case for durable cursors in which the cursor is caught up, i.e. the mark delete position
+     * happens to be the last entry in a ledger.  If the ledger is deleted, then subsequent calculations for backlog
+     * size may not be accurate since the method getNumberOfEntries we use in backlog calculation will not be able to fetch
+     * the ledger info of a deleted ledger.  Thus, we need to update the mark delete position to the "-1" entry of the first ledger
+     * that is not marked for deletion.
      * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
      * entries and the stats are reported correctly.
      */
-    private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+    private void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) {
         if (ledgersToDelete.isEmpty()) {
             return;
         }
 
-        long firstNonDeletedLedger = ledgers
-                .higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
+        // need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
+        // calling getNumberOfEntries later for a ledger that is already deleted will be problematic and return incorrect results
+        long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
         PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
 
         cursors.forEach(cursor -> {
-            if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
+            // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
+            // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
+            if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
+                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
                 cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {