Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/06 13:32:48 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #10087: [BUGFIX] cannot cleanup expired data after managed-ledger restart

codelipenghui commented on a change in pull request #10087:
URL: https://github.com/apache/pulsar/pull/10087#discussion_r607851452



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2140,6 +2133,42 @@ public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
         this.waitingEntryCallBacks.add(cb);
     }
 
+    public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
+        for (ManagedCursor cursor : cursors) {
+            PositionImpl lastAckedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
+            LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
+            LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
+                    .map(Map.Entry::getValue).orElse(null);
+
+            if (currPointedLedger != null) {
+                if (nextPointedLedger != null) {
+                    if (lastAckedPosition.getEntryId() != -1 &&
+                            lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
+                        lastAckedPosition = new PositionImpl(nextPointedLedger.getLedgerId(), -1);
+                    }
+                } else {
+                    log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor);
+                }
+            } else {
+                log.warn("Cursor: {} does not exist in the managed-ledger.", cursor);
+            }
+
+            if (!lastAckedPosition.equals((PositionImpl) cursor.getMarkDeletedPosition())) {
+                try {
+                    log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
+                    if (lastConfirmedEntry.compareTo(lastAckedPosition) < 0) {
+                        lastConfirmedEntry = lastAckedPosition;

Review comment:
       This will break the behavior of `lastConfirmedEntry`, because callers that read the last confirmed entry can then get a position such as (10, -1).
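
       To illustrate the concern, here is a minimal sketch. It assumes positions order by ledger id and then by entry id, and it uses a simplified stand-in `Position` class rather than Pulsar's `PositionImpl`. The ledger 9 with 100 entries and the helpers `overwriteIfLater` and `pointsAtRealEntry` are hypothetical names chosen for the example only.

```java
public class LastConfirmedSketch {

    // Simplified stand-in for a (ledgerId, entryId) position; NOT Pulsar's PositionImpl.
    public static final class Position implements Comparable<Position> {
        public final long ledgerId;
        public final long entryId;

        public Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        // Assumed ordering: ledger id first, then entry id.
        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }

        @Override
        public String toString() {
            return "(" + ledgerId + ", " + entryId + ")";
        }
    }

    // Hypothetical helper: true when the position names an actual entry.
    public static boolean pointsAtRealEntry(Position p) {
        return p.entryId >= 0;
    }

    // Mirrors the assignment in the hunk above: overwrite when the candidate sorts later.
    public static Position overwriteIfLater(Position lastConfirmed, Position candidate) {
        return lastConfirmed.compareTo(candidate) < 0 ? candidate : lastConfirmed;
    }

    public static void main(String[] args) {
        // Assumed state: ledger 9 holds 100 entries, so its last entry is (9, 99).
        Position lastConfirmed = new Position(9, 99);
        // The cursor is reset to "before the first entry" of the next ledger.
        Position resetTarget = new Position(10, -1);

        Position updated = overwriteIfLater(lastConfirmed, resetTarget);
        System.out.println("lastConfirmedEntry = " + updated
                + ", points at a real entry: " + pointsAtRealEntry(updated));
    }
}
```

       Under these assumptions (10, -1) sorts after (9, 99), so the comparison in the hunk passes and `lastConfirmedEntry` ends up holding a position that no entry occupies. One possible direction is to move only the cursor's mark-delete position and leave `lastConfirmedEntry` untouched, though whether that fits the rest of the change is for the PR author to judge.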



