Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/14 02:13:55 UTC

[GitHub] [pulsar] horizonzy opened a new pull request, #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

horizonzy opened a new pull request, #16590:
URL: https://github.com/apache/pulsar/pull/16590

   Original issue: #13238
   PIP Issue: #16569
   
   
   ### Motivation
   Currently, ledger deletion in ManagedLedger and ManagedCursor is split into two separate steps:

   1. Remove all ledgers pending deletion from the ledger list and persist the updated ledger list to the metadata store.
   2. In the metadata store update callback, delete those ledgers from the storage system, such as BookKeeper or tiered storage.
   
   Because the two steps are separate, ledger deletion is not atomic. If the first step succeeds and the second step fails, the ledgers are no longer referenced by the metadata but can never be deleted from the storage system. The second step can fail if the broker restarts or if the storage system fails to delete the ledger.
   
   In our customers' environments, we have found many orphan ledgers caused by this problem.
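   To make the failure mode concrete, here is a toy, single-process model of the current two-step flow. It is a sketch under stated assumptions, not Pulsar code: the class `TwoStepDeletion` is hypothetical, and two in-memory sets stand in for the metadata store's ledger list and for BookKeeper. When step 2 is lost, the trimmed ledgers stay in storage with no metadata reference and no record that they still need deleting.

   ```java
   import java.util.Set;
   import java.util.TreeSet;

   // Hypothetical toy model of the pre-PIP-186 trimming flow (not Pulsar code).
   // The sets stand in for the metadata store's ledger list and for BookKeeper.
   class TwoStepDeletion {
       final Set<Long> metadataLedgers = new TreeSet<>(); // ledger list in the metadata store
       final Set<Long> storageLedgers = new TreeSet<>();  // ledgers that still exist in storage

       TwoStepDeletion(long... ledgerIds) {
           for (long id : ledgerIds) {
               metadataLedgers.add(id);
               storageLedgers.add(id);
           }
       }

       // Step 1 updates the ledger list; step 2 (the "callback") deletes from storage.
       // If step 2 is lost (broker restart, storage error), nothing records that it must be retried.
       void trim(Set<Long> toDelete, boolean storageDeleteFails) {
           metadataLedgers.removeAll(toDelete); // step 1: ledger list no longer references them
           if (storageDeleteFails) {
               return;                          // step 2 never happens
           }
           storageLedgers.removeAll(toDelete);  // step 2: physical deletion
       }

       // Ledgers that occupy storage but are no longer referenced by metadata.
       Set<Long> orphans() {
           Set<Long> result = new TreeSet<>(storageLedgers);
           result.removeAll(metadataLedgers);
           return result;
       }
   }
   ```

   For example, creating the model with ledgers 1, 2 and 3 and calling `trim(Set.of(1L, 2L), true)` leaves `orphans()` equal to `[1, 2]`.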
   
   ### Modifications
   Introduce a two-phase deletion protocol, based on a system topic, to solve this problem.
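   The core idea can be sketched in a single process, assuming an in-memory queue stands in for the ledger-deletion system topic and plain sets stand in for the metadata store and BookKeeper. In the review diffs later in this thread, the managed ledger calls `appendPendingDeleteLedger` before it removes ledgers from its ledger list; the sketch mirrors that ordering. The class `TwoPhaseDeletion` and its methods are hypothetical, and the step that skips ledgers still referenced by metadata is only loosely modeled on the `PendingDeleteLedgerInvalidException` handling in the PR, not a description of its exact logic.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.Set;
   import java.util.TreeSet;
   import java.util.function.LongPredicate;

   // Hypothetical sketch of two-phase ledger deletion (not Pulsar code).
   class TwoPhaseDeletion {
       final Set<Long> metadataLedgers = new TreeSet<>(); // ledger list in the metadata store
       final Set<Long> storageLedgers = new TreeSet<>();  // ledgers that still exist in storage
       // Durable record of ledgers awaiting physical deletion (stand-in for the system topic).
       final Deque<Long> pendingDeleteLog = new ArrayDeque<>();

       TwoPhaseDeletion(long... ledgerIds) {
           for (long id : ledgerIds) {
               metadataLedgers.add(id);
               storageLedgers.add(id);
           }
       }

       // Phase 1: record the intent to delete first, then update the ledger list.
       void trim(Set<Long> toDelete) {
           pendingDeleteLog.addAll(toDelete);
           metadataLedgers.removeAll(toDelete);
       }

       // Phase 2: process every currently queued record once. A storage failure
       // re-enqueues the record for a later pass instead of dropping it.
       // Returns the number of records still pending.
       int runDeletionPass(LongPredicate storageFails) {
           int n = pendingDeleteLog.size();
           for (int i = 0; i < n; i++) {
               long id = pendingDeleteLog.poll();
               if (metadataLedgers.contains(id)) {
                   continue; // still referenced: invalid record, drop it without deleting data
               }
               if (storageFails.test(id)) {
                   pendingDeleteLog.add(id); // retry on a later pass
                   continue;
               }
               storageLedgers.remove(id); // idempotent: removing a missing ledger is a no-op
           }
           return pendingDeleteLog.size();
       }

       // Unreferenced ledgers that are not queued for deletion, i.e. true orphans.
       Set<Long> orphans() {
           Set<Long> result = new TreeSet<>(storageLedgers);
           result.removeAll(metadataLedgers);
           result.removeAll(pendingDeleteLog);
           return result;
       }
   }
   ```

   Because the pending-delete record is written before the ledger list changes, a failed storage deletion only delays cleanup: the record stays queued until a later pass succeeds, so no ledger becomes an orphan.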
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [ ] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have already been added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#issuecomment-1183905605

   @horizonzy Please provide a correct documentation label for your PR.
   For instructions, see the [Pulsar Documentation Label Guide](https://docs.google.com/document/d/1Qw7LHQdXWBW9t2-r-A7QdFDBwmZh6ytB4guwMoXHqc0).


[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r963051439


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2571,70 +2648,107 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
-            // Update metadata
-            for (LedgerInfo ls : ledgersToDelete) {
-                if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
-                    // this info is relevant because the lastMessageId won't be available anymore
-                    log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
-                             + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
-                }
 
-                invalidateReadHandle(ls.getLedgerId());
-
-                ledgers.remove(ls.getLedgerId());
-                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
-                TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
-
-                entryCache.invalidateAllEntries(ls.getLedgerId());
-            }
-            for (LedgerInfo ls : offloadedLedgersToDelete) {
-                LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-                newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
-                String driverName = OffloadUtils.getOffloadDriverName(ls,
-                        config.getLedgerOffloader().getOffloadDriverName());
-                Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
-                        config.getLedgerOffloader().getOffloadDriverMetadata());
-                OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
-                ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
-            }
+            // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers =
+                    Stream.concat(ledgersToDelete.stream().filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()),
+                            offloadedLedgersToDelete.stream()).map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            // Mark deletable offloaded ledgers
+            Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream()
+                    .filter(ls -> ls.getOffloadContext().hasUidMsb())
+                    .map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            CompletableFuture<?> appendDeleteLedgerFuture =
+                    appendPendingDeleteLedger(deletableLedgers, deletableOffloadedLedgers);
+            appendDeleteLedgerFuture.thenAccept(ignore -> {
+                believedDeleteIds.addAll(deletableLedgers);

Review Comment:
   Nice catch.



[GitHub] [pulsar] github-actions[bot] commented on pull request #16590: [WIP][PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#issuecomment-1183954132

   @horizonzy Please provide a correct documentation label for your PR.
   For instructions, see the [Pulsar Documentation Label Guide](https://docs.google.com/document/d/1Qw7LHQdXWBW9t2-r-A7QdFDBwmZh6ytB4guwMoXHqc0).


[GitHub] [pulsar] gaozhangmin commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r920716462


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/LedgerDeletionSystemTopicClient.java:
##########
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInfo;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * System topic for ledger deletion.
+ */
+public class LedgerDeletionSystemTopicClient extends SystemTopicClientBase<PendingDeleteLedgerInfo> {
+
+    private final int sendDelaySeconds;
+
+    private final int reconsumeLaterSeconds;
+
+    public LedgerDeletionSystemTopicClient(PulsarClient client, TopicName topicName, int sendDelaySeconds,
+                                           int reconsumeLaterSeconds) {
+        super(client, topicName);
+        this.sendDelaySeconds = sendDelaySeconds;
+        this.reconsumeLaterSeconds = reconsumeLaterSeconds;
+    }
+
+    @Override
+    protected CompletableFuture<Writer<PendingDeleteLedgerInfo>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
+                .topic(topicName.toString())
+                .enableBatching(false)
+                .createAsync().thenCompose(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new writer is created", topicName);
+                    }
+                    return CompletableFuture.completedFuture(
+                            new PendingDeleteLedgerWriter(producer, LedgerDeletionSystemTopicClient.this,
+                                    sendDelaySeconds));
+                });
+    }
+
+    @Override
+    protected CompletableFuture<Reader<PendingDeleteLedgerInfo>> newReaderAsyncInternal() {
+        // TODO: 2022/7/7 handle batching when messages are sent to the dead letter queue
+        return client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))

Review Comment:
   You should enable retry here if you want to use `reconsumeLater`.



[GitHub] [pulsar] github-actions[bot] commented on pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#issuecomment-1320751824

   This PR has had no activity for 30 days; marking it with the Stale label.


[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r947000060


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2570,70 +2651,107 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
-            // Update metadata
-            for (LedgerInfo ls : ledgersToDelete) {
-                if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
-                    // this info is relevant because the lastMessageId won't be available anymore
-                    log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
-                             + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
-                }
-
-                invalidateReadHandle(ls.getLedgerId());
-
-                ledgers.remove(ls.getLedgerId());
-                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
-                TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
 
-                entryCache.invalidateAllEntries(ls.getLedgerId());
-            }
-            for (LedgerInfo ls : offloadedLedgersToDelete) {
-                LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-                newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
-                String driverName = OffloadUtils.getOffloadDriverName(ls,
-                        config.getLedgerOffloader().getOffloadDriverName());
-                Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
-                        config.getLedgerOffloader().getOffloadDriverMetadata());
-                OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
-                ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
-            }
+            // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers =
+                    Stream.concat(ledgersToDelete.stream().filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()),
+                            offloadedLedgersToDelete.stream()).map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            // Mark deletable offloaded ledgers
+            Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream()
+                    .filter(ls -> ls.getOffloadContext().hasUidMsb())
+                    .map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            CompletableFuture<?> appendDeleteLedgerFuture =
+                    appendPendingDeleteLedger(deletableLedgers, deletableOffloadedLedgers);
+            appendDeleteLedgerFuture.thenAccept(ignore -> {
+                believedDeleteIds.addAll(deletableLedgers);
+                for (LedgerInfo ls : ledgersToDelete) {
+                    if (currentLastConfirmedEntry != null
+                            && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
+                        // this info is relevant because the lastMessageId won't be available anymore
+                        log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
+                                + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
+                    }
+                    invalidateReadHandle(ls.getLedgerId());
+                    ledgers.remove(ls.getLedgerId());
+                    entryCache.invalidateAllEntries(ls.getLedgerId());
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Updating of ledgers list after trimming", name);
-            }
+                    NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
+                    TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
+                }
+                for (LedgerInfo ls : offloadedLedgersToDelete) {
+                    LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
+                    newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+                    String driverName = OffloadUtils.getOffloadDriverName(ls,
+                            config.getLedgerOffloader().getOffloadDriverName());
+                    Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
+                            config.getLedgerOffloader().getOffloadDriverMetadata());
+                    OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
+                    ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
+                }
 
-            store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
-                @Override
-                public void operationComplete(Void result, Stat stat) {
-                    log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
-                            TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
-                    ledgersStat = stat;
-                    metadataMutex.unlock();
-                    trimmerMutex.unlock();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Updating of ledgers list after trimming", name);
+                }
 
-                    for (LedgerInfo ls : ledgersToDelete) {
-                        log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
-                        asyncDeleteLedger(ls.getLedgerId(), ls);
-                    }
-                    for (LedgerInfo ls : offloadedLedgersToDelete) {
-                        log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
-                                ls.getSize());
-                        asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
+                store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
+                    @Override
+                    public void operationComplete(Void result, Stat stat) {
+                        log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
+                                TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
+                        ledgersStat = stat;
+                        metadataMutex.unlock();
+                        trimmerMutex.unlock();
+                        if (ledgerDeletionService instanceof LedgerDeletionService.LedgerDeletionServiceDisable) {

Review Comment:
   good idea.



[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [WIP][PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r920717343


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/LedgerDeletionSystemTopicClient.java:
##########
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInfo;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * System topic for ledger deletion.
+ */
+public class LedgerDeletionSystemTopicClient extends SystemTopicClientBase<PendingDeleteLedgerInfo> {
+
+    private final int sendDelaySeconds;
+
+    private final int reconsumeLaterSeconds;
+
+    public LedgerDeletionSystemTopicClient(PulsarClient client, TopicName topicName, int sendDelaySeconds,
+                                           int reconsumeLaterSeconds) {
+        super(client, topicName);
+        this.sendDelaySeconds = sendDelaySeconds;
+        this.reconsumeLaterSeconds = reconsumeLaterSeconds;
+    }
+
+    @Override
+    protected CompletableFuture<Writer<PendingDeleteLedgerInfo>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
+                .topic(topicName.toString())
+                .enableBatching(false)
+                .createAsync().thenCompose(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new writer is created", topicName);
+                    }
+                    return CompletableFuture.completedFuture(
+                            new PendingDeleteLedgerWriter(producer, LedgerDeletionSystemTopicClient.this,
+                                    sendDelaySeconds));
+                });
+    }
+
+    @Override
+    protected CompletableFuture<Reader<PendingDeleteLedgerInfo>> newReaderAsyncInternal() {
+        // TODO: 2022/7/7 handle batching when messages are sent to the dead letter queue
+        return client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))

Review Comment:
   Thanks for the reminder.



[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r963045131


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1039,6 +1044,126 @@ private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncRespon
                 });
     }
 
+    protected void internalDeleteLedger(AsyncResponse asyncResponse, boolean authoritative,
+                                        DeleteLedgerPayload deleteLedgerPayload) {
+        validateTopicOwnership(topicName, authoritative);
+
+        CompletableFuture<Void> ret;
+        if (topicName.isGlobal()) {
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
+        }
+        ret.thenAccept(__ -> {
+            long ledgerId = deleteLedgerPayload.getLedgerId();
+            log.info("[{}][{}] received delete ledger: {}", clientAppId(), topicName, ledgerId);
+            validateTopicOwnershipAsync(topicName, authoritative)
+                    .thenCompose(ignore ->
+                            // Do we need to check the DELETE_LEDGER operation?
+                            validateTopicOperationAsync(topicName, TopicOperation.DELETE_LEDGER))
+                    .thenCompose(ignore -> getTopicReferenceAsync(topicName))
+                    .thenCompose(topic -> {
+                        CompletableFuture<Void> future = new CompletableFuture<>();
+                        if (topic == null) {
+                            asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                    getTopicNotFoundErrorMessage(topicName.toString())));
+                            future.complete(null);
+                            return future;
+                        }
+                        ManagedLedger managedLedger = ((PersistentTopic) topic).getManagedLedger();
+                        DeleteLedgerPayload.OffloadContext context = deleteLedgerPayload.getOffloadContext();
+                        LedgerType ledgerType = LedgerType.valueOf(deleteLedgerPayload.getLedgerType());
+                        if (LedgerType.LEDGER == ledgerType) {
+                            managedLedger.asyncDeleteLedger(topicName.getPersistenceNamingEncoding(), ledgerId,
+                                    ledgerType, null).whenComplete((res, ex) -> {
+                                        if (ex != null) {
+                                            if (ex instanceof PendingDeleteLedgerInvalidException) {
+                                                if (log.isDebugEnabled()) {
+                                                    log.debug("[{}][{}] Received invalid pending delete ledger {},"
+                                                                    + " invalid reason: {}", clientAppId(), topicName,
+                                                            ledgerId, ex.getMessage());
+                                                }
+                                                future.complete(null);
+                                                return;
+                                            }
+                                            future.completeExceptionally(ex);
+                                            return;
+                                        }
+                                        future.complete(null);
+                                    });
+                        } else if (LedgerType.OFFLOAD_LEDGER == ledgerType) {

Review Comment:
   Yes, it can be.



[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r963050508


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.deletion.LedgerComponent;
+import org.apache.bookkeeper.mledger.deletion.LedgerDeletionService;
+import org.apache.bookkeeper.mledger.deletion.LedgerType;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInfo;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInvalidException;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.broker.systopic.LedgerDeletionSystemTopicClient;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.topic.DeleteLedgerPayload;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemTopicBasedLedgerDeletionService implements LedgerDeletionService {
+
+    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+
+    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedLedgerDeletionService.class);
+
+    private final PulsarAdmin pulsarAdmin;
+
+    private final int ledgerDeletionParallelism;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final Map<OffloadPoliciesImpl, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
+
+    private StatsProvider statsProvider = new NullStatsProvider();
+
+    private OpStatsLogger deleteLedgerOpLogger;
+
+    private OpStatsLogger deleteOffloadLedgerOpLogger;
+
+    private final BookKeeper bookKeeper;
+
+    private final PulsarService pulsarService;
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> ledgerDeletionTopicClient;
+
+    private transient CompletableFuture<SystemTopicClient.Reader<PendingDeleteLedgerInfo>> readerFuture;
+
+    private transient CompletableFuture<SystemTopicClient.Writer<PendingDeleteLedgerInfo>> writerFuture;
+
+    public SystemTopicBasedLedgerDeletionService(PulsarService pulsarService, ServiceConfiguration serviceConfiguration)
+            throws PulsarServerException {
+        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
+        this.pulsarService = pulsarService;
+        this.pulsarAdmin = pulsarService.getAdminClient();
+        this.bookKeeper = pulsarService.getBookKeeperClient();
+        this.serviceConfiguration = serviceConfiguration;
+        this.ledgerDeletionParallelism = serviceConfiguration.getLedgerDeletionParallelismOfTopicTwoPhaseDeletion();
+    }
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> getLedgerDeletionTopicClient() throws PulsarClientException {
+        TopicName ledgerDeletionSystemTopic =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(NamespaceName.SYSTEM_NAMESPACE,
+                        EventType.LEDGER_DELETION);
+        if (ledgerDeletionSystemTopic == null) {
+            throw new PulsarClientException.InvalidTopicNameException(
+                    "Can't create SystemTopicBasedLedgerDeletionService, " + "because the topicName is null!");
+        }
+        return namespaceEventsSystemTopicFactory.createLedgerDeletionSystemTopicClient(ledgerDeletionSystemTopic,
+                serviceConfiguration.getSendDelayOfTopicTwoPhaseDeletionInSeconds(),
+                serviceConfiguration.getReconsumeLaterOfTopicTwoPhaseDeletionInSeconds());
+    }
+
+    @Override
+    public void start() throws PulsarClientException, PulsarAdminException {
+        this.ledgerDeletionTopicClient = getLedgerDeletionTopicClient();
+        initStatsLogger();
+        initLedgerDeletionSystemTopic();
+    }
+
+    private void initStatsLogger() {
+        Configuration configuration = new ClientConfiguration();
+        if (serviceConfiguration.isBookkeeperClientExposeStatsToPrometheus()) {
+            configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+                    serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+            configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, serviceConfiguration.getClusterName());
+            this.statsProvider = new PrometheusMetricsProvider();
+        }
+        this.statsProvider.start(configuration);
+        StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_ledger_deletion");
+        this.deleteLedgerOpLogger = statsLogger.getOpStatsLogger("delete_ledger");
+        this.deleteOffloadLedgerOpLogger = statsLogger.getOpStatsLogger("delete_offload_ledger");
+    }
+
+    private CompletableFuture<Void> initSystemTopic(String topicName, int partition) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doCreateSystemTopic(future, topicName, partition);
+        return future;
+    }
+
+    private void doCreateSystemTopic(CompletableFuture<Void> future, String topicName, int partition) {
+        this.pulsarAdmin.topics()
+                .createPartitionedTopicAsync(topicName, partition).whenComplete((res, e) -> {
+                    Throwable cause = e == null ? null : FutureUtil.unwrapCompletionException(e);
+                    // ConflictException means the topic already exists, which counts as initialized.
+                    if (cause != null && !(cause instanceof PulsarAdminException.ConflictException)) {
+                        log.error("Failed to initialize system topic {}, retrying.", topicName, cause);
+                        doCreateSystemTopic(future, topicName, partition);
+                        return;
+                    }
+                    future.complete(null);
+                });
+    }
+
+    private void initLedgerDeletionSystemTopic() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_TOPIC.getPartitionedTopicName(),
+                this.ledgerDeletionParallelism));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_RETRY_TOPIC.getPartitionedTopicName(), 1));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_DLQ_TOPIC.getPartitionedTopicName(), 1));
+
+        FutureUtil.waitForAll(futures).whenComplete((res, e) -> {
+            initReaderFuture();
+            initWriterFuture();
+        });
+    }
+
+    private void readMorePendingDeleteLedger(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader) {
+        reader.readNextAsync().whenComplete((msg, ex) -> {
+            if (ex == null) {
+                handleMessage(reader, msg).exceptionally(e -> {
+                    log.warn("Delete ledger {} failed.", msg.getValue(), e);
+                    return null;
+                });
+                readMorePendingDeleteLedger(reader);
+            } else {
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                if (cause instanceof PulsarClientException.AlreadyClosedException) {
+                    log.error("Pending-delete ledger reader was closed, recreating it.", ex);
+                    reader.closeAsync();
+                    initReaderFuture();
+                } else {
+                    log.warn("Failed to read the next pending-delete ledger, retrying.", ex);
+                    readMorePendingDeleteLedger(reader);
+                }
+            }
+        });
+    }
+
+    private void initReaderFuture() {
+        this.readerFuture = ledgerDeletionTopicClient.newReaderAsync().whenComplete((reader, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create reader on ledger deletion system topic", ex);
+                initReaderFuture();
+            } else {
+                readMorePendingDeleteLedger((LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader) reader);
+            }
+        });
+    }
+
+    private void initWriterFuture() {
+        this.writerFuture = ledgerDeletionTopicClient.newWriterAsync().whenComplete((writer, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create writer on ledger deletion system topic", ex);
+                initWriterFuture();
+            }
+        });
+    }
+
+    private String tuneTopicName(String topicName) {
+        if (topicName.contains("/" + TopicDomain.persistent.value())) {
+            return topicName.replaceFirst("/" + TopicDomain.persistent.value(), "");
+        }
+        return topicName;
+    }
+
+    @Override
+    public CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
+                                                          LedgerComponent component, LedgerType type,
+                                                          Map<String, String> properties) {
+        topicName = tuneTopicName(topicName);
+        PendingDeleteLedgerInfo pendingDeleteLedger = null;
+        if (LedgerType.LEDGER == type) {
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        } else if (LedgerType.OFFLOAD_LEDGER == type) {
+            if (!context.getOffloadContext().hasUidMsb()) {
+                return FutureUtil.failedFuture(
+                        new IllegalArgumentException("The ledger " + ledgerId + " has not been offloaded."));
+            }
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        }
+        return sendMessage(pendingDeleteLedger);
+    }
+
+    @Override
+    public void close() throws Exception {
+        asyncClose().get();
+    }
+
+    @Override
+    public CompletableFuture<?> asyncClose() {
+        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+            return readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isTopicTwoPhaseDeletionEnabled() {
+        return true;
+    }
+
+    private CompletableFuture<?> handleMessage(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader,
+                                               Message<PendingDeleteLedgerInfo> message) {
+        CompletableFuture<?> future = new CompletableFuture<>();
+        deleteInBroker(message).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof PulsarAdminException.NotFoundException) {
+                    deleteLocally(message).whenComplete((res1, ex1) -> {
+                        if (ex1 != null) {
+                            if (ex1 instanceof PendingDeleteLedgerInvalidException) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                            message.getValue().getLedgerId(), ex1.getMessage());
+                                }
+                                reader.ackMessageAsync(message);
+                                future.complete(null);
+                                return;
+                            }
+                            reader.reconsumeLaterAsync(message);
+                            future.completeExceptionally(ex1);
+                            // Return so that a failed deletion is not also acknowledged below.
+                            return;
+                        }
+                        reader.ackMessageAsync(message);
+                        future.complete(null);
+                    });
+                } else if (ex instanceof PendingDeleteLedgerInvalidException) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                message.getValue().getLedgerId(), ex.getMessage());
+                    }
+                    reader.ackMessageAsync(message);
+                    future.complete(null);
+                } else if (ex instanceof PulsarAdminException.ServerSideErrorException) {
+

Review Comment:
   It isn't needed. The original logic was: on the client side, check whether the ServerSideErrorException contains `PendingDeleteLedgerInvalidException` info; if it does, the pending-delete message is invalid and we still need to ack it.
   Now, however, the server side catches `PendingDeleteLedgerInvalidException` and returns a success response, so the client side no longer needs to check the error response.

   I will delete this logic branch.
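A minimal sketch of the client-side handling once this branch is gone, assuming the remote deletion now ends in one of three ways: success (which includes invalid messages the server swallowed), a not-found error, or any other failure. `PendingDeleteDecision`, `Action` and `TopicNotFoundException` are hypothetical stand-ins, not Pulsar classes; the real code uses `PulsarAdminException.NotFoundException`, `ackMessageAsync` and `reconsumeLaterAsync`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
 * Hypothetical sketch, not Pulsar code: maps the outcome of a remote ledger
 * deletion to what the system-topic consumer does with the message, assuming
 * the server already reports invalid pending-delete messages as success.
 */
public class PendingDeleteDecision {

    /** What to do with a pending-delete message. */
    public enum Action { ACK, DELETE_LOCALLY, RECONSUME_LATER }

    /** Stand-in for PulsarAdminException.NotFoundException (assumption). */
    public static class TopicNotFoundException extends RuntimeException {
        public TopicNotFoundException(String message) {
            super(message);
        }
    }

    /** Dependent futures wrap failures in CompletionException; strip that layer. */
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    /** Maps the remote-deletion error (null on success) to an action. */
    public static Action decide(Throwable error) {
        if (error == null) {
            // Success, which now also covers invalid messages the server swallowed.
            return Action.ACK;
        }
        if (unwrap(error) instanceof TopicNotFoundException) {
            // No broker serves the topic: fall back to deleting from this broker.
            return Action.DELETE_LOCALLY;
        }
        // Any other failure is retried later; the error body is never parsed.
        return Action.RECONSUME_LATER;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> remote = new CompletableFuture<>();
        remote.completeExceptionally(new TopicNotFoundException("topic not loaded"));
        // thenApply wraps the failure in a CompletionException; decide() unwraps it.
        remote.thenApply(v -> v).whenComplete((v, ex) -> System.out.println(decide(ex)));
        System.out.println(decide(null));
        System.out.println(decide(new IllegalStateException("bookie unavailable")));
    }
}
```

With the server answering success for invalid messages, the consumer only distinguishes three outcomes and never matches strings in the error response.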




[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r963050508


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@

Review Comment:
   It isn't needed. The original logic was: on the client side, check whether the ServerSideErrorException contains `PendingDeleteLedgerInvalidException` info; if it does, the pending-delete message is invalid and we still need to ack it.
   Now, however, the server side catches `PendingDeleteLedgerInvalidException` and returns a success response, so the client side no longer needs to check the error response.
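The server-side half of this change can be sketched as below. This is a hypothetical illustration, not the PR's handler: `InvalidAsSuccessHandler`, `InvalidPendingDeleteException` and the 204/500 status codes are assumptions standing in for the REST endpoint and `PendingDeleteLedgerInvalidException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/**
 * Hypothetical sketch, not the PR's REST handler: a deletion that fails only
 * because the pending-delete message is invalid is reported as success, so the
 * caller never inspects the error body. Status codes are assumptions.
 */
public class InvalidAsSuccessHandler {

    /** Stand-in for PendingDeleteLedgerInvalidException (assumption). */
    public static class InvalidPendingDeleteException extends RuntimeException {
        public InvalidPendingDeleteException(String message) {
            super(message);
        }
    }

    public static final int NO_CONTENT = 204;
    public static final int INTERNAL_SERVER_ERROR = 500;

    /** Runs the deletion and maps its outcome to an HTTP status code. */
    public static CompletableFuture<Integer> handle(Supplier<CompletableFuture<Void>> deletion) {
        return deletion.get().handle((ignored, ex) -> {
            if (ex == null) {
                return NO_CONTENT;
            }
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
            if (cause instanceof InvalidPendingDeleteException) {
                // Nothing valid to delete: answer success so the consumer acks the message.
                return NO_CONTENT;
            }
            return INTERNAL_SERVER_ERROR;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> invalid = new CompletableFuture<>();
        invalid.completeExceptionally(new InvalidPendingDeleteException("invalid pending-delete message"));
        System.out.println(handle(() -> invalid).join());
    }
}
```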




[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r920717343


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/LedgerDeletionSystemTopicClient.java:
##########
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInfo;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * System topic for ledger deletion.
+ */
+public class LedgerDeletionSystemTopicClient extends SystemTopicClientBase<PendingDeleteLedgerInfo> {
+
+    private final int sendDelaySeconds;
+
+    private final int reconsumeLaterSeconds;
+
+    public LedgerDeletionSystemTopicClient(PulsarClient client, TopicName topicName, int sendDelaySeconds,
+                                           int reconsumeLaterSeconds) {
+        super(client, topicName);
+        this.sendDelaySeconds = sendDelaySeconds;
+        this.reconsumeLaterSeconds = reconsumeLaterSeconds;
+    }
+
+    @Override
+    protected CompletableFuture<Writer<PendingDeleteLedgerInfo>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
+                .topic(topicName.toString())
+                .enableBatching(false)
+                .createAsync().thenCompose(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new writer is created", topicName);
+                    }
+                    return CompletableFuture.completedFuture(
+                            new PendingDeleteLedgerWriter(producer, LedgerDeletionSystemTopicClient.this,
+                                    sendDelaySeconds));
+                });
+    }
+
+    @Override
+    protected CompletableFuture<Reader<PendingDeleteLedgerInfo>> newReaderAsyncInternal() {
+        // TODO: 2022/7/7 handle batching when messages are sent to the dead-letter queue
+        return client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))

Review Comment:
   thanks
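The client above is built with a send delay (`sendDelaySeconds`) and a reconsume-later delay (`reconsumeLaterSeconds`), and the service creates a retry topic and a dead-letter topic next to the main ledger-deletion topic. The following is a minimal in-memory sketch of that retry/dead-letter flow, not the PR's code. `RetryDlqSketch`, `Pending` and `MAX_ATTEMPTS` are hypothetical names. A `Deque` stands in for the retry topic, and `MAX_ATTEMPTS` stands in for whatever redelivery limit the real consumer is configured with.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

// Hypothetical in-memory model of: main topic -> retry topic -> dead-letter topic.
class RetryDlqSketch {
    // Assumption: stands in for the consumer's maximum redelivery count.
    static final int MAX_ATTEMPTS = 3;

    // A pending-delete record plus how many deletion attempts have already failed.
    static final class Pending {
        final long ledgerId;
        final int failures;

        Pending(long ledgerId, int failures) {
            this.ledgerId = ledgerId;
            this.failures = failures;
        }
    }

    /** Processes every record; deleted ids go to {@code deleted}, exhausted ids are returned. */
    static List<Long> drain(Deque<Pending> queue, LongPredicate deleteLedger, List<Long> deleted) {
        List<Long> deadLetters = new ArrayList<>();
        while (!queue.isEmpty()) {
            Pending p = queue.pollFirst();
            if (deleteLedger.test(p.ledgerId)) {
                deleted.add(p.ledgerId);                       // success: acknowledge
            } else if (p.failures + 1 >= MAX_ATTEMPTS) {
                deadLetters.add(p.ledgerId);                   // give up: dead-letter it
            } else {
                queue.addLast(new Pending(p.ledgerId, p.failures + 1)); // reconsume later
            }
        }
        return deadLetters;
    }

    /** Ledger 1 always deletes, ledger 2 never does, ledger 3 succeeds on its second try. */
    static String demo() {
        Map<Long, Integer> calls = new HashMap<>();
        Deque<Pending> queue = new ArrayDeque<>();
        for (long id = 1; id <= 3; id++) {
            queue.addLast(new Pending(id, 0));
        }
        List<Long> deleted = new ArrayList<>();
        List<Long> dead = drain(queue, id -> {
            int n = calls.merge(id, 1, Integer::sum);
            return id == 1 || (id == 3 && n >= 2);
        }, deleted);
        return "deleted=" + deleted + " dead=" + dead + " attempts2=" + calls.get(2L);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Re-enqueueing a failed record keeps one stubborn ledger from blocking the rest, and the dead-letter list bounds how long a permanently failing deletion is retried.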




[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r947003565


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -1079,6 +1080,34 @@ public void deleteTopic(
                 });
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/ledger/delete")
+    @ApiOperation(value = "Delete a topic.",
+            notes = "The topic cannot be deleted if the deletion is not forced and there are active "
+                    + "subscriptions or producers connected to it. "
+                    + "Force delete ignores connected clients and deletes topic by explicitly closing them.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 412, message = "Topic has active producers/subscriptions"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void deleteLedger(

Review Comment:
   If there are two brokers, there will be two system topic consumers (A and B). If topic ownership is on broker A and B receives a pending-delete-ledger message, B sends a delete-ledger command to A. That is why this PR introduces a REST API on the server side and in PulsarAdmin.
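The forwarding rule described in this comment can be sketched as follows. This is an illustrative model only: `OwnershipRoutingSketch`, `Broker`, `onPendingDelete` and the ownership map are hypothetical stand-ins, and the direct method call on the owner replaces the HTTP request that would go to the new `/{tenant}/{namespace}/{topic}/ledger/delete` endpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OwnershipRoutingSketch {
    // Hypothetical stand-in for a broker that runs a ledger-deletion consumer.
    static final class Broker {
        final String id;
        final List<Long> deletedLedgers = new ArrayList<>();
        int forwardedRequests;

        Broker(String id) {
            this.id = id;
        }
    }

    /**
     * Invoked on whichever broker's consumer received the pending-delete record.
     * Returns the id of the broker that actually deleted the ledger.
     */
    static String onPendingDelete(Broker receiver, String topic, long ledgerId,
                                  Map<String, Broker> ownerByTopic) {
        Broker owner = ownerByTopic.get(topic);
        if (owner == null) {
            throw new IllegalStateException("No broker owns " + topic);
        }
        if (owner != receiver) {
            // Not the owner: forward the request (a REST call in the PR, a method call here).
            receiver.forwardedRequests++;
        }
        owner.deletedLedgers.add(ledgerId);   // the deletion always runs on the owning broker
        return owner.id;
    }

    static String demo() {
        Broker a = new Broker("A");
        Broker b = new Broker("B");
        Map<String, Broker> owners = new HashMap<>();
        owners.put("persistent://public/default/t1", a);
        // B's consumer happens to receive the record for a topic owned by A.
        String executor = onPendingDelete(b, "persistent://public/default/t1", 42L, owners);
        return executor + " deleted=" + a.deletedLedgers + " forwardedByB=" + b.forwardedRequests;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because any broker's consumer may pick up any record, routing on ownership keeps the actual deletion on the broker that presumably has the topic's managed ledger loaded.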




[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r963052491


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2666,6 +2666,36 @@ public class ServiceConfiguration implements PulsarConfiguration {
         )
     private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0;
 
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_SERVER,
+            doc = "Use two-phase deletion when deleting ledgers. If true, "
+                    + "LedgerDeletionService takes over ledger deletion. (Default: false)"
+    )
+    private boolean topicTwoPhaseDeletionEnabled;

Review Comment:
   yes.
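This flag switches ledger removal from the original delete-in-callback path to the two-phase path. Below is a minimal in-memory sketch of why the second path avoids orphan ledgers, under two assumptions this excerpt does not confirm: the pending-delete record is written durably before the ledger list is updated in the metadata store, and phase 2 is safe to re-run. `TwoPhaseDeletionSketch` and all of its members are hypothetical; the `Deque` stands in for the ledger-deletion system topic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;
import java.util.TreeSet;

class TwoPhaseDeletionSketch {
    final boolean twoPhaseEnabled;
    final Deque<Long> pendingLog = new ArrayDeque<>();   // assumed durable: survives a restart
    final Set<Long> metadataLedgers = new TreeSet<>();   // ledger list in the metadata store
    final Set<Long> storageLedgers = new TreeSet<>();    // ledgers still present in storage

    TwoPhaseDeletionSketch(boolean twoPhaseEnabled, long... ledgers) {
        this.twoPhaseEnabled = twoPhaseEnabled;
        for (long id : ledgers) {
            metadataLedgers.add(id);
            storageLedgers.add(id);
        }
    }

    /** Removes a ledger; crashAfterMetadataUpdate simulates the broker dying mid-way. */
    void trim(long ledgerId, boolean crashAfterMetadataUpdate) {
        if (twoPhaseEnabled) {
            pendingLog.addLast(ledgerId);     // phase 1: record the intent first
        }
        metadataLedgers.remove(ledgerId);     // update the ledger list in the metadata store
        if (crashAfterMetadataUpdate) {
            return;                           // nothing after this point runs
        }
        if (!twoPhaseEnabled) {
            storageLedgers.remove(ledgerId);  // old path: delete in the metadata callback
        }
    }

    /** Phase 2: drain the log; re-running it after a restart is harmless. */
    void drainPendingLog() {
        while (!pendingLog.isEmpty()) {
            long id = pendingLog.peekFirst();
            storageLedgers.remove(id);        // removing an absent ledger is a no-op
            pendingLog.pollFirst();           // acknowledge only after the delete
        }
    }

    /** Ledgers still in storage that the metadata store no longer references. */
    Set<Long> orphans() {
        Set<Long> result = new TreeSet<>(storageLedgers);
        result.removeAll(metadataLedgers);
        return result;
    }

    static String demo() {
        TwoPhaseDeletionSketch oldPath = new TwoPhaseDeletionSketch(false, 1, 2, 3);
        oldPath.trim(1, false);
        oldPath.trim(2, true);
        oldPath.drainPendingLog();            // nothing was logged on the old path
        TwoPhaseDeletionSketch twoPhase = new TwoPhaseDeletionSketch(true, 1, 2, 3);
        twoPhase.trim(1, false);
        twoPhase.trim(2, true);
        twoPhase.drainPendingLog();           // runs after the simulated restart
        return "old=" + oldPath.orphans() + " twoPhase=" + twoPhase.orphans();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The sketch ignores the opposite failure (record written, metadata update failed). The `isBelievedDelete` parameter of `asyncDeleteLedger` appears intended to cover that case by checking the ledger metadata before deleting.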



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.deletion.LedgerComponent;
+import org.apache.bookkeeper.mledger.deletion.LedgerDeletionService;
+import org.apache.bookkeeper.mledger.deletion.LedgerType;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInfo;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInvalidException;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.broker.systopic.LedgerDeletionSystemTopicClient;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.topic.DeleteLedgerPayload;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemTopicBasedLedgerDeletionService implements LedgerDeletionService {
+
+    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+
+    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedLedgerDeletionService.class);
+
+    private final PulsarAdmin pulsarAdmin;
+
+    private final int ledgerDeletionParallelism;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final Map<OffloadPoliciesImpl, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
+
+    private StatsProvider statsProvider = new NullStatsProvider();
+
+    private OpStatsLogger deleteLedgerOpLogger;
+
+    private OpStatsLogger deleteOffloadLedgerOpLogger;
+
+    private final BookKeeper bookKeeper;
+
+    private final PulsarService pulsarService;
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> ledgerDeletionTopicClient;
+
+    private transient CompletableFuture<SystemTopicClient.Reader<PendingDeleteLedgerInfo>> readerFuture;
+
+    private transient CompletableFuture<SystemTopicClient.Writer<PendingDeleteLedgerInfo>> writerFuture;
+
+    public SystemTopicBasedLedgerDeletionService(PulsarService pulsarService, ServiceConfiguration serviceConfiguration)
+            throws PulsarServerException {
+        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
+        this.pulsarService = pulsarService;
+        this.pulsarAdmin = pulsarService.getAdminClient();
+        this.bookKeeper = pulsarService.getBookKeeperClient();
+        this.serviceConfiguration = serviceConfiguration;
+        this.ledgerDeletionParallelism = serviceConfiguration.getLedgerDeletionParallelismOfTopicTwoPhaseDeletion();
+    }
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> getLedgerDeletionTopicClient() throws PulsarClientException {
+        TopicName ledgerDeletionSystemTopic =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(NamespaceName.SYSTEM_NAMESPACE,
+                        EventType.LEDGER_DELETION);
+        if (ledgerDeletionSystemTopic == null) {
+            throw new PulsarClientException.InvalidTopicNameException(
+                    "Can't create SystemTopicBasedLedgerDeletionService, " + "because the topicName is null!");
+        }
+        return namespaceEventsSystemTopicFactory.createLedgerDeletionSystemTopicClient(ledgerDeletionSystemTopic,
+                serviceConfiguration.getSendDelayOfTopicTwoPhaseDeletionInSeconds(),
+                serviceConfiguration.getReconsumeLaterOfTopicTwoPhaseDeletionInSeconds());
+    }
+
+    @Override
+    public void start() throws PulsarClientException, PulsarAdminException {
+        this.ledgerDeletionTopicClient = getLedgerDeletionTopicClient();
+        initStatsLogger();
+        initLedgerDeletionSystemTopic();
+    }
+
+    private void initStatsLogger() {
+        Configuration configuration = new ClientConfiguration();
+        if (serviceConfiguration.isBookkeeperClientExposeStatsToPrometheus()) {
+            configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+                    serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+            configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, serviceConfiguration.getClusterName());
+            this.statsProvider = new PrometheusMetricsProvider();
+        }
+        this.statsProvider.start(configuration);
+        StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_ledger_deletion");
+        this.deleteLedgerOpLogger = statsLogger.getOpStatsLogger("delete_ledger");
+        this.deleteOffloadLedgerOpLogger = statsLogger.getOpStatsLogger("delete_offload_ledger");
+    }
+
+    private CompletableFuture<Void> initSystemTopic(String topicName, int partition) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doCreateSystemTopic(future, topicName, partition);
+        return future;
+    }
+
+    private void doCreateSystemTopic(CompletableFuture<Void> future, String topicName, int partition) {
+        this.pulsarAdmin.topics()
+                .createPartitionedTopicAsync(topicName, partition).whenComplete((res, e) -> {
+                    if (e != null && !(e instanceof PulsarAdminException.ConflictException)) {
+                        log.error("Initializing system topic " + topicName + " failed.", e);
+                        doCreateSystemTopic(future, topicName, partition);
+                        return;
+                    }
+                    future.complete(null);
+                });
+    }
+
+    private void initLedgerDeletionSystemTopic() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_TOPIC.getPartitionedTopicName(),
+                this.ledgerDeletionParallelism));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_RETRY_TOPIC.getPartitionedTopicName(), 1));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_DLQ_TOPIC.getPartitionedTopicName(), 1));
+
+        FutureUtil.waitForAll(futures).whenComplete((res, e) -> {
+            initReaderFuture();
+            initWriterFuture();
+        });
+    }
+
+    private void readMorePendingDeleteLedger(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader) {
+        reader.readNextAsync().whenComplete((msg, ex) -> {
+            if (ex == null) {
+                handleMessage(reader, msg).exceptionally(e -> {
+                    log.warn("Delete ledger {} failed.", msg.getValue(), e);
+                    return null;
+                });
+                readMorePendingDeleteLedger(reader);
+            } else {
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                if (cause instanceof PulsarClientException.AlreadyClosedException) {
+                    log.error("Read more pending delete ledger exception, close the reader now!", ex);
+                    reader.closeAsync();
+                    initReaderFuture();
+                } else {
+                    log.warn("Read more pending delete ledger exception, read again.", ex);
+                    readMorePendingDeleteLedger(reader);
+                }
+            }
+        });
+    }
+
+    private void initReaderFuture() {
+        this.readerFuture = ledgerDeletionTopicClient.newReaderAsync().whenComplete((reader, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create reader on ledger deletion system topic", ex);
+                initReaderFuture();
+            } else {
+                readMorePendingDeleteLedger((LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader) reader);
+            }
+        });
+    }
+
+    private void initWriterFuture() {
+        this.writerFuture = ledgerDeletionTopicClient.newWriterAsync().whenComplete((writer, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create writer on ledger deletion system topic", ex);
+                initWriterFuture();
+            }
+        });
+    }
+
+    private String tuneTopicName(String topicName) {
+        if (topicName.contains("/" + TopicDomain.persistent.value())) {
+            return topicName.replaceFirst("/" + TopicDomain.persistent.value(), "");
+        }
+        return topicName;
+    }
+
+    @Override
+    public CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
+                                                          LedgerComponent component, LedgerType type,
+                                                          Map<String, String> properties) {
+        topicName = tuneTopicName(topicName);
+        PendingDeleteLedgerInfo pendingDeleteLedger = null;
+        if (LedgerType.LEDGER == type) {
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        } else if (LedgerType.OFFLOAD_LEDGER == type) {
+            if (!context.getOffloadContext().hasUidMsb()) {
+                CompletableFuture<?> future = new CompletableFuture<>();
+                future.completeExceptionally(
+                        new IllegalArgumentException("The ledger " + ledgerId + " was not offloaded."));
+                return future;
+            }
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        }
+        return sendMessage(pendingDeleteLedger);
+    }
+
+    @Override
+    public void close() throws Exception {

Review Comment:
   yes
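`close()` here blocks on `asyncClose()`, and the `asyncClose()` quoted later in this thread closes only the reader future. A variant that waits on both the reader and the writer, skipping any future that failed to initialise, might look like this. `AsyncCloseSketch`, `AsyncCloseable` and `closeAll` are hypothetical stand-ins for the system-topic reader/writer types, not Pulsar APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncCloseSketch {
    // Hypothetical stand-in for SystemTopicClient.Reader / Writer.
    interface AsyncCloseable {
        CompletableFuture<Void> closeAsync();
    }

    /** Closes every resource whose creation future did not fail; null futures are skipped. */
    @SafeVarargs
    static CompletableFuture<Void> closeAll(CompletableFuture<? extends AsyncCloseable>... futures) {
        List<CompletableFuture<Void>> closes = new ArrayList<>();
        for (CompletableFuture<? extends AsyncCloseable> future : futures) {
            if (future == null || future.isCompletedExceptionally()) {
                continue;   // never created, so there is nothing to close
            }
            // A still-pending future is waited on, then its resource is closed.
            closes.add(future.thenCompose(AsyncCloseable::closeAsync));
        }
        return CompletableFuture.allOf(closes.toArray(new CompletableFuture<?>[0]));
    }

    static int demo() {
        AtomicInteger closed = new AtomicInteger();
        AsyncCloseable resource = () -> {
            closed.incrementAndGet();
            return CompletableFuture.completedFuture(null);
        };
        CompletableFuture<AsyncCloseable> reader = CompletableFuture.completedFuture(resource);
        CompletableFuture<AsyncCloseable> writer = CompletableFuture.completedFuture(resource);
        CompletableFuture<AsyncCloseable> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("creation failed"));
        closeAll(reader, writer, failed, null).join();
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```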




[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r947003851


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/deletion/LedgerDeletionService.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.deletion;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public interface LedgerDeletionService {
+
+    /**
+     * Start.
+     */
+    void start() throws PulsarClientException, PulsarAdminException;
+
+    /**
+     * @param topicName topicName
+     * @param ledgerId  ledgerId
+     * @param context   ledgerInfo
+     * @param component managed_ledger, managed_cursor, schema_storage
+     * @param type      ledger, offload_ledger
+     * @param properties properties
+     * @return
+     */
+    CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
+                                                   LedgerComponent component, LedgerType type, Map<String, String> properties);
+
+    /**
+     *
+     * @param topicName topicName
+     * @param ledgerId  ledgerId
+     * @param component managed_ledger, managed_cursor, schema_storage
+     * @param isBelievedDelete if false, check that the parameters match the ledger metadata.
+     * @return
+     */
+    CompletableFuture<?> asyncDeleteLedger(String topicName, long ledgerId, LedgerComponent component,
+                                           boolean isBelievedDelete);
+
+    /**
+     *
+     * @param topicName topicName
+     * @param ledgerId ledgerId
+     * @param offloadContext offloadContext
+     * @return
+     */
+    CompletableFuture<?> asyncDeleteOffloadedLedger(String topicName, long ledgerId,
+                                                    MLDataFormats.OffloadContext offloadContext);
+
+    /**
+     * Close.
+     */
+    void close() throws Exception;
+
+    /**
+     * Async close.
+     */
+    CompletableFuture<?> asyncClose();
+
+    class LedgerDeletionServiceDisable implements LedgerDeletionService {
+
+        @Override
+        public void start() {
+            //No op
+        }
+
+        private static final CompletableFuture<?> COMPLETABLE_FUTURE = CompletableFuture.completedFuture(null);

Review Comment:
   Fine.
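`LedgerDeletionServiceDisable` is a null-object implementation: every asynchronous call returns the same already-completed future, so callers can chain on it without checking the flag. The following trimmed-down, hypothetical version of that pattern uses the illustrative names `DeletionService` and `Disabled` with a reduced method set. It also shows why sharing one instance is usually safe: `complete` and `completeExceptionally` on an already-completed future are no-ops that return `false`. `obtrudeValue`/`obtrudeException` can still overwrite it, so a fresh `completedFuture(null)` per call is the defensive alternative.

```java
import java.util.concurrent.CompletableFuture;

class DisabledServiceSketch {
    // Hypothetical, reduced version of the interface in the diff.
    interface DeletionService {
        void start();

        CompletableFuture<?> asyncDeleteLedger(String topicName, long ledgerId);

        CompletableFuture<?> asyncClose();
    }

    // Null-object implementation used when two-phase deletion is turned off.
    static final class Disabled implements DeletionService {
        private static final CompletableFuture<?> DONE = CompletableFuture.completedFuture(null);

        @Override
        public void start() {
            // no-op
        }

        @Override
        public CompletableFuture<?> asyncDeleteLedger(String topicName, long ledgerId) {
            return DONE;
        }

        @Override
        public CompletableFuture<?> asyncClose() {
            return DONE;
        }
    }

    static String demo() {
        DeletionService service = new Disabled();
        service.start();
        CompletableFuture<?> first = service.asyncDeleteLedger("persistent://public/default/t1", 7L);
        CompletableFuture<?> second = service.asyncClose();
        // A caller trying to fail the shared future does not affect other callers.
        boolean overwritten = first.completeExceptionally(new RuntimeException("ignored"));
        return "shared=" + (first == second) + " done=" + second.isDone()
                + " overwritten=" + overwritten + " failed=" + second.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```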




[GitHub] [pulsar] horizonzy commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r963051764


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.deletion.LedgerComponent;
+import org.apache.bookkeeper.mledger.deletion.LedgerDeletionService;
+import org.apache.bookkeeper.mledger.deletion.LedgerType;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInfo;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInvalidException;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.broker.systopic.LedgerDeletionSystemTopicClient;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.topic.DeleteLedgerPayload;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemTopicBasedLedgerDeletionService implements LedgerDeletionService {
+
+    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+
+    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedLedgerDeletionService.class);
+
+    private final PulsarAdmin pulsarAdmin;
+
+    private final int ledgerDeletionParallelism;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final Map<OffloadPoliciesImpl, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
+
+    private StatsProvider statsProvider = new NullStatsProvider();
+
+    private OpStatsLogger deleteLedgerOpLogger;
+
+    private OpStatsLogger deleteOffloadLedgerOpLogger;
+
+    private final BookKeeper bookKeeper;
+
+    private final PulsarService pulsarService;
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> ledgerDeletionTopicClient;
+
+    private transient CompletableFuture<SystemTopicClient.Reader<PendingDeleteLedgerInfo>> readerFuture;
+
+    private transient CompletableFuture<SystemTopicClient.Writer<PendingDeleteLedgerInfo>> writerFuture;
+
+    public SystemTopicBasedLedgerDeletionService(PulsarService pulsarService, ServiceConfiguration serviceConfiguration)
+            throws PulsarServerException {
+        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
+        this.pulsarService = pulsarService;
+        this.pulsarAdmin = pulsarService.getAdminClient();
+        this.bookKeeper = pulsarService.getBookKeeperClient();
+        this.serviceConfiguration = serviceConfiguration;
+        this.ledgerDeletionParallelism = serviceConfiguration.getLedgerDeletionParallelismOfTopicTwoPhaseDeletion();
+    }
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> getLedgerDeletionTopicClient() throws PulsarClientException {
+        TopicName ledgerDeletionSystemTopic =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(NamespaceName.SYSTEM_NAMESPACE,
+                        EventType.LEDGER_DELETION);
+        if (ledgerDeletionSystemTopic == null) {
+            throw new PulsarClientException.InvalidTopicNameException(
+                    "Can't create SystemTopicBasedLedgerDeletionService, " + "because the topicName is null!");
+        }
+        return namespaceEventsSystemTopicFactory.createLedgerDeletionSystemTopicClient(ledgerDeletionSystemTopic,
+                serviceConfiguration.getSendDelayOfTopicTwoPhaseDeletionInSeconds(),
+                serviceConfiguration.getReconsumeLaterOfTopicTwoPhaseDeletionInSeconds());
+    }
+
+    @Override
+    public void start() throws PulsarClientException, PulsarAdminException {
+        this.ledgerDeletionTopicClient = getLedgerDeletionTopicClient();
+        initStatsLogger();
+        initLedgerDeletionSystemTopic();
+    }
+
+    private void initStatsLogger() {
+        Configuration configuration = new ClientConfiguration();
+        if (serviceConfiguration.isBookkeeperClientExposeStatsToPrometheus()) {
+            configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+                    serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+            configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, serviceConfiguration.getClusterName());
+            this.statsProvider = new PrometheusMetricsProvider();
+        }
+        this.statsProvider.start(configuration);
+        StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_ledger_deletion");
+        this.deleteLedgerOpLogger = statsLogger.getOpStatsLogger("delete_ledger");
+        this.deleteOffloadLedgerOpLogger = statsLogger.getOpStatsLogger("delete_offload_ledger");
+    }
+
+    private CompletableFuture<Void> initSystemTopic(String topicName, int partition) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doCreateSystemTopic(future, topicName, partition);
+        return future;
+    }
+
+    private void doCreateSystemTopic(CompletableFuture<Void> future, String topicName, int partition) {
+        this.pulsarAdmin.topics()
+                .createPartitionedTopicAsync(topicName, partition).whenComplete((res, e) -> {
+                    if (e != null && !(e instanceof PulsarAdminException.ConflictException)) {
+                        log.error("Initializing system topic " + topicName + " failed.", e);
+                        doCreateSystemTopic(future, topicName, partition);
+                        return;
+                    }
+                    future.complete(null);
+                });
+    }
+
+    private void initLedgerDeletionSystemTopic() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_TOPIC.getPartitionedTopicName(),
+                this.ledgerDeletionParallelism));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_RETRY_TOPIC.getPartitionedTopicName(), 1));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_DLQ_TOPIC.getPartitionedTopicName(), 1));
+
+        FutureUtil.waitForAll(futures).whenComplete((res, e) -> {
+            initReaderFuture();
+            initWriterFuture();
+        });
+    }
+
+    private void readMorePendingDeleteLedger(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader) {
+        reader.readNextAsync().whenComplete((msg, ex) -> {
+            if (ex == null) {
+                handleMessage(reader, msg).exceptionally(e -> {
+                    log.warn("Delete ledger {} failed.", msg.getValue(), e);
+                    return null;
+                });
+                readMorePendingDeleteLedger(reader);
+            } else {
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                if (cause instanceof PulsarClientException.AlreadyClosedException) {
+                    log.error("Read more pending delete ledger exception, close the reader now!", ex);
+                    reader.closeAsync();
+                    initReaderFuture();
+                } else {
+                    log.warn("Read more pending delete ledger exception, read again.", ex);
+                    readMorePendingDeleteLedger(reader);
+                }
+            }
+        });
+    }
+
+    private void initReaderFuture() {
+        this.readerFuture = ledgerDeletionTopicClient.newReaderAsync().whenComplete((reader, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create reader on ledger deletion system topic", ex);
+                initReaderFuture();
+            } else {
+                readMorePendingDeleteLedger((LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader) reader);
+            }
+        });
+    }
+
+    private void initWriterFuture() {
+        this.writerFuture = ledgerDeletionTopicClient.newWriterAsync().whenComplete((writer, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create writer on ledger deletion system topic", ex);
+                initWriterFuture();
+            }
+        });
+    }
+
+    private String tuneTopicName(String topicName) {
+        if (topicName.contains("/" + TopicDomain.persistent.value())) {
+            return topicName.replaceFirst("/" + TopicDomain.persistent.value(), "");
+        }
+        return topicName;
+    }
+
+    @Override
+    public CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
+                                                          LedgerComponent component, LedgerType type,
+                                                          Map<String, String> properties) {
+        topicName = tuneTopicName(topicName);
+        PendingDeleteLedgerInfo pendingDeleteLedger = null;
+        if (LedgerType.LEDGER == type) {
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        } else if (LedgerType.OFFLOAD_LEDGER == type) {
+            if (!context.getOffloadContext().hasUidMsb()) {
+                CompletableFuture<?> future = new CompletableFuture<>();
+                future.completeExceptionally(
+                        new IllegalArgumentException("The ledger " + ledgerId + " was not offloaded."));
+                return future;
+            }
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        }
+        return sendMessage(pendingDeleteLedger);
+    }
+
+    @Override
+    public void close() throws Exception {
+        asyncClose().get();
+    }
+
+    @Override
+    public CompletableFuture<?> asyncClose() {
+        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+            return readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isTopicTwoPhaseDeletionEnabled() {
+        return true;
+    }
+
+    private CompletableFuture<?> handleMessage(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader,
+                                               Message<PendingDeleteLedgerInfo> message) {
+        CompletableFuture<?> future = new CompletableFuture<>();
+        deleteInBroker(message).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof PulsarAdminException.NotFoundException) {
+                    deleteLocally(message).whenComplete((res1, ex1) -> {
+                        if (ex1 != null) {
+                            if (ex1 instanceof PendingDeleteLedgerInvalidException) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                            message.getValue().getLedgerId(), ex1.getMessage());
+                                }
+                                reader.ackMessageAsync(message);
+                                future.complete(null);
+                                return;
+                            }
+                            reader.reconsumeLaterAsync(message);
+                            future.completeExceptionally(ex1);
+                        }
+                        reader.ackMessageAsync(message);
+                        future.complete(null);
+                    });
+                } else if (ex instanceof PendingDeleteLedgerInvalidException) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                message.getValue().getLedgerId(), ex.getMessage());
+                    }
+                    reader.ackMessageAsync(message);
+                    future.complete(null);
+                } else if (ex instanceof PulsarAdminException.ServerSideErrorException) {
+
+                } else {
+                    reader.reconsumeLaterAsync(message);
+                    future.completeExceptionally(ex);
+                }
+                return;
+            }
+            reader.ackMessageAsync(message);
+            future.complete(null);
+        });
+        return future;
+    }
+
+    private CompletableFuture<?> deleteInBroker(Message<PendingDeleteLedgerInfo> message) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            PendingDeleteLedgerInfo pendingDeleteLedger = message.getValue();
+            // Currently only managed-ledger two-phase deletion is supported.
+            if (LedgerComponent.MANAGED_LEDGER == pendingDeleteLedger.getLedgerComponent()) {
+                Long ledgerId = pendingDeleteLedger.getLedgerId();
+                String topicName = pendingDeleteLedger.getTopicName();
+                String ledgerType = pendingDeleteLedger.getLedgerType().name();
+                String ledgerComponent = pendingDeleteLedger.getLedgerComponent().name();
+                DeleteLedgerPayload deleteLedgerPayload =
+                        new DeleteLedgerPayload(ledgerId, topicName, ledgerType, ledgerComponent);
+                if (LedgerType.OFFLOAD_LEDGER == pendingDeleteLedger.getLedgerType()) {
+                    deleteLedgerPayload.setOffloadContext(buildOffloadContext(pendingDeleteLedger));
+                }
+                pulsarAdmin.topics().deleteLedgerAsync(deleteLedgerPayload).whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        future.completeExceptionally(ex);
+                        return;
+                    }
+                    future.complete(null);
+                });
+            } else if (LedgerComponent.MANAGED_CURSOR == pendingDeleteLedger.getLedgerComponent()) {
+                future.complete(null);

Review Comment:
   Alright.
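Two paths in the `handleMessage` hunk quoted above look off. First, when `deleteLocally` fails with anything other than `PendingDeleteLedgerInvalidException`, the callback calls `reconsumeLaterAsync` and then falls through to `ackMessageAsync` because there is no `return`, so the message is both re-consumed and acknowledged. Second, the `ServerSideErrorException` branch is empty, so `future` never completes and the message is neither acknowledged nor re-consumed. Below is a minimal sketch of a dispatch that maps every outcome to exactly one action. The exception classes are hypothetical stand-ins for the Pulsar types, and retrying server-side errors like any other failure is an assumption, not something the PR specifies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

public class PendingDeleteDispatch {
    enum Action { ACK, RECONSUME }

    // Hypothetical stand-ins for PulsarAdminException.NotFoundException and
    // PendingDeleteLedgerInvalidException from the hunk above.
    static class NotFoundException extends RuntimeException {}
    static class InvalidLedgerException extends RuntimeException {}

    // Futures derived from other stages report failures wrapped in CompletionException.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Every outcome maps to exactly one action, so a message is never both
    // re-consumed and acknowledged, and no branch leaves the result pending.
    static Action decide(Throwable error) {
        if (error == null) {
            return Action.ACK;                      // deleted successfully
        }
        if (unwrap(error) instanceof InvalidLedgerException) {
            return Action.ACK;                      // nothing valid left to delete
        }
        return Action.RECONSUME;                    // any other failure, server-side errors included
    }

    // Try the owning broker first; fall back to local deletion only when the
    // broker reports the topic as not found, as the original flow does.
    static CompletableFuture<Action> handle(CompletableFuture<Void> deleteInBroker,
                                            Supplier<CompletableFuture<Void>> deleteLocally) {
        return deleteInBroker
                .handle((ignored, ex) -> ex)
                .thenCompose(ex -> {
                    if (ex != null && unwrap(ex) instanceof NotFoundException) {
                        return deleteLocally.get().handle((ignored, localEx) -> decide(localEx));
                    }
                    return CompletableFuture.completedFuture(decide(ex));
                });
    }
}
```

The caller would then issue a single `ackMessageAsync` or `reconsumeLaterAsync` based on the returned `Action`, which keeps the acknowledgement decision in one place.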





[GitHub] [pulsar] horizonzy commented on pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#issuecomment-1198831319

   Please see #16569 first: the PIP has changed and this code no longer matches it. Once the PIP passes, I will update this code.




[GitHub] [pulsar] eolivelli commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r946919802


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java:
##########
@@ -1079,6 +1080,34 @@ public void deleteTopic(
                 });
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/ledger/delete")
+    @ApiOperation(value = "Delete a topic.",
+            notes = "The topic cannot be deleted if delete is not forcefully and there's any active "
+                    + "subscription or producer connected to it. "
+                    + "Force delete ignores connected clients and deletes topic by explicitly closing them.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 412, message = "Topic has active producers/subscriptions"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void deleteLedger(

Review Comment:
   Why do we need a new API?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2570,70 +2651,107 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
-            // Update metadata
-            for (LedgerInfo ls : ledgersToDelete) {
-                if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
-                    // this info is relevant because the lastMessageId won't be available anymore
-                    log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
-                             + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
-                }
-
-                invalidateReadHandle(ls.getLedgerId());
-
-                ledgers.remove(ls.getLedgerId());
-                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
-                TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
 
-                entryCache.invalidateAllEntries(ls.getLedgerId());
-            }
-            for (LedgerInfo ls : offloadedLedgersToDelete) {
-                LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-                newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
-                String driverName = OffloadUtils.getOffloadDriverName(ls,
-                        config.getLedgerOffloader().getOffloadDriverName());
-                Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
-                        config.getLedgerOffloader().getOffloadDriverMetadata());
-                OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
-                ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
-            }
+            // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers =
+                    Stream.concat(ledgersToDelete.stream().filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()),
+                            offloadedLedgersToDelete.stream()).map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            // Mark deletable offloaded ledgers
+            Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream()
+                    .filter(ls -> ls.getOffloadContext().hasUidMsb())
+                    .map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            CompletableFuture<?> appendDeleteLedgerFuture =
+                    appendPendingDeleteLedger(deletableLedgers, deletableOffloadedLedgers);
+            appendDeleteLedgerFuture.thenAccept(ignore -> {
+                believedDeleteIds.addAll(deletableLedgers);
+                for (LedgerInfo ls : ledgersToDelete) {
+                    if (currentLastConfirmedEntry != null
+                            && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
+                        // this info is relevant because the lastMessageId won't be available anymore
+                        log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
+                                + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
+                    }
+                    invalidateReadHandle(ls.getLedgerId());
+                    ledgers.remove(ls.getLedgerId());
+                    entryCache.invalidateAllEntries(ls.getLedgerId());
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Updating of ledgers list after trimming", name);
-            }
+                    NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
+                    TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
+                }
+                for (LedgerInfo ls : offloadedLedgersToDelete) {
+                    LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
+                    newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+                    String driverName = OffloadUtils.getOffloadDriverName(ls,
+                            config.getLedgerOffloader().getOffloadDriverName());
+                    Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
+                            config.getLedgerOffloader().getOffloadDriverMetadata());
+                    OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
+                    ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
+                }
 
-            store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
-                @Override
-                public void operationComplete(Void result, Stat stat) {
-                    log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
-                            TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
-                    ledgersStat = stat;
-                    metadataMutex.unlock();
-                    trimmerMutex.unlock();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Updating of ledgers list after trimming", name);
+                }
 
-                    for (LedgerInfo ls : ledgersToDelete) {
-                        log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
-                        asyncDeleteLedger(ls.getLedgerId(), ls);
-                    }
-                    for (LedgerInfo ls : offloadedLedgersToDelete) {
-                        log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
-                                ls.getSize());
-                        asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
+                store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
+                    @Override
+                    public void operationComplete(Void result, Stat stat) {
+                        log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
+                                TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
+                        ledgersStat = stat;
+                        metadataMutex.unlock();
+                        trimmerMutex.unlock();
+                        if (ledgerDeletionService instanceof LedgerDeletionService.LedgerDeletionServiceDisable) {

Review Comment:
   Using instanceof is bad practice.
   We should add a method to LedgerDeletionService and override it in the various implementations.
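A minimal sketch of that suggestion. `SystemTopicBasedLedgerDeletionService` in this PR already overrides `isTopicTwoPhaseDeletionEnabled()` to return `true`, so the metadata-update callback could ask the service instead of checking for `LedgerDeletionServiceDisable`. The interface here is a simplified stand-in showing only that method, the `false` return for the disabled implementation is an assumption, and the returned strings are hypothetical placeholders for the two code paths.

```java
public class DeletionServiceDispatch {
    // Simplified stand-in for LedgerDeletionService; only the method relevant
    // to this review comment is shown.
    interface LedgerDeletionService {
        boolean isTopicTwoPhaseDeletionEnabled();
    }

    // Assumed behavior of the disabled implementation: ledgers are deleted
    // directly, as they were before the PR.
    static final class LedgerDeletionServiceDisable implements LedgerDeletionService {
        @Override
        public boolean isTopicTwoPhaseDeletionEnabled() {
            return false;
        }
    }

    // Matches the override shown in SystemTopicBasedLedgerDeletionService.
    static final class SystemTopicBasedLedgerDeletionService implements LedgerDeletionService {
        @Override
        public boolean isTopicTwoPhaseDeletionEnabled() {
            return true;
        }
    }

    // The callback branches on the service's own answer rather than its class,
    // so adding a new implementation does not require touching ManagedLedgerImpl.
    static String afterMetadataUpdate(LedgerDeletionService service) {
        return service.isTopicTwoPhaseDeletionEnabled()
                ? "leave deletion to the deletion service"
                : "delete ledgers directly";
    }
}
```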





[GitHub] [pulsar] github-actions[bot] commented on pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#issuecomment-1184045130

   @horizonzy Please provide a correct documentation label for your PR.
   Instructions see [Pulsar Documentation Label Guide](https://docs.google.com/document/d/1Qw7LHQdXWBW9t2-r-A7QdFDBwmZh6ytB4guwMoXHqc0).




[GitHub] [pulsar] github-actions[bot] commented on pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#issuecomment-1269218354

   The PR has had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] Anonymitaet commented on pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by "Anonymitaet (via GitHub)" <gi...@apache.org>.
Anonymitaet commented on PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#issuecomment-1644951945

   Confirmed with @codelipenghui: the 3.1 code freeze is next week, and this PR will not be merged into 3.1.




[GitHub] [pulsar] codelipenghui commented on pull request #16590: [WIP] [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#issuecomment-1198950163

   Move to 2.12.0 first




[GitHub] [pulsar] horizonzy commented on pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#issuecomment-1281761213

   @hangc0276 @eolivelli Could you help review it? It has been pending for a long time. Thanks!




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r962455243


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2666,6 +2666,36 @@ public class ServiceConfiguration implements PulsarConfiguration {
         )
     private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0;
 
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_SERVER,
+            doc = "Use two-phase deletion when deleting ledgers. If true, "
+                    + "LedgerDeletionService will take over ledger deletion. (Default false)"
+    )
+    private boolean topicTwoPhaseDeletionEnabled;

Review Comment:
   We should set the default value to `false`
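A minimal sketch of the suggested change, using a hypothetical class name and hand-written accessors, with the `@FieldContext` annotation omitted so the example compiles on its own. An uninitialized Java `boolean` field is already `false`, so the explicit initializer changes readability rather than behavior: it makes the default that the doc string advertises visible at the declaration.

```java
public class TwoPhaseDeletionConfigSketch {
    // Mirrors the field in the ServiceConfiguration hunk; "= false" states the
    // documented default explicitly instead of relying on the JVM's implicit one.
    private boolean topicTwoPhaseDeletionEnabled = false;

    public boolean isTopicTwoPhaseDeletionEnabled() {
        return topicTwoPhaseDeletionEnabled;
    }

    public void setTopicTwoPhaseDeletionEnabled(boolean enabled) {
        this.topicTwoPhaseDeletionEnabled = enabled;
    }
}
```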



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1039,6 +1044,126 @@ private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncRespon
                 });
     }
 
+    protected void internalDeleteLedger(AsyncResponse asyncResponse, boolean authoritative,
+                                        DeleteLedgerPayload deleteLedgerPayload) {
+        validateTopicOwnership(topicName, authoritative);
+
+        CompletableFuture<Void> ret;
+        if (topicName.isGlobal()) {
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
+        }
+        ret.thenAccept(__ -> {
+            long ledgerId = deleteLedgerPayload.getLedgerId();
+            log.info("[{}][{}] received delete ledger: {}", clientAppId(), topicName, ledgerId);
+            validateTopicOwnershipAsync(topicName, authoritative)
+                    .thenCompose(ignore ->
+                            // Do we need to check the DELETE_LEDGER operation?
+                            validateTopicOperationAsync(topicName, TopicOperation.DELETE_LEDGER))
+                    .thenCompose(ignore -> getTopicReferenceAsync(topicName))
+                    .thenCompose(topic -> {
+                        CompletableFuture<Void> future = new CompletableFuture<>();
+                        if (topic == null) {
+                            asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                    getTopicNotFoundErrorMessage(topicName.toString())));
+                            future.complete(null);
+                            return future;
+                        }
+                        ManagedLedger managedLedger = ((PersistentTopic) topic).getManagedLedger();
+                        DeleteLedgerPayload.OffloadContext context = deleteLedgerPayload.getOffloadContext();
+                        LedgerType ledgerType = LedgerType.valueOf(deleteLedgerPayload.getLedgerType());
+                        if (LedgerType.LEDGER == ledgerType) {
+                            managedLedger.asyncDeleteLedger(topicName.getPersistenceNamingEncoding(), ledgerId,
+                                    ledgerType, null).whenComplete((res, ex) -> {
+                                        if (ex != null) {
+                                            if (ex instanceof PendingDeleteLedgerInvalidException) {
+                                                if (log.isDebugEnabled()) {
+                                                    log.debug("[{}][{}] Received invalid pending delete ledger {},"
+                                                                    + " invalid reason: {}", clientAppId(), topicName,
+                                                            ledgerId, ex.getMessage());
+                                                }
+                                                future.complete(null);
+                                                return;
+                                            }
+                                            future.completeExceptionally(ex);
+                                            return;
+                                        }
+                                        future.complete(null);
+                                    });
+                        } else if (LedgerType.OFFLOAD_LEDGER == ledgerType) {

Review Comment:
   Can we merge these two cases into one? The main difference is the `offloadContext` parameter, and the `managedLedger.asyncDeleteLedger` method also checks the `ledgerType`.
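A minimal sketch of the merged branch. It assumes, as the comment suggests, that the only difference between the two cases is the context argument: the `LEDGER` branch in the hunk passes `null`, and the offload case would pass the payload's offload context. `ManagedLedger`, `OffloadContext` and the exception are simplified stand-ins, not the real Pulsar types, and the `Object` type of the fourth argument is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class MergedDeleteLedger {
    enum LedgerType { LEDGER, OFFLOAD_LEDGER }

    // Placeholder for DeleteLedgerPayload.OffloadContext.
    static final class OffloadContext {}

    static class PendingDeleteLedgerInvalidException extends RuntimeException {
        PendingDeleteLedgerInvalidException(String message) { super(message); }
    }

    // Simplified stand-in for the four-argument asyncDeleteLedger call in the hunk.
    interface ManagedLedger {
        CompletableFuture<Void> asyncDeleteLedger(String topic, long ledgerId, LedgerType type, Object context);
    }

    // One call site for both ledger types; only the context argument differs.
    static CompletableFuture<Void> deleteLedger(ManagedLedger ml, String topic, long ledgerId,
                                                LedgerType type, OffloadContext ctx) {
        Object context = type == LedgerType.OFFLOAD_LEDGER ? ctx : null;
        CompletableFuture<Void> future = new CompletableFuture<>();
        ml.asyncDeleteLedger(topic, ledgerId, type, context).whenComplete((res, ex) -> {
            Throwable cause = ex instanceof CompletionException && ex.getCause() != null ? ex.getCause() : ex;
            if (cause == null || cause instanceof PendingDeleteLedgerInvalidException) {
                // Success, or an invalid request that is treated as already handled.
                future.complete(null);
            } else {
                future.completeExceptionally(cause);
            }
        });
        return future;
    }
}
```

Keeping a single `whenComplete` also means the invalid-ledger handling is written once instead of being duplicated per ledger type.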



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.deletion.LedgerComponent;
+import org.apache.bookkeeper.mledger.deletion.LedgerDeletionService;
+import org.apache.bookkeeper.mledger.deletion.LedgerType;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInfo;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInvalidException;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.broker.systopic.LedgerDeletionSystemTopicClient;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.topic.DeleteLedgerPayload;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemTopicBasedLedgerDeletionService implements LedgerDeletionService {
+
+    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+
+    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedLedgerDeletionService.class);
+
+    private final PulsarAdmin pulsarAdmin;
+
+    private final int ledgerDeletionParallelism;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final Map<OffloadPoliciesImpl, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
+
+    private StatsProvider statsProvider = new NullStatsProvider();
+
+    private OpStatsLogger deleteLedgerOpLogger;
+
+    private OpStatsLogger deleteOffloadLedgerOpLogger;
+
+    private final BookKeeper bookKeeper;
+
+    private final PulsarService pulsarService;
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> ledgerDeletionTopicClient;
+
+    private transient CompletableFuture<SystemTopicClient.Reader<PendingDeleteLedgerInfo>> readerFuture;
+
+    private transient CompletableFuture<SystemTopicClient.Writer<PendingDeleteLedgerInfo>> writerFuture;
+
+    public SystemTopicBasedLedgerDeletionService(PulsarService pulsarService, ServiceConfiguration serviceConfiguration)
+            throws PulsarServerException {
+        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
+        this.pulsarService = pulsarService;
+        this.pulsarAdmin = pulsarService.getAdminClient();
+        this.bookKeeper = pulsarService.getBookKeeperClient();
+        this.serviceConfiguration = serviceConfiguration;
+        this.ledgerDeletionParallelism = serviceConfiguration.getLedgerDeletionParallelismOfTopicTwoPhaseDeletion();
+    }
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> getLedgerDeletionTopicClient() throws PulsarClientException {
+        TopicName ledgerDeletionSystemTopic =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(NamespaceName.SYSTEM_NAMESPACE,
+                        EventType.LEDGER_DELETION);
+        if (ledgerDeletionSystemTopic == null) {
+            throw new PulsarClientException.InvalidTopicNameException(
+                    "Can't create SystemTopicBasedLedgerDeletionService, " + "because the topicName is null!");
+        }
+        return namespaceEventsSystemTopicFactory.createLedgerDeletionSystemTopicClient(ledgerDeletionSystemTopic,
+                serviceConfiguration.getSendDelayOfTopicTwoPhaseDeletionInSeconds(),
+                serviceConfiguration.getReconsumeLaterOfTopicTwoPhaseDeletionInSeconds());
+    }
+
+    @Override
+    public void start() throws PulsarClientException, PulsarAdminException {
+        this.ledgerDeletionTopicClient = getLedgerDeletionTopicClient();
+        initStatsLogger();
+        initLedgerDeletionSystemTopic();
+    }
+
+    private void initStatsLogger() {
+        Configuration configuration = new ClientConfiguration();
+        if (serviceConfiguration.isBookkeeperClientExposeStatsToPrometheus()) {
+            configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+                    serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+            configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, serviceConfiguration.getClusterName());
+            this.statsProvider = new PrometheusMetricsProvider();
+        }
+        this.statsProvider.start(configuration);
+        StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_ledger_deletion");
+        this.deleteLedgerOpLogger = statsLogger.getOpStatsLogger("delete_ledger");

Review Comment:
   Can these metrics be exported through the broker's Prometheus port?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.deletion.LedgerComponent;
+import org.apache.bookkeeper.mledger.deletion.LedgerDeletionService;
+import org.apache.bookkeeper.mledger.deletion.LedgerType;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInfo;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInvalidException;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.broker.systopic.LedgerDeletionSystemTopicClient;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.topic.DeleteLedgerPayload;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemTopicBasedLedgerDeletionService implements LedgerDeletionService {
+
+    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+
+    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedLedgerDeletionService.class);
+
+    private final PulsarAdmin pulsarAdmin;
+
+    private final int ledgerDeletionParallelism;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final Map<OffloadPoliciesImpl, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
+
+    private StatsProvider statsProvider = new NullStatsProvider();
+
+    private OpStatsLogger deleteLedgerOpLogger;
+
+    private OpStatsLogger deleteOffloadLedgerOpLogger;
+
+    private final BookKeeper bookKeeper;
+
+    private final PulsarService pulsarService;
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> ledgerDeletionTopicClient;
+
+    private transient CompletableFuture<SystemTopicClient.Reader<PendingDeleteLedgerInfo>> readerFuture;
+
+    private transient CompletableFuture<SystemTopicClient.Writer<PendingDeleteLedgerInfo>> writerFuture;
+
+    public SystemTopicBasedLedgerDeletionService(PulsarService pulsarService, ServiceConfiguration serviceConfiguration)
+            throws PulsarServerException {
+        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
+        this.pulsarService = pulsarService;
+        this.pulsarAdmin = pulsarService.getAdminClient();
+        this.bookKeeper = pulsarService.getBookKeeperClient();
+        this.serviceConfiguration = serviceConfiguration;
+        this.ledgerDeletionParallelism = serviceConfiguration.getLedgerDeletionParallelismOfTopicTwoPhaseDeletion();
+    }
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> getLedgerDeletionTopicClient() throws PulsarClientException {
+        TopicName ledgerDeletionSystemTopic =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(NamespaceName.SYSTEM_NAMESPACE,
+                        EventType.LEDGER_DELETION);
+        if (ledgerDeletionSystemTopic == null) {
+            throw new PulsarClientException.InvalidTopicNameException(
+                    "Can't create SystemTopicBasedLedgerDeletionService, " + "because the topicName is null!");
+        }
+        return namespaceEventsSystemTopicFactory.createLedgerDeletionSystemTopicClient(ledgerDeletionSystemTopic,
+                serviceConfiguration.getSendDelayOfTopicTwoPhaseDeletionInSeconds(),
+                serviceConfiguration.getReconsumeLaterOfTopicTwoPhaseDeletionInSeconds());
+    }
+
+    @Override
+    public void start() throws PulsarClientException, PulsarAdminException {
+        this.ledgerDeletionTopicClient = getLedgerDeletionTopicClient();
+        initStatsLogger();
+        initLedgerDeletionSystemTopic();
+    }
+
+    private void initStatsLogger() {
+        Configuration configuration = new ClientConfiguration();
+        if (serviceConfiguration.isBookkeeperClientExposeStatsToPrometheus()) {
+            configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+                    serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+            configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, serviceConfiguration.getClusterName());
+            this.statsProvider = new PrometheusMetricsProvider();
+        }
+        this.statsProvider.start(configuration);
+        StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_ledger_deletion");
+        this.deleteLedgerOpLogger = statsLogger.getOpStatsLogger("delete_ledger");
+        this.deleteOffloadLedgerOpLogger = statsLogger.getOpStatsLogger("delete_offload_ledger");
+    }
+
+    private CompletableFuture<Void> initSystemTopic(String topicName, int partition) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doCreateSystemTopic(future, topicName, partition);
+        return future;
+    }
+
+    private void doCreateSystemTopic(CompletableFuture<Void> future, String topicName, int partition) {
+        this.pulsarAdmin.topics()
+                .createPartitionedTopicAsync(topicName, partition).whenComplete((res, e) -> {
+                    if (e != null && !(e instanceof PulsarAdminException.ConflictException)) {
+                        log.error("Initialize system topic " + topicName + " failed.", e);
+                        doCreateSystemTopic(future, topicName, partition);
+                        return;
+                    }
+                    future.complete(null);
+                });
+    }
+
+    private void initLedgerDeletionSystemTopic() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_TOPIC.getPartitionedTopicName(),
+                this.ledgerDeletionParallelism));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_RETRY_TOPIC.getPartitionedTopicName(), 1));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_DLQ_TOPIC.getPartitionedTopicName(), 1));
+
+        FutureUtil.waitForAll(futures).whenComplete((res, e) -> {
+            initReaderFuture();
+            initWriterFuture();
+        });
+    }
+
+    private void readMorePendingDeleteLedger(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader) {
+        reader.readNextAsync().whenComplete((msg, ex) -> {
+            if (ex == null) {
+                handleMessage(reader, msg).exceptionally(e -> {
+                    log.warn("Delete ledger {} failed.", msg.getValue(), e);
+                    return null;
+                });
+                readMorePendingDeleteLedger(reader);
+            } else {
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                if (cause instanceof PulsarClientException.AlreadyClosedException) {
+                    log.error("Read more pending delete ledger exception, close the reader now!", ex);
+                    reader.closeAsync();
+                    initReaderFuture();
+                } else {
+                    log.warn("Read more pending delete ledger exception, read again.", ex);
+                    readMorePendingDeleteLedger(reader);
+                }
+            }
+        });
+    }
+
+    private void initReaderFuture() {
+        this.readerFuture = ledgerDeletionTopicClient.newReaderAsync().whenComplete((reader, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create reader on ledger deletion system topic", ex);
+                initReaderFuture();
+            } else {
+                readMorePendingDeleteLedger((LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader) reader);
+            }
+        });
+    }
+
+    private void initWriterFuture() {
+        this.writerFuture = ledgerDeletionTopicClient.newWriterAsync().whenComplete((writer, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create writer on ledger deletion system topic", ex);
+                initWriterFuture();
+            }
+        });
+    }
+
+    private String tuneTopicName(String topicName) {
+        if (topicName.contains("/" + TopicDomain.persistent.value())) {
+            return topicName.replaceFirst("/" + TopicDomain.persistent.value(), "");
+        }
+        return topicName;
+    }
+
+    @Override
+    public CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
+                                                          LedgerComponent component, LedgerType type,
+                                                          Map<String, String> properties) {
+        topicName = tuneTopicName(topicName);
+        PendingDeleteLedgerInfo pendingDeleteLedger = null;
+        if (LedgerType.LEDGER == type) {
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        } else if (LedgerType.OFFLOAD_LEDGER == type) {
+            if (!context.getOffloadContext().hasUidMsb()) {
+                CompletableFuture<?> future = new CompletableFuture<>();
+                future.completeExceptionally(
+                        new IllegalArgumentException("The ledger " + ledgerId + " didn't offload."));
+                return future;
+            }
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        }
+        return sendMessage(pendingDeleteLedger);
+    }
+
+    @Override
+    public void close() throws Exception {
+        asyncClose().get();
+    }
+
+    @Override
+    public CompletableFuture<?> asyncClose() {
+        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+            return readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isTopicTwoPhaseDeletionEnabled() {
+        return true;
+    }
+
+    private CompletableFuture<?> handleMessage(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader,
+                                               Message<PendingDeleteLedgerInfo> message) {
+        CompletableFuture<?> future = new CompletableFuture<>();
+        deleteInBroker(message).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof PulsarAdminException.NotFoundException) {
+                    deleteLocally(message).whenComplete((res1, ex1) -> {
+                        if (ex1 != null) {
+                            if (ex1 instanceof PendingDeleteLedgerInvalidException) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                            message.getValue().getLedgerId(), ex1.getMessage());
+                                }
+                                reader.ackMessageAsync(message);
+                                future.complete(null);
+                                return;
+                            }
+                            reader.reconsumeLaterAsync(message);
+                            future.completeExceptionally(ex1);
+                        }
+                        reader.ackMessageAsync(message);
+                        future.complete(null);
+                    });
+                } else if (ex instanceof PendingDeleteLedgerInvalidException) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                message.getValue().getLedgerId(), ex.getMessage());
+                    }
+                    reader.ackMessageAsync(message);
+                    future.complete(null);
+                } else if (ex instanceof PulsarAdminException.ServerSideErrorException) {
+

Review Comment:
   Do we need to deal with this exception?
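   A possible shape for the error handling, sketched under assumptions: the exception classes below are hypothetical stand-ins for `PulsarAdminException.NotFoundException`, `PulsarAdminException.ServerSideErrorException` and `PendingDeleteLedgerInvalidException`, and treating server-side errors as transient (retry via `reconsumeLaterAsync`) is one policy choice, not what the PR currently does. Depending on how `deleteInBroker` builds its future, `ex` may reach `whenComplete` wrapped in a `CompletionException`, in which case the bare `instanceof` checks would fall through; unwrapping first, as `readMorePendingDeleteLedger` already does with `FutureUtil.unwrapCompletionException`, avoids that. Separately, in the `deleteLocally` callback the non-invalid failure path calls `reconsumeLaterAsync` and `completeExceptionally` but has no `return`, so it also reaches `ackMessageAsync`; mapping each failure to exactly one outcome makes that kind of slip harder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class ClassifierDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new NotFoundException("topic not found"));
        // A dependent stage sees the original failure wrapped in a CompletionException.
        failed.thenApply(v -> v).whenComplete((r, ex) -> {
            System.out.println(ex.getClass().getSimpleName());        // CompletionException
            System.out.println(DeletionErrorClassifier.classify(ex)); // DELETE_LOCALLY
        });
    }
}

// Hypothetical stand-ins for the Pulsar exception types referenced in handleMessage.
class NotFoundException extends RuntimeException {
    NotFoundException(String msg) { super(msg); }
}

class ServerSideErrorException extends RuntimeException {
    ServerSideErrorException(String msg) { super(msg); }
}

class PendingDeleteLedgerInvalidException extends RuntimeException {
    PendingDeleteLedgerInvalidException(String msg) { super(msg); }
}

// What the reader should do with the message after a failed broker-side delete.
enum Outcome { DELETE_LOCALLY, ACK, RETRY_LATER }

final class DeletionErrorClassifier {
    private DeletionErrorClassifier() { }

    // Strip the wrappers that dependent CompletableFuture stages add around the real cause.
    static Throwable unwrap(Throwable t) {
        Throwable cur = t;
        while ((cur instanceof CompletionException || cur instanceof ExecutionException)
                && cur.getCause() != null) {
            cur = cur.getCause();
        }
        return cur;
    }

    // Map every failure to exactly one outcome, so no path can both ack and retry.
    static Outcome classify(Throwable ex) {
        Throwable cause = unwrap(ex);
        if (cause instanceof NotFoundException) {
            return Outcome.DELETE_LOCALLY; // broker reported NotFound: fall back to local deletion
        }
        if (cause instanceof PendingDeleteLedgerInvalidException) {
            return Outcome.ACK; // nothing left to delete: drop the message
        }
        // ServerSideErrorException and unknown errors are treated as transient (assumption).
        return Outcome.RETRY_LATER;
    }
}
```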



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2571,70 +2648,107 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
-            // Update metadata
-            for (LedgerInfo ls : ledgersToDelete) {
-                if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
-                    // this info is relevant because the lastMessageId won't be available anymore
-                    log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
-                             + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
-                }
 
-                invalidateReadHandle(ls.getLedgerId());
-
-                ledgers.remove(ls.getLedgerId());
-                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
-                TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
-
-                entryCache.invalidateAllEntries(ls.getLedgerId());
-            }
-            for (LedgerInfo ls : offloadedLedgersToDelete) {
-                LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-                newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
-                String driverName = OffloadUtils.getOffloadDriverName(ls,
-                        config.getLedgerOffloader().getOffloadDriverName());
-                Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
-                        config.getLedgerOffloader().getOffloadDriverMetadata());
-                OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
-                ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
-            }
+            // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers =
+                    Stream.concat(ledgersToDelete.stream().filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()),
+                            offloadedLedgersToDelete.stream()).map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            // Mark deletable offloaded ledgers
+            Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream()
+                    .filter(ls -> ls.getOffloadContext().hasUidMsb())
+                    .map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            CompletableFuture<?> appendDeleteLedgerFuture =
+                    appendPendingDeleteLedger(deletableLedgers, deletableOffloadedLedgers);
+            appendDeleteLedgerFuture.thenAccept(ignore -> {
+                believedDeleteIds.addAll(deletableLedgers);

Review Comment:
   If `ledgerDeletionService` is not enabled, the `believedDeleteIds` set will keep growing, and its entries never get a chance to be removed.
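   One way to bound the set, sketched with hypothetical names (`BelievedDeleteTracker`, `markPendingDelete` and `onLedgerDeleted` are not in the PR): record ids only when two-phase deletion is enabled, and drop each id once the deletion service confirms the ledger is gone. This assumes `believedDeleteIds` exists only to hide ledgers whose asynchronous deletion has not finished yet; if nothing will ever confirm deletion, nothing should be recorded.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class TrackerDemo {
    public static void main(String[] args) {
        BelievedDeleteTracker tracker = new BelievedDeleteTracker(true);
        tracker.markPendingDelete(Set.of(10L, 11L));
        tracker.onLedgerDeleted(10L);
        System.out.println(tracker.size()); // 1
    }
}

// Hypothetical tracker for ledger ids removed from metadata but not yet from storage.
final class BelievedDeleteTracker {
    private final boolean twoPhaseDeletionEnabled;
    private final Set<Long> believedDeleteIds = ConcurrentHashMap.newKeySet();

    BelievedDeleteTracker(boolean twoPhaseDeletionEnabled) {
        this.twoPhaseDeletionEnabled = twoPhaseDeletionEnabled;
    }

    // Record ids only if some component will later confirm their deletion.
    void markPendingDelete(Set<Long> ledgerIds) {
        if (!twoPhaseDeletionEnabled) {
            return;
        }
        believedDeleteIds.addAll(ledgerIds);
    }

    // Invoked once the ledger is actually gone from BookKeeper or tiered storage.
    void onLedgerDeleted(long ledgerId) {
        believedDeleteIds.remove(ledgerId);
    }

    boolean isBelievedDeleted(long ledgerId) {
        return believedDeleteIds.contains(ledgerId);
    }

    int size() {
        return believedDeleteIds.size();
    }
}
```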



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+    @Override
+    public void close() throws Exception {

Review Comment:
   We need to stop the `statsProvider` on close
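   A sketch of a close path that also stops the stats provider, using minimal hypothetical interfaces (`AsyncCloseable`, `StatsProviderLike`) in place of `SystemTopicClient.Reader`/`Writer` and BookKeeper's `StatsProvider`, whose `stop()` it mirrors. It assumes the writer exposes a `closeAsync()` like the reader does, and it closes the writer too, since the current `asyncClose` only closes the reader. The provider is stopped even if closing the reader or writer fails, and that failure still propagates to the caller.

```java
import java.util.concurrent.CompletableFuture;

class CloseDemo {
    public static void main(String[] args) {
        AsyncCloseable reader = () -> CompletableFuture.completedFuture(null);
        DeletionServiceCloser closer = new DeletionServiceCloser(
                CompletableFuture.completedFuture(reader), null,
                () -> System.out.println("stats provider stopped"));
        closer.asyncClose().join();
    }
}

// Hypothetical minimal views of the system-topic reader/writer and the stats provider.
interface AsyncCloseable {
    CompletableFuture<Void> closeAsync();
}

interface StatsProviderLike {
    void stop();
}

final class DeletionServiceCloser {
    private final CompletableFuture<AsyncCloseable> readerFuture;
    private final CompletableFuture<AsyncCloseable> writerFuture;
    private final StatsProviderLike statsProvider;

    DeletionServiceCloser(CompletableFuture<AsyncCloseable> readerFuture,
                          CompletableFuture<AsyncCloseable> writerFuture,
                          StatsProviderLike statsProvider) {
        this.readerFuture = readerFuture;
        this.writerFuture = writerFuture;
        this.statsProvider = statsProvider;
    }

    // Close a client only if it was created successfully; otherwise there is nothing to close.
    private static CompletableFuture<Void> closeIfPresent(CompletableFuture<AsyncCloseable> future) {
        if (future == null || future.isCompletedExceptionally()) {
            return CompletableFuture.completedFuture(null);
        }
        return future.thenCompose(AsyncCloseable::closeAsync);
    }

    CompletableFuture<Void> asyncClose() {
        return CompletableFuture.allOf(closeIfPresent(readerFuture), closeIfPresent(writerFuture))
                // Stop the stats provider regardless of whether closing the clients failed.
                .whenComplete((ignore, ex) -> statsProvider.stop());
    }

    void close() throws Exception {
        asyncClose().get();
    }
}
```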



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+import org.apache.pulsar.common.protocol.topic.DeleteLedgerPayload;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemTopicBasedLedgerDeletionService implements LedgerDeletionService {
+
+    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+
+    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedLedgerDeletionService.class);
+
+    private final PulsarAdmin pulsarAdmin;
+
+    private final int ledgerDeletionParallelism;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final Map<OffloadPoliciesImpl, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
+
+    private StatsProvider statsProvider = new NullStatsProvider();
+
+    private OpStatsLogger deleteLedgerOpLogger;
+
+    private OpStatsLogger deleteOffloadLedgerOpLogger;
+
+    private final BookKeeper bookKeeper;
+
+    private final PulsarService pulsarService;
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> ledgerDeletionTopicClient;
+
+    private transient CompletableFuture<SystemTopicClient.Reader<PendingDeleteLedgerInfo>> readerFuture;
+
+    private transient CompletableFuture<SystemTopicClient.Writer<PendingDeleteLedgerInfo>> writerFuture;
+
+    public SystemTopicBasedLedgerDeletionService(PulsarService pulsarService, ServiceConfiguration serviceConfiguration)
+            throws PulsarServerException {
+        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
+        this.pulsarService = pulsarService;
+        this.pulsarAdmin = pulsarService.getAdminClient();
+        this.bookKeeper = pulsarService.getBookKeeperClient();
+        this.serviceConfiguration = serviceConfiguration;
+        this.ledgerDeletionParallelism = serviceConfiguration.getLedgerDeletionParallelismOfTopicTwoPhaseDeletion();
+    }
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> getLedgerDeletionTopicClient() throws PulsarClientException {
+        TopicName ledgerDeletionSystemTopic =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(NamespaceName.SYSTEM_NAMESPACE,
+                        EventType.LEDGER_DELETION);
+        if (ledgerDeletionSystemTopic == null) {
+            throw new PulsarClientException.InvalidTopicNameException(
+                    "Can't create SystemTopicBasedLedgerDeletionService because the topicName is null!");
+        }
+        return namespaceEventsSystemTopicFactory.createLedgerDeletionSystemTopicClient(ledgerDeletionSystemTopic,
+                serviceConfiguration.getSendDelayOfTopicTwoPhaseDeletionInSeconds(),
+                serviceConfiguration.getReconsumeLaterOfTopicTwoPhaseDeletionInSeconds());
+    }
+
+    @Override
+    public void start() throws PulsarClientException, PulsarAdminException {
+        this.ledgerDeletionTopicClient = getLedgerDeletionTopicClient();
+        initStatsLogger();
+        initLedgerDeletionSystemTopic();
+    }
+
+    private void initStatsLogger() {
+        Configuration configuration = new ClientConfiguration();
+        if (serviceConfiguration.isBookkeeperClientExposeStatsToPrometheus()) {
+            configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+                    serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+            configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, serviceConfiguration.getClusterName());
+            this.statsProvider = new PrometheusMetricsProvider();
+        }
+        this.statsProvider.start(configuration);
+        StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_ledger_deletion");
+        this.deleteLedgerOpLogger = statsLogger.getOpStatsLogger("delete_ledger");
+        this.deleteOffloadLedgerOpLogger = statsLogger.getOpStatsLogger("delete_offload_ledger");
+    }
+
+    private CompletableFuture<Void> initSystemTopic(String topicName, int partition) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doCreateSystemTopic(future, topicName, partition);
+        return future;
+    }
+
+    private void doCreateSystemTopic(CompletableFuture<Void> future, String topicName, int partition) {
+        this.pulsarAdmin.topics()
+                .createPartitionedTopicAsync(topicName, partition).whenComplete((res, e) -> {
+                    if (e != null && !(e instanceof PulsarAdminException.ConflictException)) {
+                        log.error("Failed to initialize system topic {}, retrying.", topicName, e);
+                        doCreateSystemTopic(future, topicName, partition);
+                        return;
+                    }
+                    future.complete(null);
+                });
+    }
+
+    private void initLedgerDeletionSystemTopic() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_TOPIC.getPartitionedTopicName(),
+                this.ledgerDeletionParallelism));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_RETRY_TOPIC.getPartitionedTopicName(), 1));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_DLQ_TOPIC.getPartitionedTopicName(), 1));
+
+        FutureUtil.waitForAll(futures).whenComplete((res, e) -> {
+            initReaderFuture();
+            initWriterFuture();
+        });
+    }
+
+    private void readMorePendingDeleteLedger(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader) {
+        reader.readNextAsync().whenComplete((msg, ex) -> {
+            if (ex == null) {
+                handleMessage(reader, msg).exceptionally(e -> {
+                    log.warn("Delete ledger {} failed.", msg.getValue(), e);
+                    return null;
+                });
+                readMorePendingDeleteLedger(reader);
+            } else {
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                if (cause instanceof PulsarClientException.AlreadyClosedException) {
+                    log.error("Ledger deletion reader is already closed, recreating it.", ex);
+                    reader.closeAsync();
+                    initReaderFuture();
+                } else {
+                    log.warn("Failed to read pending delete ledger, reading again.", ex);
+                    readMorePendingDeleteLedger(reader);
+                }
+            }
+        });
+    }
+
+    private void initReaderFuture() {
+        this.readerFuture = ledgerDeletionTopicClient.newReaderAsync().whenComplete((reader, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create reader on ledger deletion system topic", ex);
+                initReaderFuture();
+            } else {
+                readMorePendingDeleteLedger((LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader) reader);
+            }
+        });
+    }
+
+    private void initWriterFuture() {
+        this.writerFuture = ledgerDeletionTopicClient.newWriterAsync().whenComplete((writer, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create writer on ledger deletion system topic", ex);
+                initWriterFuture();
+            }
+        });
+    }
+
+    private String tuneTopicName(String topicName) {
+        if (topicName.contains("/" + TopicDomain.persistent.value())) {
+            return topicName.replaceFirst("/" + TopicDomain.persistent.value(), "");
+        }
+        return topicName;
+    }
+
+    @Override
+    public CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
+                                                          LedgerComponent component, LedgerType type,
+                                                          Map<String, String> properties) {
+        topicName = tuneTopicName(topicName);
+        PendingDeleteLedgerInfo pendingDeleteLedger = null;
+        if (LedgerType.LEDGER == type) {
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        } else if (LedgerType.OFFLOAD_LEDGER == type) {
+            if (!context.getOffloadContext().hasUidMsb()) {
+                CompletableFuture<?> future = new CompletableFuture<>();
+                future.completeExceptionally(
+                        new IllegalArgumentException("Ledger " + ledgerId + " has not been offloaded."));
+                return future;
+            }
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        }
+        return sendMessage(pendingDeleteLedger);
+    }
+
+    @Override
+    public void close() throws Exception {
+        asyncClose().get();
+    }
+
+    @Override
+    public CompletableFuture<?> asyncClose() {
+        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+            return readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isTopicTwoPhaseDeletionEnabled() {
+        return true;
+    }
+
+    private CompletableFuture<?> handleMessage(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader,
+                                               Message<PendingDeleteLedgerInfo> message) {
+        CompletableFuture<?> future = new CompletableFuture<>();
+        deleteInBroker(message).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof PulsarAdminException.NotFoundException) {
+                    deleteLocally(message).whenComplete((res1, ex1) -> {
+                        if (ex1 != null) {
+                            if (ex1 instanceof PendingDeleteLedgerInvalidException) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                            message.getValue().getLedgerId(), ex1.getMessage());
+                                }
+                                reader.ackMessageAsync(message);
+                                future.complete(null);
+                                return;
+                            }
+                            reader.reconsumeLaterAsync(message);
+                            future.completeExceptionally(ex1);
+                            return;
+                        }
+                        reader.ackMessageAsync(message);
+                        future.complete(null);
+                    });
+                } else if (ex instanceof PendingDeleteLedgerInvalidException) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                message.getValue().getLedgerId(), ex.getMessage());
+                    }
+                    reader.ackMessageAsync(message);
+                    future.complete(null);
+                } else if (ex instanceof PulsarAdminException.ServerSideErrorException) {
+
+                } else {
+                    reader.reconsumeLaterAsync(message);
+                    future.completeExceptionally(ex);
+                }
+                return;
+            }
+            reader.ackMessageAsync(message);
+            future.complete(null);
+        });
+        return future;
+    }
+
+    private CompletableFuture<?> deleteInBroker(Message<PendingDeleteLedgerInfo> message) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            PendingDeleteLedgerInfo pendingDeleteLedger = message.getValue();
+            // Currently, only managed-ledger ledgers support two-phase deletion.
+            if (LedgerComponent.MANAGED_LEDGER == pendingDeleteLedger.getLedgerComponent()) {
+                Long ledgerId = pendingDeleteLedger.getLedgerId();
+                String topicName = pendingDeleteLedger.getTopicName();
+                String ledgerType = pendingDeleteLedger.getLedgerType().name();
+                String ledgerComponent = pendingDeleteLedger.getLedgerComponent().name();
+                DeleteLedgerPayload deleteLedgerPayload =
+                        new DeleteLedgerPayload(ledgerId, topicName, ledgerType, ledgerComponent);
+                if (LedgerType.OFFLOAD_LEDGER == pendingDeleteLedger.getLedgerType()) {
+                    deleteLedgerPayload.setOffloadContext(buildOffloadContext(pendingDeleteLedger));
+                }
+                pulsarAdmin.topics().deleteLedgerAsync(deleteLedgerPayload).whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        future.completeExceptionally(ex);
+                        return;
+                    }
+                    future.complete(null);
+                });
+            } else if (LedgerComponent.MANAGED_CURSOR == pendingDeleteLedger.getLedgerComponent()) {
+                future.complete(null);

Review Comment:
   Please add a `TODO` comment here.




[GitHub] [pulsar] eolivelli commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r946912131


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/deletion/LedgerDeletionService.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.deletion;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public interface LedgerDeletionService {
+
+    /**
+     * Start.
+     */
+    void start() throws PulsarClientException, PulsarAdminException;
+
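+    /**
+     * Appends a ledger to the pending-delete queue so that it can be deleted asynchronously.
+     *
+     * @param topicName  the topic that owns the ledger
+     * @param ledgerId   the ledger ID
+     * @param context    the ledger's {@link LedgerInfo}
+     * @param component  the owning component: managed_ledger, managed_cursor or schema_storage
+     * @param type       the ledger type: ledger or offload_ledger
+     * @param properties additional properties stored with the pending-delete record
+     * @return a future that completes once the pending-delete ledger has been appended
+     */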
+    CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
+                                                   LedgerComponent component, LedgerType type, Map<String, String> properties);
+
+    /**
+     * Deletes the given ledger.
+     *
+     * @param topicName        the topic that owns the ledger
+     * @param ledgerId         the ledger ID
+     * @param component        the owning component: managed_ledger, managed_cursor or schema_storage
+     * @param isBelievedDelete if {@code false}, first check that the parameters match the ledger metadata
+     * @return a future that completes when the deletion finishes
+     */
+    CompletableFuture<?> asyncDeleteLedger(String topicName, long ledgerId, LedgerComponent component,
+                                           boolean isBelievedDelete);
+
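+    /**
+     * Deletes an offloaded ledger from tiered storage.
+     *
+     * @param topicName      the topic that owns the ledger
+     * @param ledgerId       the ledger ID
+     * @param offloadContext the offload context of the ledger
+     * @return a future that completes when the deletion finishes
+     */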
+    CompletableFuture<?> asyncDeleteOffloadedLedger(String topicName, long ledgerId,
+                                                    MLDataFormats.OffloadContext offloadContext);
+
+    /**
+     * Close.
+     */
+    void close() throws Exception;
+
+    /**
+     * Async close.
+     */
+    CompletableFuture<?> asyncClose();
+
+    class LedgerDeletionServiceDisable implements LedgerDeletionService {
+
+        @Override
+        public void start() {
+            //No op
+        }
+
+        private static final CompletableFuture<?> COMPLETABLE_FUTURE = CompletableFuture.completedFuture(null);

Review Comment:
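   You cannot cache a `CompletableFuture` in a shared static field: any caller can still change its result
   (for example with `obtrudeValue` or `obtrudeException`), so the behaviour will be unpredictable.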
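A small sketch that makes the concern concrete. `NoOpDeletionService` is a hypothetical class, not the PR's `LedgerDeletionServiceDisable`. A completed `CompletableFuture` remains mutable through `obtrudeValue`/`obtrudeException`. As a result, one caller holding a cached static instance can change what every other caller observes. Returning a new completed future on each call avoids this problem.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical no-op service used only to contrast the two ways of returning "already done".
final class NoOpDeletionService {
    // Anti-pattern: a single completed future shared by every caller.
    private static final CompletableFuture<Void> SHARED_DONE = CompletableFuture.completedFuture(null);

    CompletableFuture<Void> cachedResult() {
        return SHARED_DONE;
    }

    // Safer: a fresh completed future per call, so one caller cannot affect another.
    CompletableFuture<Void> freshResult() {
        return CompletableFuture.completedFuture(null);
    }
}
```

After a single `cachedResult().obtrudeException(...)` call, every later `cachedResult()` is completed exceptionally, while `freshResult()` is unaffected. On Java 9+, `CompletableFuture.completedStage(null)` is another option, but only if the interface returns `CompletionStage` instead of `CompletableFuture<?>`.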


