You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/05 13:25:52 UTC
[pulsar] branch branch-2.7 updated: Fix issues in advanceNonDurableCursors (#10667)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 1691e5b7228 Fix issues in advanceNonDurableCursors (#10667)
1691e5b7228 is described below
commit 1691e5b7228f20957567c6ed8ee114ecddbdbd91
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue May 25 09:28:24 2021 -0700
Fix issues in advanceNonDurableCursors (#10667)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index db500e34b4a..7bdc5c9ef11 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2201,7 +2201,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return;
}
- advanceNonDurableCursors(ledgersToDelete);
+ advanceCursorsIfNecessary(ledgersToDelete);
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
@@ -2270,20 +2270,29 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
/**
* Non-durable cursors have to be moved forward when data is trimmed since they do not retain that data.
+ * This method also addresses a corner case for durable cursors in which the cursor is caught up, i.e. the mark delete position
+ * happens to be the last entry in a ledger. If the ledger is deleted, then subsequent calculations for backlog
+ * size may not be accurate since the method getNumberOfEntries we use in backlog calculation will not be able to fetch
+ * the ledger info of a deleted ledger. Thus, we need to update the mark delete position to the "-1" entry of the first ledger
+ * that is not marked for deletion.
* This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
* entries and the stats are reported correctly.
*/
- private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+ private void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) {
if (ledgersToDelete.isEmpty()) {
return;
}
- long firstNonDeletedLedger = ledgers
- .higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
+ // need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
+ // calling getNumberOfEntries later for a ledger that is already deleted will be problematic and will return incorrect results
+ long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
cursors.forEach(cursor -> {
- if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
+ // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
+ // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
+ if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
+ && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {