You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/16 16:26:44 UTC

[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r947000060


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2570,70 +2651,107 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
-            // Update metadata
-            for (LedgerInfo ls : ledgersToDelete) {
-                if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
-                    // this info is relevant because the lastMessageId won't be available anymore
-                    log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
-                             + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
-                }
-
-                invalidateReadHandle(ls.getLedgerId());
-
-                ledgers.remove(ls.getLedgerId());
-                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
-                TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
 
-                entryCache.invalidateAllEntries(ls.getLedgerId());
-            }
-            for (LedgerInfo ls : offloadedLedgersToDelete) {
-                LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-                newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
-                String driverName = OffloadUtils.getOffloadDriverName(ls,
-                        config.getLedgerOffloader().getOffloadDriverName());
-                Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
-                        config.getLedgerOffloader().getOffloadDriverMetadata());
-                OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
-                ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
-            }
+            // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers =
+                    Stream.concat(ledgersToDelete.stream().filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()),
+                            offloadedLedgersToDelete.stream()).map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            // Mark deletable offloaded ledgers
+            Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream()
+                    .filter(ls -> ls.getOffloadContext().hasUidMsb())
+                    .map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            CompletableFuture<?> appendDeleteLedgerFuture =
+                    appendPendingDeleteLedger(deletableLedgers, deletableOffloadedLedgers);
+            appendDeleteLedgerFuture.thenAccept(ignore -> {
+                believedDeleteIds.addAll(deletableLedgers);
+                for (LedgerInfo ls : ledgersToDelete) {
+                    if (currentLastConfirmedEntry != null
+                            && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
+                        // this info is relevant because the lastMessageId won't be available anymore
+                        log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
+                                + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
+                    }
+                    invalidateReadHandle(ls.getLedgerId());
+                    ledgers.remove(ls.getLedgerId());
+                    entryCache.invalidateAllEntries(ls.getLedgerId());
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Updating of ledgers list after trimming", name);
-            }
+                    NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
+                    TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
+                }
+                for (LedgerInfo ls : offloadedLedgersToDelete) {
+                    LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
+                    newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+                    String driverName = OffloadUtils.getOffloadDriverName(ls,
+                            config.getLedgerOffloader().getOffloadDriverName());
+                    Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
+                            config.getLedgerOffloader().getOffloadDriverMetadata());
+                    OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
+                    ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
+                }
 
-            store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
-                @Override
-                public void operationComplete(Void result, Stat stat) {
-                    log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
-                            TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
-                    ledgersStat = stat;
-                    metadataMutex.unlock();
-                    trimmerMutex.unlock();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Updating of ledgers list after trimming", name);
+                }
 
-                    for (LedgerInfo ls : ledgersToDelete) {
-                        log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
-                        asyncDeleteLedger(ls.getLedgerId(), ls);
-                    }
-                    for (LedgerInfo ls : offloadedLedgersToDelete) {
-                        log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
-                                ls.getSize());
-                        asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
+                store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
+                    @Override
+                    public void operationComplete(Void result, Stat stat) {
+                        log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
+                                TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
+                        ledgersStat = stat;
+                        metadataMutex.unlock();
+                        trimmerMutex.unlock();
+                        if (ledgerDeletionService instanceof LedgerDeletionService.LedgerDeletionServiceDisable) {

Review Comment:
   good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org