Posted to commits@pulsar.apache.org by "poorbarcode (via GitHub)" <gi...@apache.org> on 2023/05/25 09:24:48 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

poorbarcode opened a new pull request, #18620:
URL: https://github.com/apache/pulsar/pull/18620

   ### Motivation
   The `autoSkipNonRecoverableData` configuration is meant to be enabled when partial data loss is acceptable: when a ledger is lost, the broker keeps working. However, there is a problem: if a ledger is lost, consumers and producers continue to work, but the cursor's mark-delete position cannot move forward.
   
   For example: 
   
   1. `topic-1` has 3 ledgers:
   ```
   {
     ledger-1: [1:1],[1:2],[1:3],[1:4],[1:5]
     ledger-2: [2:1],[2:2],[2:3],[2:4],[2:5]
     ledger-3: [3:1],[3:2],[3:3],[3:4],[3:5]
   }
   ```
   2. Individually ack `[1:1]` and `[2:5]`. `cursor.markDeletedPosition` is then `[1:1]`, and `individualDeletedMessages` holds one position, `[2:5]`.
   3. `ledger-2` is lost.
   4. **Highlight:** consumers and producers keep working, but `markDeletedPosition` stays at `[1:5]`, and even unloading the topic does not solve the problem.
   
   The root cause is: When an unrecoverable ledger is found, the read position of the cursor is simply moved forward, but `individualDeletedMessages` and `batchDeletedIndexes` are ignored.
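
The stuck position in the example above can be reproduced with a toy model. The sketch below is a simplified illustration, not Pulsar's `ManagedCursorImpl`: positions are plain `"ledgerId:entryId"` strings, and the class and method names (`MarkDeleteSketch`, `advance`, `ackWholeLedger`, `replay`) are hypothetical. It assumes the mark-delete position can only advance across a contiguous run of acknowledged positions, so the unacknowledged entries of the lost `ledger-2` form a hole it cannot pass.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of the scenario; not Pulsar's ManagedCursorImpl.
class MarkDeleteSketch {

    // All positions of topic-1 in order, written as "ledgerId:entryId".
    static List<String> topicPositions() {
        List<String> positions = new ArrayList<>();
        for (int ledger = 1; ledger <= 3; ledger++) {
            for (int entry = 1; entry <= 5; entry++) {
                positions.add(ledger + ":" + entry);
            }
        }
        return positions;
    }

    // Move the mark-delete position forward across the contiguous run of acked positions.
    static String advance(List<String> ordered, Set<String> acked, String markDelete) {
        String current = markDelete;
        for (int i = ordered.indexOf(markDelete) + 1; i < ordered.size(); i++) {
            if (!acked.contains(ordered.get(i))) {
                break; // first hole: the mark-delete position cannot pass it
            }
            current = ordered.get(i);
        }
        return current;
    }

    // Acknowledge every entry of one ledger (entries 1..5 in this toy topic).
    static void ackWholeLedger(Set<String> acked, int ledger) {
        for (int entry = 1; entry <= 5; entry++) {
            acked.add(ledger + ":" + entry);
        }
    }

    // Replays the example; returns the mark-delete position before and after ledger-2 is acked.
    static String[] replay() {
        List<String> ordered = topicPositions();
        Set<String> acked = new HashSet<>();
        acked.add("1:1");
        acked.add("2:5");          // step 2: individual acks
        ackWholeLedger(acked, 1);  // the consumer finishes ledger-1
        ackWholeLedger(acked, 3);  // ledger-2 is lost and skipped; ledger-3 is consumed
        String stuck = advance(ordered, acked, "1:1");
        ackWholeLedger(acked, 2);  // treat the lost ledger as acknowledged
        String moved = advance(ordered, acked, stuck);
        return new String[] {stuck, moved};
    }

    public static void main(String[] args) {
        String[] result = replay();
        System.out.println("before: " + result[0] + ", after: " + result[1]);
    }
}
```

In this model the position only moves past `ledger-2` once every entry of that ledger is marked as acknowledged, which is why moving the read position alone is not enough.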
   
   ### Modifications
   - When an unrecoverable ledger is found, remove its records from `individualDeletedMessages` and `batchDeletedIndexes`.
   - When the managed cursor is recovered, check whether there are invalid records in `individualDeletedMessages` and `batchDeletedIndexes` and print a warning log.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   - https://github.com/poorbarcode/pulsar/pull/44
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1194560920


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2661,6 +2708,34 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    void clearIncompleteAckedRecordsByLedgerId(final long ledgerId){

Review Comment:
   fixed





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031918767


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1729,6 +1729,18 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
         }
     }
 
+    @Override
+    public void removeNonRecoverableLedger(long ledgerId){
+        ledgers.remove(ledgerId);
+        Iterator<ManagedCursor> managedCursorIterator = cursors.iterator();
+        while (managedCursorIterator.hasNext()){
+            ManagedCursor managedCursor = managedCursorIterator.next();
+            if (managedCursor instanceof ManagedCursorImpl managedCursorImpl){
+                managedCursorImpl.clearIncompleteAckedRecordsByLedgerId(ledgerId);

Review Comment:
   Using interface implementations may break encapsulation, but I'm not sure it's justified here.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205216997


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+                    positionsToAck = new ArrayList<>();
+                }

Review Comment:
   > I mean the asyncDelete method is also doing the same thing.
   > - Iterate the positions then update the memory state one by one.
   > - The mark delete rate limiter will update the changes to the storage layer in batch
   
   I think you are right, but the rate limiter's mechanism cannot avoid the logic below (calculating the new markDeletePosition):
   - https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L2313-L2328
   - https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L1819-L1884
   
   Since ledger loss rarely happens, I feel your suggestion is better.
   
   Fixed.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205153703


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+                    positionsToAck = new ArrayList<>();
+                }

Review Comment:
   - Batch deletion has two benefits:
     - It reduces the cost of merging ranges in `individualDeletedMessages`.
     - It does not move `markDeletePosition` as often.
   - Because we use batch deletions, we need to avoid OOM, so we delete in small batches (I have seen ledgers with 100 million entries).
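
The bounded-batch idea described above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the PR's code: `ChunkedAckSketch` and `ackInChunks` are hypothetical names, the `flush` callback stands in for whatever call actually acknowledges a batch, and the chunk size of 1000 mirrors the threshold in the quoted diff. The point is that memory use is bounded by the chunk size rather than by the number of entries in the ledger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of acknowledging a large ledger in bounded chunks.
class ChunkedAckSketch {

    // Calls flush with at most chunkSize entry ids at a time; returns the number of flushes.
    static int ackInChunks(long entryCount, int chunkSize, Consumer<List<Long>> flush) {
        int flushes = 0;
        List<Long> pending = new ArrayList<>(chunkSize);
        for (long entryId = 0; entryId < entryCount; entryId++) {
            pending.add(entryId);
            if (pending.size() >= chunkSize) {
                flush.accept(pending);
                flushes++;
                pending = new ArrayList<>(chunkSize); // start a fresh, bounded batch
            }
        }
        if (!pending.isEmpty()) {
            flush.accept(pending); // do not forget the final partial batch
            flushes++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        long[] total = {0};
        int flushes = ackInChunks(2500, 1000, chunk -> total[0] += chunk.size());
        System.out.println(flushes + " flushes, " + total[0] + " entries");
    }
}
```

Note that the sketch flushes the trailing partial batch after the loop; any chunked loop of this shape needs that step, otherwise the last entries are never acknowledged.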





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1194560654


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -117,6 +119,9 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 return;
             }
             updateReadPosition(nexReadPosition);
+            if (lostLedger != null) {
+                clearIncompleteAckedRecordsFromLedger(lostLedger);

Review Comment:
   fixed



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1729,6 +1729,18 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
         }
     }
 
+    @Override
+    public void removeNonRecoverableLedger(long ledgerId){
+        ledgers.remove(ledgerId);
+        Iterator<ManagedCursor> managedCursorIterator = cursors.iterator();
+        while (managedCursorIterator.hasNext()){
+            ManagedCursor managedCursor = managedCursorIterator.next();
+            if (managedCursor instanceof ManagedCursorImpl managedCursorImpl){
+                managedCursorImpl.clearIncompleteAckedRecordsByLedgerId(ledgerId);

Review Comment:
   fixed





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031913744


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -102,9 +102,11 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                     readPosition, exception.getMessage());
             final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();
             Position nexReadPosition;
+            Long lostLedger = null;
             if (exception instanceof ManagedLedgerException.LedgerNotExistException) {
                 // try to find and move to next valid ledger
                 nexReadPosition = cursor.getNextLedgerPosition(readPosition.getLedgerId());
+                lostLedger = readPosition.ledgerId;
             } else {
                 // Skip this read operation
                 nexReadPosition = ledger.getValidPositionAfterSkippedEntries(readPosition, count);

Review Comment:
   I think we also need to check here.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031916835


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2661,6 +2708,34 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    void clearIncompleteAckedRecordsByLedgerId(final long ledgerId){
+        lock.writeLock().lock();
+        try {
+            List<Range> rangeListToDelete = individualDeletedMessages.asRanges().stream()
+                    .filter(range -> range.lowerEndpoint().getLedgerId() == ledgerId)
+                    .map(range -> Range.openClosed(
+                            new LongPairRangeSet.LongPair(ledgerId, range.lowerEndpoint().getEntryId()),
+                            new LongPairRangeSet.LongPair(ledgerId, range.upperEndpoint().getEntryId())

Review Comment:
   `range.upperEndpoint().getEntryId() -1` ?





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1033548960


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -480,7 +482,24 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
                     // Need to proceed and read the last entry in the specified ledger to find out the last position
                     log.info("[{}] Consumer {} meta-data recover from ledger {}", ledger.getName(), name,
                             info.getCursorsLedgerId());
-                    recoverFromLedger(info, callback);
+                    recoverFromLedger(info, new VoidCallback() {
+                        @Override
+                        public void operationComplete() {
+                            LinkedHashSet<Long> ledgersDeletedButNotAcked = checkLedgersDeletedButNotAcked();
+                            if (!ledgersDeletedButNotAcked.isEmpty()){
+                                log.warn("[{}] ledgers {} has deleted from topic, but still in the records of"
+                                                + " incomplete-ack {}. If it is not cleaned up in time, it will cause"
+                                                + " the mark deleted position to not move forward.",
+                                        ledger.getName(), ledgersDeletedButNotAcked, name);

Review Comment:
   We can know the topic by `ledger.getName()` and the subscription by `cursor.this.name`





[GitHub] [pulsar] codecov-commenter commented on pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#issuecomment-1566638402

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/18620?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#18620](https://app.codecov.io/gh/apache/pulsar/pull/18620?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (0274113) into [master](https://app.codecov.io/gh/apache/pulsar/commit/2ebb3797c3f371c3ca22cbc8002a8110e3e3fa47?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (2ebb379) will **increase** coverage by `35.92%`.
   > The diff coverage is `73.91%`.
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/18620/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/18620?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #18620       +/-   ##
   =============================================
   + Coverage     36.97%   72.90%   +35.92%     
   - Complexity    12043    31890    +19847     
   =============================================
     Files          1687     1865      +178     
     Lines        128760   138439     +9679     
     Branches      14003    15192     +1189     
   =============================================
   + Hits          47612   100925    +53313     
   + Misses        74851    29496    -45355     
   - Partials       6297     8018     +1721     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | inttests | `24.23% <0.00%> (-0.08%)` | :arrow_down: |
   | systests | `24.98% <0.00%> (-0.10%)` | :arrow_down: |
   | unittests | `72.20% <73.91%> (+40.23%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pulsar/pull/18620?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...a/org/apache/bookkeeper/mledger/ManagedCursor.java](https://app.codecov.io/gh/apache/pulsar/pull/18620?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9NYW5hZ2VkQ3Vyc29yLmphdmE=) | `42.85% <0.00%> (-3.30%)` | :arrow_down: |
   | [...a/org/apache/bookkeeper/mledger/ManagedLedger.java](https://app.codecov.io/gh/apache/pulsar/pull/18620?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9NYW5hZ2VkTGVkZ2VyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...rg/apache/bookkeeper/mledger/impl/OpReadEntry.java](https://app.codecov.io/gh/apache/pulsar/pull/18620?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9pbXBsL09wUmVhZEVudHJ5LmphdmE=) | `84.31% <75.00%> (+15.94%)` | :arrow_up: |
   | [...che/bookkeeper/mledger/impl/ManagedCursorImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/18620?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9pbXBsL01hbmFnZWRDdXJzb3JJbXBsLmphdmE=) | `78.85% <76.92%> (+33.26%)` | :arrow_up: |
   | [...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java](https://app.codecov.io/gh/apache/pulsar/pull/18620?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9pbXBsL01hbmFnZWRMZWRnZXJJbXBsLmphdmE=) | `81.19% <100.00%> (+32.01%)` | :arrow_up: |
   
   ... and [1428 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/18620/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   




[GitHub] [pulsar] poorbarcode merged pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode merged PR #18620:
URL: https://github.com/apache/pulsar/pull/18620




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031914094


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2661,6 +2708,34 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    void clearIncompleteAckedRecordsByLedgerId(final long ledgerId){
+        lock.writeLock().lock();
+        try {
+            List<Range> rangeListToDelete = individualDeletedMessages.asRanges().stream()
+                    .filter(range -> range.lowerEndpoint().getLedgerId() == ledgerId)
+                    .map(range -> Range.openClosed(
+                            new LongPairRangeSet.LongPair(ledgerId, range.lowerEndpoint().getEntryId()),
+                            new LongPairRangeSet.LongPair(ledgerId, range.upperEndpoint().getEntryId())
+                    )).collect(Collectors.toList());
+            if (!rangeListToDelete.isEmpty()) {
+                rangeListToDelete.forEach(individualDeletedMessages::remove);
+            }
+
+            if (batchDeletedIndexes != null) {
+                Set<PositionImpl> batchedIndexesToDelete = batchDeletedIndexes.keySet().stream()
+                        .filter(position -> position.getLedgerId() == ledgerId)
+                        .collect(Collectors.toSet());
+                if (!batchedIndexesToDelete.isEmpty()) {
+                    batchedIndexesToDelete.forEach(batchDeletedIndexes::remove);
+                }
+            }
+        } catch (Exception e){
+            e.printStackTrace();

Review Comment:
   why `e.printStackTrace();`?





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1194569568


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -480,7 +482,24 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
                     // Need to proceed and read the last entry in the specified ledger to find out the last position
                     log.info("[{}] Consumer {} meta-data recover from ledger {}", ledger.getName(), name,
                             info.getCursorsLedgerId());
-                    recoverFromLedger(info, callback);
+                    recoverFromLedger(info, new VoidCallback() {
+                        @Override
+                        public void operationComplete() {
+                            LinkedHashSet<Long> ledgersDeletedButNotAcked = checkLedgersDeletedButNotAcked();

Review Comment:
   This code has been removed (this PR no longer runs the checks when the cursor is initializing).





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1198363308


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2661,6 +2708,34 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    void clearIncompleteAckedRecordsByLedgerId(final long ledgerId){
+        lock.writeLock().lock();
+        try {
+            List<Range> rangeListToDelete = individualDeletedMessages.asRanges().stream()
+                    .filter(range -> range.lowerEndpoint().getLedgerId() == ledgerId)
+                    .map(range -> Range.openClosed(

Review Comment:
   The solution has been revised. Could you take a look again?





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205099793


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -786,6 +786,12 @@ Set<? extends Position> asyncReplayEntries(
      */
     long getEstimatedSizeSinceMarkDeletePosition();
 
+    /**
+     * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is
+     * used to delete information about this ledger in the ManagedCursor.
+     */
+    void noticeNonRecoverableLedgerSkipped(long ledgerId);

Review Comment:
   We have the following annotations on this file
   
   ```
   @InterfaceAudience.LimitedPrivate
   @InterfaceStability.Stable
   ```
   
   So we should not break the existing implementations.
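
One common Java way to add a method to a stable interface without breaking existing implementations is a `default` method with a no-op body. The sketch below only illustrates that technique; the nested `Cursor` interface and both implementations are hypothetical stand-ins, not the real `ManagedCursor` API, and whether the PR took this route is not shown in this comment. The method name `skipNonRecoverableLedger` is borrowed from the naming suggestion in this review.

```java
// Hypothetical types for illustration only; not the real ManagedCursor API.
class DefaultMethodSketch {

    interface Cursor {
        String getName();

        // A default no-op body lets implementations written earlier compile unchanged.
        default void skipNonRecoverableLedger(long ledgerId) {
        }
    }

    // An "existing" implementation that knows nothing about the new method.
    static class LegacyCursor implements Cursor {
        @Override
        public String getName() {
            return "legacy";
        }
    }

    // A newer implementation that opts in by overriding the default.
    static class AwareCursor implements Cursor {
        long lastSkipped = -1;

        @Override
        public String getName() {
            return "aware";
        }

        @Override
        public void skipNonRecoverableLedger(long ledgerId) {
            lastSkipped = ledgerId;
        }
    }

    public static void main(String[] args) {
        Cursor legacy = new LegacyCursor();
        AwareCursor aware = new AwareCursor();
        legacy.skipNonRecoverableLedger(2L); // inherited no-op
        aware.skipNonRecoverableLedger(2L);  // overridden behavior
        System.out.println(legacy.getName() + " ok, " + aware.getName() + " skipped " + aware.lastSkipped);
    }
}
```

The trade-off is that a silent no-op default means third-party implementations will not react to a lost ledger until they override the method.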



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java:
##########
@@ -631,6 +631,12 @@ void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePro
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
 
+    /**
+     * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is
+     * used to delete information about this ledger in the ManagedCursor.
+     */
+    void noticeToCursorNonRecoverableLedgerSkipped(long ledgerId);

Review Comment:
   Same as the above comment.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -786,6 +786,12 @@ Set<? extends Position> asyncReplayEntries(
      */
     long getEstimatedSizeSinceMarkDeletePosition();
 
+    /**
+     * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is
+     * used to delete information about this ledger in the ManagedCursor.
+     */
+    void noticeNonRecoverableLedgerSkipped(long ledgerId);

Review Comment:
   Maybe we can just use `skipNonRecoverableLedger(long ledgerId)`;



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+                    positionsToAck = new ArrayList<>();
+                }

Review Comment:
   The managed cursor already has an entry delete limiter. Can we call `asyncDelete` directly?
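
For reference, the segmented acknowledgement performed by the diff above can be sketched in isolation. This is a minimal, self-contained sketch and not the PR's code: `ChunkedAck`, `forEachChunk`, and the `Consumer<List<Long>>` sink are hypothetical names that stand in for the cursor and its `asyncDelete` call, and plain entry ids stand in for positions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongPredicate;

// Hypothetical helper, not part of Pulsar: illustrates flushing work in bounded chunks.
final class ChunkedAck {
    /**
     * Walks entry ids [0, entryCount), skips the ones already acknowledged, and hands
     * the rest to the sink in lists of at most chunkSize elements.
     * Returns the number of chunks that were flushed.
     */
    static int forEachChunk(long entryCount, int chunkSize, LongPredicate alreadyAcked,
                            Consumer<List<Long>> sink) {
        int chunks = 0;
        List<Long> pending = new ArrayList<>(chunkSize);
        for (long entryId = 0; entryId < entryCount; entryId++) {
            if (!alreadyAcked.test(entryId)) {
                pending.add(entryId);
            }
            // Flush as soon as the buffer is full so it never grows past chunkSize.
            if (pending.size() >= chunkSize) {
                sink.accept(pending);
                pending = new ArrayList<>(chunkSize);
                chunks++;
            }
        }
        // Flush the last, possibly partial, chunk.
        if (!pending.isEmpty()) {
            sink.accept(pending);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<List<Long>> flushed = new ArrayList<>();
        // Ten entries, entry 3 already acknowledged, chunks of four.
        int chunks = forEachChunk(10, 4, id -> id == 3, flushed::add);
        System.out.println(chunks + " chunks: " + flushed);
    }
}
```

If the cursor's own delete limiter already bounds the work, as the comment suggests, the buffering above may be unnecessary and a single direct call could replace it.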



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1741,6 +1741,15 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
         }
     }
 
+    @Override
+    public void noticeToCursorNonRecoverableLedgerSkipped(long ledgerId){
+        Iterator<ManagedCursor> managedCursorIterator = cursors.iterator();
+        while (managedCursorIterator.hasNext()){
+            ManagedCursor managedCursor = managedCursorIterator.next();
+            managedCursor.noticeNonRecoverableLedgerSkipped(ledgerId);
+        }

Review Comment:
   ```suggestion
           for (ManagedCursor managedCursor : cursors) {
               managedCursor.noticeNonRecoverableLedgerSkipped(ledgerId);
           }
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+                    positionsToAck = new ArrayList<>();
+                }
+            }
+            // Acknowledge the last segments.
+            if (!positionsToAck.isEmpty()) {
+                retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void retryToAcknowledgeNonRecoverablePositions(List<Position> positions, int retryTimes) {
+        if (CollectionUtils.isEmpty(positions)) {
+            return;
+        }
+        asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
+            @Override
+            public void deleteComplete(Object ctx) {
+                // ignore.
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException ex, Object ctx) {
+                if (retryTimes <= 3) {
+                    log.warn("[{}] [{}] Try to acknowledge the non recoverable positions fail and it will be retry"
+                                    + " after 60s. ledgerId: {}, the current retry times: {}",
+                            ledger.getName(), name, positions.get(0).getLedgerId(), retryTimes, ex);
+                    ledger.getScheduledExecutor()
+                            .schedule(() -> retryToAcknowledgeNonRecoverablePositions(positions, retryTimes + 1),
+                                    60, TimeUnit.SECONDS);
+                } else {
+                    log.error("[{}] [{}] Try to acknowledge the non recoverable positions ultimately failed."
+                                    + " ledgerId: {}, retry times: {}",
+                            ledger.getName(), name, positions.get(0).getLedgerId(), retryTimes, ex);
+                }
+            }
+        }, retryTimes);

Review Comment:
   The method `internalMarkDelete` already handles the failure case, so why do we need to handle it again?
   We only need to make sure the in-memory state is updated. A failure to persist the ack state is fine, right? Subsequent ack operations will try again.
   
   If the broker crashes, will the non-recoverable ledger be detected again?
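
To make the pattern under discussion concrete, the bounded retry with a fixed delay used in the diff above can be sketched on its own. This is a minimal sketch under stated assumptions and not the PR's implementation: `BoundedRetry`, `run`, and `attempt` are hypothetical names, the operation is modelled as a `Supplier<CompletableFuture<Void>>` instead of `asyncDelete` with a callback, and the delay is in milliseconds where the diff uses 60 seconds.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper, not part of Pulsar: retries an async operation a bounded number of times.
final class BoundedRetry {
    /**
     * Runs the async operation. On failure it is rescheduled after delayMillis until
     * maxAttempts attempts have been made. The returned future completes with the number
     * of attempts used, or exceptionally with the last failure.
     */
    static CompletableFuture<Integer> run(Supplier<CompletableFuture<Void>> op, int maxAttempts,
                                          long delayMillis, ScheduledExecutorService scheduler) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        attempt(op, 1, maxAttempts, delayMillis, scheduler, result);
        return result;
    }

    private static void attempt(Supplier<CompletableFuture<Void>> op, int attemptNo, int maxAttempts,
                                long delayMillis, ScheduledExecutorService scheduler,
                                CompletableFuture<Integer> result) {
        op.get().whenComplete((ignored, ex) -> {
            if (ex == null) {
                result.complete(attemptNo);
            } else if (attemptNo < maxAttempts) {
                // Retry later on the scheduler thread instead of blocking the caller.
                scheduler.schedule(
                        () -> attempt(op, attemptNo + 1, maxAttempts, delayMillis, scheduler, result),
                        delayMillis, TimeUnit.MILLISECONDS);
            } else {
                // Retries exhausted: surface the last failure to the caller.
                result.completeExceptionally(ex);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Simulated operation that fails twice and then succeeds.
        Supplier<CompletableFuture<Void>> flaky = () -> calls.incrementAndGet() < 3
                ? CompletableFuture.<Void>failedFuture(new IllegalStateException("transient"))
                : CompletableFuture.<Void>completedFuture(null);
        try {
            System.out.println("attempts: " + run(flaky, 4, 10, scheduler).get(5, TimeUnit.SECONDS));
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Whether such a retry layer is needed at all is exactly the question raised here: if a lower layer already handles persistence failures and later acks retry the write, the extra scheduling adds complexity without changing the in-memory state.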



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031951904


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2661,6 +2708,34 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    void clearIncompleteAckedRecordsByLedgerId(final long ledgerId){
+        lock.writeLock().lock();
+        try {
+            List<Range> rangeListToDelete = individualDeletedMessages.asRanges().stream()
+                    .filter(range -> range.lowerEndpoint().getLedgerId() == ledgerId)
+                    .map(range -> Range.openClosed(
+                            new LongPairRangeSet.LongPair(ledgerId, range.lowerEndpoint().getEntryId()),
+                            new LongPairRangeSet.LongPair(ledgerId, range.upperEndpoint().getEntryId())
+                    )).collect(Collectors.toList());
+            if (!rangeListToDelete.isEmpty()) {
+                rangeListToDelete.forEach(individualDeletedMessages::remove);
+            }
+
+            if (batchDeletedIndexes != null) {
+                Set<PositionImpl> batchedIndexesToDelete = batchDeletedIndexes.keySet().stream()
+                        .filter(position -> position.getLedgerId() == ledgerId)
+                        .collect(Collectors.toSet());
+                if (!batchedIndexesToDelete.isEmpty()) {
+                    batchedIndexesToDelete.forEach(batchDeletedIndexes::remove);
+                }
+            }
+        } catch (Exception e){
+            e.printStackTrace();

Review Comment:
   This was added during debugging and I forgot to delete it. Already fixed.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031915619


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2661,6 +2708,34 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    void clearIncompleteAckedRecordsByLedgerId(final long ledgerId){

Review Comment:
   Should this method have an explicit access modifier?





[GitHub] [pulsar] AlvaroStream commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
AlvaroStream commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1033524777


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -480,7 +482,24 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
                     // Need to proceed and read the last entry in the specified ledger to find out the last position
                     log.info("[{}] Consumer {} meta-data recover from ledger {}", ledger.getName(), name,
                             info.getCursorsLedgerId());
-                    recoverFromLedger(info, callback);
+                    recoverFromLedger(info, new VoidCallback() {
+                        @Override
+                        public void operationComplete() {
+                            LinkedHashSet<Long> ledgersDeletedButNotAcked = checkLedgersDeletedButNotAcked();
+                            if (!ledgersDeletedButNotAcked.isEmpty()){
+                                log.warn("[{}] ledgers {} has deleted from topic, but still in the records of"
+                                                + " incomplete-ack {}. If it is not cleaned up in time, it will cause"
+                                                + " the mark deleted position to not move forward.",
+                                        ledger.getName(), ledgersDeletedButNotAcked, name);

Review Comment:
   Is it possible to add information about which topic and subscription are stuck?





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031919293


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -117,6 +119,9 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 return;
             }
             updateReadPosition(nexReadPosition);
+            if (lostLedger != null) {
+                clearIncompleteAckedRecordsFromLedger(lostLedger);

Review Comment:
   Call `cursor.getManagedLedger().removeNonRecoverableLedger(ledgerId);` directly to avoid adding a useless stack frame.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031915208


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -480,7 +482,24 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
                     // Need to proceed and read the last entry in the specified ledger to find out the last position
                     log.info("[{}] Consumer {} meta-data recover from ledger {}", ledger.getName(), name,
                             info.getCursorsLedgerId());
-                    recoverFromLedger(info, callback);
+                    recoverFromLedger(info, new VoidCallback() {
+                        @Override
+                        public void operationComplete() {
+                            LinkedHashSet<Long> ledgersDeletedButNotAcked = checkLedgersDeletedButNotAcked();

Review Comment:
   Why do we need this checking logic? It looks like we already handle this case when the cursor gets a `NonRecoverableLedgerException`.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205161925


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+                    positionsToAck = new ArrayList<>();
+                }
+            }
+            // Acknowledge the last segments.
+            if (!positionsToAck.isEmpty()) {
+                retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void retryToAcknowledgeNonRecoverablePositions(List<Position> positions, int retryTimes) {
+        if (CollectionUtils.isEmpty(positions)) {
+            return;
+        }
+        asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
+            @Override
+            public void deleteComplete(Object ctx) {
+                // ignore.
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException ex, Object ctx) {
+                if (retryTimes <= 3) {
+                    log.warn("[{}] [{}] Try to acknowledge the non recoverable positions fail and it will be retry"
+                                    + " after 60s. ledgerId: {}, the current retry times: {}",
+                            ledger.getName(), name, positions.get(0).getLedgerId(), retryTimes, ex);
+                    ledger.getScheduledExecutor()
+                            .schedule(() -> retryToAcknowledgeNonRecoverablePositions(positions, retryTimes + 1),
+                                    60, TimeUnit.SECONDS);
+                } else {
+                    log.error("[{}] [{}] Try to acknowledge the non recoverable positions ultimately failed."
+                                    + " ledgerId: {}, retry times: {}",
+                            ledger.getName(), name, positions.get(0).getLedgerId(), retryTimes, ex);
+                }
+            }
+        }, retryTimes);

Review Comment:
   fixed





[GitHub] [pulsar] gaoran10 commented on pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "gaoran10 (via GitHub)" <gi...@apache.org>.
gaoran10 commented on PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#issuecomment-1566421022

   It seems there are Checkstyle problems:
   ```
   Error:  src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:[97,8] (imports) UnusedImports: Unused import: org.apache.commons.collections4.CollectionUtils.
   Error:  src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:[2750] (sizes) LineLength: Line is longer than 120 characters (found 129).
   ```




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205156001


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1741,6 +1741,15 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
         }
     }
 
+    @Override
+    public void noticeToCursorNonRecoverableLedgerSkipped(long ledgerId){
+        Iterator<ManagedCursor> managedCursorIterator = cursors.iterator();
+        while (managedCursorIterator.hasNext()){
+            ManagedCursor managedCursor = managedCursorIterator.next();
+            managedCursor.noticeNonRecoverableLedgerSkipped(ledgerId);
+        }

Review Comment:
   fixed



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java:
##########
@@ -631,6 +631,12 @@ void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePro
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
 
+    /**
+     * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is
+     * used to delete information about this ledger in the ManagedCursor.
+     */
+    void noticeToCursorNonRecoverableLedgerSkipped(long ledgerId);

Review Comment:
   fixed



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -786,6 +786,12 @@ Set<? extends Position> asyncReplayEntries(
      */
     long getEstimatedSizeSinceMarkDeletePosition();
 
+    /**
+     * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is
+     * used to delete information about this ledger in the ManagedCursor.
+     */
+    void noticeNonRecoverableLedgerSkipped(long ledgerId);

Review Comment:
   fixed





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031913744


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -102,9 +102,11 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                     readPosition, exception.getMessage());
             final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();
             Position nexReadPosition;
+            Long lostLedger = null;
             if (exception instanceof ManagedLedgerException.LedgerNotExistException) {
                 // try to find and move to next valid ledger
                 nexReadPosition = cursor.getNextLedgerPosition(readPosition.getLedgerId());
+                lostLedger = readPosition.ledgerId;
             } else {
                 // Skip this read operation
                 nexReadPosition = ledger.getValidPositionAfterSkippedEntries(readPosition, count);

Review Comment:
   I think we also need to check here.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031917592


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2661,6 +2708,34 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    void clearIncompleteAckedRecordsByLedgerId(final long ledgerId){
+        lock.writeLock().lock();
+        try {
+            List<Range> rangeListToDelete = individualDeletedMessages.asRanges().stream()
+                    .filter(range -> range.lowerEndpoint().getLedgerId() == ledgerId)
+                    .map(range -> Range.openClosed(

Review Comment:
   Why can't we return the range directly?





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031951717


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerLostTest.java:
##########
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import java.net.Inet4Address;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class LedgerLostTest {
+
+    // prefer inet4.
+    private static final String LOCALHOST = Inet4Address.getLoopbackAddress().getHostAddress();
+    private static final String CLUSTER = "broken_ledger_test";
+    private static final String DEFAULT_TENANT = "public";
+    private static final String DEFAULT_NAMESPACE = DEFAULT_TENANT + "/default";
+
+    protected LocalBookkeeperEnsemble bkEnsemble;
+    protected ServiceConfiguration pulsarConfig;
+    protected PulsarService pulsarService;
+    protected int brokerWebServicePort;
+    protected int brokerServicePort;
+    protected String metadataServiceUri;
+    protected BookKeeper bookKeeperClient;
+    protected String brokerUrl;
+    protected String brokerServiceUrl;
+    protected PulsarAdmin pulsarAdmin;
+    protected PulsarClient pulsarClient;
+
+    @BeforeClass
+    protected void setup() throws Exception {
+        log.info("--- Start cluster ---");
+        startLocalBookie();
+        initPulsarConfig();
+        startPulsar();
+    }
+
+    @AfterClass
+    protected void cleanup() throws Exception {
+        log.info("--- Shutting down ---");
+        silentStopPulsar();
+        stopLocalBookie();
+    }
+
+    protected void startLocalBookie() throws Exception{
+        log.info("===> Start bookie ");
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble.start();
+        metadataServiceUri = String.format("zk:%s:%s", LOCALHOST, bkEnsemble.getZookeeperPort());
+        initBookieClient();
+    }
+
+    protected void initBookieClient() throws Exception {
+        bookKeeperClient = new BookKeeper(String.format("%s:%s", LOCALHOST, bkEnsemble.getZookeeperPort()));
+    }
+
+    protected void stopLocalBookie() {
+        log.info("===> Close bookie client");
+        try {
+            bookKeeperClient.close();
+        } catch (Exception e){
+            log.error("Close bookie client fail", e);
+        }
+        log.info("===> Stop bookie ");
+        try {
+            bkEnsemble.stop();
+        } catch (Exception e){
+            log.error("Stop bookie fail", e);
+        }
+    }
+
+    protected void initPulsarConfig() throws Exception{
+        pulsarConfig = new ServiceConfiguration();
+        pulsarConfig.setAdvertisedAddress(LOCALHOST);
+        pulsarConfig.setMetadataStoreUrl(metadataServiceUri);
+        pulsarConfig.setClusterName(CLUSTER);
+        pulsarConfig.setTransactionCoordinatorEnabled(false);
+        pulsarConfig.setAllowAutoTopicCreation(true);
+        pulsarConfig.setSystemTopicEnabled(true);
+        pulsarConfig.setAllowAutoTopicCreationType("non-partitioned");
+        pulsarConfig.setAutoSkipNonRecoverableData(true);
+        pulsarConfig.setManagedLedgerDefaultMarkDeleteRateLimit(Integer.MAX_VALUE);
+        pulsarConfig.setBrokerDeleteInactiveTopicsEnabled(false);
+        pulsarConfig.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+        pulsarConfig.setLazyCursorRecovery(false);
+    }
+
+    protected void startPulsar() throws Exception {
+        log.info("===> Start pulsar ");
+        pulsarService = new PulsarService(pulsarConfig);
+        pulsarService.start();
+        brokerWebServicePort = pulsarService.getListenPortHTTP().get();
+        brokerServicePort = pulsarService.getBrokerListenPort().get();
+        brokerUrl = String.format("http://%s:%s", LOCALHOST, brokerWebServicePort);
+        brokerServiceUrl = String.format("pulsar://%s:%s", LOCALHOST, brokerServicePort);
+        initPulsarAdmin();
+        initPulsarClient();
+        initDefaultNamespace();
+    }
+
+    protected void silentStopPulsar() throws Exception {
+        log.info("===> Close pulsar client ");
+        try {
+            pulsarClient.close();
+        }catch (Exception e){
+            log.error("===> Close pulsar client fail", e);
+        }
+        log.info("===> Close pulsar admin ");
+        try {
+            pulsarAdmin.close();
+        }catch (Exception e){
+            log.error("===> Close pulsar admin fail", e);
+        }
+        log.info("===> Stop pulsar service ");
+        try {
+            pulsarService.close();
+        }catch (Exception e){
+            log.error("===> Stop pulsar service fail", e);
+        }
+    }
+
+    protected void stopPulsar() throws Exception {
+        log.info("===> Close pulsar client ");
+        pulsarClient.close();
+        log.info("===> Close pulsar admin ");
+        pulsarAdmin.close();
+        log.info("===> Stop pulsar service ");
+        pulsarService.close();
+    }
+
+    protected void initPulsarAdmin() throws Exception {
+        pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(brokerUrl).build();
+    }
+
+    protected void initPulsarClient() throws Exception {
+        pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build();
+    }
+
+    protected void initDefaultNamespace() throws Exception {
+        if (!pulsarAdmin.clusters().getClusters().contains(CLUSTER)) {
+            pulsarAdmin.clusters().createCluster(CLUSTER, ClusterData.builder().serviceUrl(brokerServiceUrl).build());
+        }
+        if (!pulsarAdmin.tenants().getTenants().contains(DEFAULT_TENANT)){
+            pulsarAdmin.tenants().createTenant(DEFAULT_TENANT,
+                    TenantInfo.builder().allowedClusters(Collections.singleton(CLUSTER)).build());
+        }
+        if (!pulsarAdmin.namespaces().getNamespaces(DEFAULT_TENANT).contains(DEFAULT_NAMESPACE)) {
+            pulsarAdmin.namespaces().createNamespace(DEFAULT_NAMESPACE, Collections.singleton(CLUSTER));
+        }
+    }
+
+    @DataProvider(name = "batchEnabled")
+    public Object[][] batchEnabled(){
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
+    @Test(timeOut = 30000, dataProvider = "batchEnabled")
+    public void testCompactionLedgerLost(boolean enabledBatch) throws Exception {

Review Comment:
   It's a misnomer: I copied it from somewhere and forgot to change it. Already fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#issuecomment-1367022410

   The PR has had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031919726


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerLostTest.java:
##########
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import java.net.Inet4Address;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class LedgerLostTest {
+
+    // prefer inet4.
+    private static final String LOCALHOST = Inet4Address.getLoopbackAddress().getHostAddress();
+    private static final String CLUSTER = "broken_ledger_test";
+    private static final String DEFAULT_TENANT = "public";
+    private static final String DEFAULT_NAMESPACE = DEFAULT_TENANT + "/default";
+
+    protected LocalBookkeeperEnsemble bkEnsemble;
+    protected ServiceConfiguration pulsarConfig;
+    protected PulsarService pulsarService;
+    protected int brokerWebServicePort;
+    protected int brokerServicePort;
+    protected String metadataServiceUri;
+    protected BookKeeper bookKeeperClient;
+    protected String brokerUrl;
+    protected String brokerServiceUrl;
+    protected PulsarAdmin pulsarAdmin;
+    protected PulsarClient pulsarClient;
+
+    @BeforeClass
+    protected void setup() throws Exception {
+        log.info("--- Start cluster ---");
+        startLocalBookie();
+        initPulsarConfig();
+        startPulsar();
+    }
+
+    @AfterClass
+    protected void cleanup() throws Exception {
+        log.info("--- Shutting down ---");
+        silentStopPulsar();
+        stopLocalBookie();
+    }
+
+    protected void startLocalBookie() throws Exception{
+        log.info("===> Start bookie ");
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble.start();
+        metadataServiceUri = String.format("zk:%s:%s", LOCALHOST, bkEnsemble.getZookeeperPort());
+        initBookieClient();
+    }
+
+    protected void initBookieClient() throws Exception {
+        bookKeeperClient = new BookKeeper(String.format("%s:%s", LOCALHOST, bkEnsemble.getZookeeperPort()));
+    }
+
+    protected void stopLocalBookie() {
+        log.info("===> Close bookie client");
+        try {
+            bookKeeperClient.close();
+        } catch (Exception e){
+            log.error("Close bookie client fail", e);
+        }
+        log.info("===> Stop bookie ");
+        try {
+            bkEnsemble.stop();
+        } catch (Exception e){
+            log.error("Stop bookie fail", e);
+        }
+    }
+
+    protected void initPulsarConfig() throws Exception{
+        pulsarConfig = new ServiceConfiguration();
+        pulsarConfig.setAdvertisedAddress(LOCALHOST);
+        pulsarConfig.setMetadataStoreUrl(metadataServiceUri);
+        pulsarConfig.setClusterName(CLUSTER);
+        pulsarConfig.setTransactionCoordinatorEnabled(false);
+        pulsarConfig.setAllowAutoTopicCreation(true);
+        pulsarConfig.setSystemTopicEnabled(true);
+        pulsarConfig.setAllowAutoTopicCreationType("non-partitioned");
+        pulsarConfig.setAutoSkipNonRecoverableData(true);
+        pulsarConfig.setManagedLedgerDefaultMarkDeleteRateLimit(Integer.MAX_VALUE);
+        pulsarConfig.setBrokerDeleteInactiveTopicsEnabled(false);
+        pulsarConfig.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+        pulsarConfig.setLazyCursorRecovery(false);
+    }
+
+    protected void startPulsar() throws Exception {
+        log.info("===> Start pulsar ");
+        pulsarService = new PulsarService(pulsarConfig);
+        pulsarService.start();
+        brokerWebServicePort = pulsarService.getListenPortHTTP().get();
+        brokerServicePort = pulsarService.getBrokerListenPort().get();
+        brokerUrl = String.format("http://%s:%s", LOCALHOST, brokerWebServicePort);
+        brokerServiceUrl = String.format("pulsar://%s:%s", LOCALHOST, brokerServicePort);
+        initPulsarAdmin();
+        initPulsarClient();
+        initDefaultNamespace();
+    }
+
+    protected void silentStopPulsar() throws Exception {
+        log.info("===> Close pulsar client ");
+        try {
+            pulsarClient.close();
+        }catch (Exception e){
+            log.error("===> Close pulsar client fail", e);
+        }
+        log.info("===> Close pulsar admin ");
+        try {
+            pulsarAdmin.close();
+        }catch (Exception e){
+            log.error("===> Close pulsar admin fail", e);
+        }
+        log.info("===> Stop pulsar service ");
+        try {
+            pulsarService.close();
+        }catch (Exception e){
+            log.error("===> Stop pulsar service fail", e);
+        }
+    }
+
+    protected void stopPulsar() throws Exception {
+        log.info("===> Close pulsar client ");
+        pulsarClient.close();
+        log.info("===> Close pulsar admin ");
+        pulsarAdmin.close();
+        log.info("===> Stop pulsar service ");
+        pulsarService.close();
+    }
+
+    protected void initPulsarAdmin() throws Exception {
+        pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(brokerUrl).build();
+    }
+
+    protected void initPulsarClient() throws Exception {
+        pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build();
+    }
+
+    protected void initDefaultNamespace() throws Exception {
+        if (!pulsarAdmin.clusters().getClusters().contains(CLUSTER)) {
+            pulsarAdmin.clusters().createCluster(CLUSTER, ClusterData.builder().serviceUrl(brokerServiceUrl).build());
+        }
+        if (!pulsarAdmin.tenants().getTenants().contains(DEFAULT_TENANT)){
+            pulsarAdmin.tenants().createTenant(DEFAULT_TENANT,
+                    TenantInfo.builder().allowedClusters(Collections.singleton(CLUSTER)).build());
+        }
+        if (!pulsarAdmin.namespaces().getNamespaces(DEFAULT_TENANT).contains(DEFAULT_NAMESPACE)) {
+            pulsarAdmin.namespaces().createNamespace(DEFAULT_NAMESPACE, Collections.singleton(CLUSTER));
+        }
+    }
+
+    @DataProvider(name = "batchEnabled")
+    public Object[][] batchEnabled(){
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
+    @Test(timeOut = 30000, dataProvider = "batchEnabled")
+    public void testCompactionLedgerLost(boolean enabledBatch) throws Exception {

Review Comment:
   Could you explain why `compaction ledger`?





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #18620: [fix] [broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1031915208


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -480,7 +482,24 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
                     // Need to proceed and read the last entry in the specified ledger to find out the last position
                     log.info("[{}] Consumer {} meta-data recover from ledger {}", ledger.getName(), name,
                             info.getCursorsLedgerId());
-                    recoverFromLedger(info, callback);
+                    recoverFromLedger(info, new VoidCallback() {
+                        @Override
+                        public void operationComplete() {
+                            LinkedHashSet<Long> ledgersDeletedButNotAcked = checkLedgersDeletedButNotAcked();

Review Comment:
   Why do we need this checking logic? It looks like this case is already handled when the cursor gets a `NonRecoverableLedgerException`.
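
As background for this question: the PR description says that, when the managed cursor is recovered, it checks for records in `individualDeletedMessages` that are no longer valid and logs a warning. The body of `checkLedgersDeletedButNotAcked()` is not shown in this hunk, so the following is only a minimal sketch of that kind of check. `StaleAckRecordCheck`, `Pos`, and `ledgersMissingFromTopic` are hypothetical names, not Pulsar APIs.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the actual ManagedCursorImpl logic.
final class StaleAckRecordCheck {

    // A position in the [ledgerId:entryId] notation used in the PR description.
    static final class Pos {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Returns, in first-seen order, the ids of ledgers that are referenced by
    // individual-ack records but are absent from the topic's ledger list.
    static LinkedHashSet<Long> ledgersMissingFromTopic(Set<Long> existingLedgerIds,
                                                       List<Pos> individuallyAcked) {
        LinkedHashSet<Long> missing = new LinkedHashSet<>();
        for (Pos p : individuallyAcked) {
            if (!existingLedgerIds.contains(p.ledgerId)) {
                missing.add(p.ledgerId);
            }
        }
        return missing;
    }
}
```

With ledgers 1 and 3 present and an individual ack recorded at `[2:5]`, the check reports ledger 2, which matches the lost-ledger scenario in the PR description.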





[GitHub] [pulsar] Technoboy- closed pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by GitBox <gi...@apache.org>.
Technoboy- closed pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward
URL: https://github.com/apache/pulsar/pull/18620




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #18620:
URL: https://github.com/apache/pulsar/pull/18620#discussion_r1205193799


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java:
##########
@@ -2718,6 +2719,72 @@ void setReadPosition(Position newReadPositionInt) {
         }
     }
 
+    /**
+     * Manually acknowledge all entries in the lost ledger.
+     * - Since this is an uncommon event, we focus on maintainability. So we do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes}, but call
+     *   {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also work.
+     */
+    @Override
+    public void noticeNonRecoverableLedgerSkipped(final long ledgerId){
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        lock.writeLock().lock();
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
+        try {
+            List<Position> positionsToAck = new ArrayList<>();
+            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+                if (!individualDeletedMessages.contains(ledgerId, i)) {
+                    positionsToAck.add(PositionImpl.get(ledgerId, i));
+                }
+                // Acknowledge in segments to avoid OOM.
+                if (positionsToAck.size() >= 1000) {
+                    retryToAcknowledgeNonRecoverablePositions(positionsToAck, 1);
+                    positionsToAck = new ArrayList<>();
+                }

Review Comment:
   I mean that the `asyncDelete` method already does the same thing:
   
   - It iterates over the positions and updates the in-memory state one by one.
   - The mark-delete rate limiter then writes the changes to the storage layer in batches.
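
The two steps in this comment can be illustrated with a small model. This is a sketch under simplifying assumptions (a single ledger, an explicit clock passed in by the caller, a list standing in for the storage layer); `BatchedAckSketch` and its members are hypothetical and do not mirror the real `ManagedCursorImpl` fields or its rate limiter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of the two steps above; not Pulsar's ManagedCursorImpl.
final class BatchedAckSketch {
    // Entries acknowledged out of order, ahead of the mark-delete position.
    private final TreeSet<Long> individuallyAcked = new TreeSet<>();
    // Every entry id <= this value has been acknowledged.
    private long markDeleteEntry = -1;
    // Last mark-delete value that was written to the storage stand-in.
    private long lastPersisted = -1;
    // Stand-in for the storage layer: one element per persisted update.
    final List<Long> persistedUpdates = new ArrayList<>();
    private final long minFlushIntervalMillis;
    private long lastFlushMillis = 0;

    BatchedAckSketch(long minFlushIntervalMillis) {
        this.minFlushIntervalMillis = minFlushIntervalMillis;
    }

    long markDeleteEntry() {
        return markDeleteEntry;
    }

    // Step 1: update the in-memory state for a single position.
    // Step 2: persist only when the rate limit allows, so many acks share one write.
    void ack(long entryId, long nowMillis) {
        if (entryId <= markDeleteEntry) {
            return; // already covered by the mark-delete position
        }
        individuallyAcked.add(entryId);
        // Advance the mark-delete position across contiguous acknowledged entries.
        while (individuallyAcked.remove(markDeleteEntry + 1)) {
            markDeleteEntry++;
        }
        boolean dirty = markDeleteEntry != lastPersisted;
        if (dirty && nowMillis - lastFlushMillis >= minFlushIntervalMillis) {
            persistedUpdates.add(markDeleteEntry);
            lastPersisted = markDeleteEntry;
            lastFlushMillis = nowMillis;
        }
    }
}
```

Acknowledging entries 0 to 4 within the first interval moves the in-memory position five times but writes nothing; the next ack after the interval elapses persists a single update that covers all of them.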





[GitHub] [pulsar] poorbarcode closed pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode closed pull request #18620: [fix][broker] If ledger lost, cursor mark delete position can not forward
URL: https://github.com/apache/pulsar/pull/18620

