You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/17 04:17:34 UTC

[GitHub] [pulsar] Jason918 commented on a diff in pull request #15914: [Fix][Tiered Storage] Eagerly Delete Offloaded Segments On Topic Deletion

Jason918 commented on code in PR #15914:
URL: https://github.com/apache/pulsar/pull/15914#discussion_r973532353


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1072,28 +1074,31 @@ public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) {
         }
 
         CompletableFuture<Void> future = new CompletableFuture<>();
+        log.info("Topic {} could not load, try to delete from metadata", topic);
 
         CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
         deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
+
         deleteTopicAuthenticationFuture.whenComplete((v, ex) -> {
             if (ex != null) {
                 future.completeExceptionally(ex);
                 return;
             }
-            managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new DeleteLedgerCallback() {
-                @Override
-                public void deleteLedgerComplete(Object ctx) {
-                    future.complete(null);
-                }
+            CompletableFuture<ManagedLedgerConfig> mlConfigFuture =  getManagedLedgerConfig(topicName);

Review Comment:
   ```suggestion
               CompletableFuture<ManagedLedgerConfig> mlConfigFuture = getManagedLedgerConfig(topicName);
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -154,6 +164,18 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
      */
     void asyncDelete(String name, DeleteLedgerCallback callback, Object ctx);
 
+    /**
+     * Delete a managed ledger. If it's not open, it's metadata will get regardless deleted.
+     *
+     * @param name
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
+     */
+    default void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
+                             DeleteLedgerCallback callback, Object ctx) {
+        asyncDelete(name, callback, ctx);

Review Comment:
   NIT: No need to add this default implementation as we already overrode this.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -873,23 +891,101 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
         }, ctx);
     }
 
+    private CompletableFuture<Void> cleanupOffloaded(long ledgerId, UUID uuid, ManagedLedgerConfig mlConfig,
+                                     Map<String, String> offloadDriverMetadata, String cleanupReason, String name) {
+        log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.",
+                name, ledgerId, uuid.toString(), cleanupReason);
+        Map<String, String> metadataMap = new HashMap();
+        metadataMap.putAll(offloadDriverMetadata);
+        metadataMap.put("ManagedLedgerName", name);
+
+        return Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1),
+                        TimeUnit.SECONDS.toHours(1)).limit(10),
+                Retries.NonFatalPredicate,
+                () -> mlConfig.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, metadataMap),
+                scheduledExecutor, name).whenComplete((ignored, exception) -> {
+            if (exception != null) {
+                log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})",
+                        name, ledgerId, cleanupReason, exception);
+            }
+        });
+    }
+
     private void deleteManagedLedgerData(BookKeeper bkc, String managedLedgerName, ManagedLedgerInfo info,
-            DeleteLedgerCallback callback, Object ctx) {
+                                         CompletableFuture<ManagedLedgerConfig> mlConfigFuture,
+                                         DeleteLedgerCallback callback, Object ctx) {
+        final CompletableFuture<Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>>
+                ledgerInfosFuture = new CompletableFuture<>();
+        store.getManagedLedgerInfo(managedLedgerName, false, null,

Review Comment:
   Why do we read `ManagedLedgerInfo` from store instead of just use `info` in the parameter list? 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1072,28 +1074,31 @@ public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) {
         }
 
         CompletableFuture<Void> future = new CompletableFuture<>();
+        log.info("Topic {} could not load, try to delete from metadata", topic);

Review Comment:
   Duplicate with debug log in L1067?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -873,23 +891,101 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
         }, ctx);
     }
 
+    private CompletableFuture<Void> cleanupOffloaded(long ledgerId, UUID uuid, ManagedLedgerConfig mlConfig,

Review Comment:
   Can reuse the code with `ManagedLedgerImpl#cleanupOffloaded`, it's just the same.
   Maybe put the codes in `OffloaderUtils`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org