Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/05 07:55:44 UTC

[GitHub] [pulsar] hangc0276 commented on a diff in pull request #16590: [PIP-186] Introduce two phase deletion protocol based on system topic

hangc0276 commented on code in PR #16590:
URL: https://github.com/apache/pulsar/pull/16590#discussion_r962455243


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -2666,6 +2666,36 @@ public class ServiceConfiguration implements PulsarConfiguration {
         )
     private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0;
 
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_SERVER,
+            doc = "Whether to use two-phase deletion when deleting ledgers. If true, "
+                    + "LedgerDeletionService takes over ledger deletion. (Default: false)"
+    )
+    private boolean topicTwoPhaseDeletionEnabled;

Review Comment:
   We should set the default value to `false` explicitly.
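   For reference, the JVM already initializes a primitive `boolean` field to `false`, so an explicit initializer mainly documents intent and keeps the field in sync with the `(Default false)` note in the doc string. A minimal sketch using a hypothetical stand-in class; the real `ServiceConfiguration` field also carries `@FieldContext` and accessors, which are omitted here:

```java
// Hypothetical stand-in for the relevant slice of ServiceConfiguration.
class TwoPhaseDeletionConfigSketch {
    // Explicit initializer: same value the JVM would assign, but visible to readers.
    private boolean topicTwoPhaseDeletionEnabled = false;

    boolean isTopicTwoPhaseDeletionEnabled() {
        return topicTwoPhaseDeletionEnabled;
    }

    void setTopicTwoPhaseDeletionEnabled(boolean enabled) {
        this.topicTwoPhaseDeletionEnabled = enabled;
    }
}
```

   A freshly constructed `TwoPhaseDeletionConfigSketch` reports `false` until the setter is called, matching the documented default.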



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -1039,6 +1044,126 @@ private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncRespon
                 });
     }
 
+    protected void internalDeleteLedger(AsyncResponse asyncResponse, boolean authoritative,
+                                        DeleteLedgerPayload deleteLedgerPayload) {
+        validateTopicOwnership(topicName, authoritative);
+
+        CompletableFuture<Void> ret;
+        if (topicName.isGlobal()) {
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
+        }
+        ret.thenAccept(__ -> {
+            long ledgerId = deleteLedgerPayload.getLedgerId();
+            log.info("[{}][{}] received delete ledger: {}", clientAppId(), topicName, ledgerId);
+            validateTopicOwnershipAsync(topicName, authoritative)
+                    .thenCompose(ignore ->
+                            // Do we need to check the DELETE_LEDGER operation here?
+                            validateTopicOperationAsync(topicName, TopicOperation.DELETE_LEDGER))
+                    .thenCompose(ignore -> getTopicReferenceAsync(topicName))
+                    .thenCompose(topic -> {
+                        CompletableFuture<Void> future = new CompletableFuture<>();
+                        if (topic == null) {
+                            asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                    getTopicNotFoundErrorMessage(topicName.toString())));
+                            future.complete(null);
+                            return future;
+                        }
+                        ManagedLedger managedLedger = ((PersistentTopic) topic).getManagedLedger();
+                        DeleteLedgerPayload.OffloadContext context = deleteLedgerPayload.getOffloadContext();
+                        LedgerType ledgerType = LedgerType.valueOf(deleteLedgerPayload.getLedgerType());
+                        if (LedgerType.LEDGER == ledgerType) {
+                            managedLedger.asyncDeleteLedger(topicName.getPersistenceNamingEncoding(), ledgerId,
+                                    ledgerType, null).whenComplete((res, ex) -> {
+                                        if (ex != null) {
+                                            if (ex instanceof PendingDeleteLedgerInvalidException) {
+                                                if (log.isDebugEnabled()) {
+                                                    log.debug("[{}][{}] Received invalid pending delete ledger {},"
+                                                                    + " invalid reason: {}", clientAppId(), topicName,
+                                                            ledgerId, ex.getMessage());
+                                                }
+                                                future.complete(null);
+                                                return;
+                                            }
+                                            future.completeExceptionally(ex);
+                                            return;
+                                        }
+                                        future.complete(null);
+                                    });
+                        } else if (LedgerType.OFFLOAD_LEDGER == ledgerType) {

Review Comment:
   Can we merge these two cases into one? The main difference is the `offloadContext` parameter, and the `managedLedger.asyncDeleteLedger` method already checks the `ledgerType`.
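   The `OFFLOAD_LEDGER` branch is cut off in the quoted context, so this sketch assumes, as the comment suggests, that it differs only in passing the offload context. All types below are hypothetical stand-ins for the PR's `ManagedLedger`, `LedgerType`, and `PendingDeleteLedgerInvalidException`; the four-argument `asyncDeleteLedger` shape is copied from the diff, not from a released API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-ins for the PR's LedgerType, PendingDeleteLedgerInvalidException and ManagedLedger.
enum LedgerTypeSketch { LEDGER, OFFLOAD_LEDGER }

class InvalidPendingDeleteSketch extends RuntimeException {
    InvalidPendingDeleteSketch(String message) {
        super(message);
    }
}

interface ManagedLedgerSketch {
    // Same argument shape as the diff: (persistence name, ledger id, ledger type, offload context).
    CompletableFuture<Void> asyncDeleteLedger(String name, long ledgerId, LedgerTypeSketch type,
                                              Object offloadContext);
}

final class MergedDeleteLedgerSketch {
    private MergedDeleteLedgerSketch() {}

    // Single code path for both ledger types; only the offload context differs.
    static CompletableFuture<Void> deleteLedger(ManagedLedgerSketch ml, String name, long ledgerId,
                                                LedgerTypeSketch type, Object offloadContext) {
        Object context = type == LedgerTypeSketch.OFFLOAD_LEDGER ? offloadContext : null;
        return ml.asyncDeleteLedger(name, ledgerId, type, context).handle((res, ex) -> {
            if (ex == null) {
                return null;
            }
            // Failures that travelled through a dependent stage arrive wrapped in CompletionException.
            Throwable cause = ex instanceof CompletionException && ex.getCause() != null ? ex.getCause() : ex;
            if (cause instanceof InvalidPendingDeleteSketch) {
                // Invalid pending-delete requests count as handled, as in the original LEDGER branch.
                return null;
            }
            // Any other failure propagates to the caller.
            throw cause instanceof RuntimeException ? (RuntimeException) cause : new CompletionException(cause);
        });
    }
}
```

   Since `asyncDeleteLedger` already branches on the ledger type, the caller only chooses the context, and the invalid-request handling is written once instead of twice.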



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.deletion.LedgerComponent;
+import org.apache.bookkeeper.mledger.deletion.LedgerDeletionService;
+import org.apache.bookkeeper.mledger.deletion.LedgerType;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInfo;
+import org.apache.bookkeeper.mledger.deletion.PendingDeleteLedgerInvalidException;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.offload.OffloadUtils;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
+import org.apache.pulsar.broker.systopic.LedgerDeletionSystemTopicClient;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.topic.DeleteLedgerPayload;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemTopicBasedLedgerDeletionService implements LedgerDeletionService {
+
+    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+
+    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedLedgerDeletionService.class);
+
+    private final PulsarAdmin pulsarAdmin;
+
+    private final int ledgerDeletionParallelism;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final Map<OffloadPoliciesImpl, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
+
+    private StatsProvider statsProvider = new NullStatsProvider();
+
+    private OpStatsLogger deleteLedgerOpLogger;
+
+    private OpStatsLogger deleteOffloadLedgerOpLogger;
+
+    private final BookKeeper bookKeeper;
+
+    private final PulsarService pulsarService;
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> ledgerDeletionTopicClient;
+
+    private transient CompletableFuture<SystemTopicClient.Reader<PendingDeleteLedgerInfo>> readerFuture;
+
+    private transient CompletableFuture<SystemTopicClient.Writer<PendingDeleteLedgerInfo>> writerFuture;
+
+    public SystemTopicBasedLedgerDeletionService(PulsarService pulsarService, ServiceConfiguration serviceConfiguration)
+            throws PulsarServerException {
+        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
+        this.pulsarService = pulsarService;
+        this.pulsarAdmin = pulsarService.getAdminClient();
+        this.bookKeeper = pulsarService.getBookKeeperClient();
+        this.serviceConfiguration = serviceConfiguration;
+        this.ledgerDeletionParallelism = serviceConfiguration.getLedgerDeletionParallelismOfTopicTwoPhaseDeletion();
+    }
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> getLedgerDeletionTopicClient() throws PulsarClientException {
+        TopicName ledgerDeletionSystemTopic =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(NamespaceName.SYSTEM_NAMESPACE,
+                        EventType.LEDGER_DELETION);
+        if (ledgerDeletionSystemTopic == null) {
+            throw new PulsarClientException.InvalidTopicNameException(
+                    "Can't create SystemTopicBasedLedgerDeletionService, " + "because the topicName is null!");
+        }
+        return namespaceEventsSystemTopicFactory.createLedgerDeletionSystemTopicClient(ledgerDeletionSystemTopic,
+                serviceConfiguration.getSendDelayOfTopicTwoPhaseDeletionInSeconds(),
+                serviceConfiguration.getReconsumeLaterOfTopicTwoPhaseDeletionInSeconds());
+    }
+
+    @Override
+    public void start() throws PulsarClientException, PulsarAdminException {
+        this.ledgerDeletionTopicClient = getLedgerDeletionTopicClient();
+        initStatsLogger();
+        initLedgerDeletionSystemTopic();
+    }
+
+    private void initStatsLogger() {
+        Configuration configuration = new ClientConfiguration();
+        if (serviceConfiguration.isBookkeeperClientExposeStatsToPrometheus()) {
+            configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+                    serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+            configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, serviceConfiguration.getClusterName());
+            this.statsProvider = new PrometheusMetricsProvider();
+        }
+        this.statsProvider.start(configuration);
+        StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_ledger_deletion");
+        this.deleteLedgerOpLogger = statsLogger.getOpStatsLogger("delete_ledger");

Review Comment:
   Can these metrics be exported through the broker's Prometheus port?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+    private void initStatsLogger() {
+        Configuration configuration = new ClientConfiguration();
+        if (serviceConfiguration.isBookkeeperClientExposeStatsToPrometheus()) {
+            configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+                    serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+            configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, serviceConfiguration.getClusterName());
+            this.statsProvider = new PrometheusMetricsProvider();
+        }
+        this.statsProvider.start(configuration);
+        StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_ledger_deletion");
+        this.deleteLedgerOpLogger = statsLogger.getOpStatsLogger("delete_ledger");
+        this.deleteOffloadLedgerOpLogger = statsLogger.getOpStatsLogger("delete_offload_ledger");
+    }
+
+    private CompletableFuture<Void> initSystemTopic(String topicName, int partition) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doCreateSystemTopic(future, topicName, partition);
+        return future;
+    }
+
+    private void doCreateSystemTopic(CompletableFuture<Void> future, String topicName, int partition) {
+        this.pulsarAdmin.topics()
+                .createPartitionedTopicAsync(topicName, partition).whenComplete((res, e) -> {
+                    if (e != null && !(e instanceof PulsarAdminException.ConflictException)) {
+                        log.error("Failed to initialize system topic {}, retrying.", topicName, e);
+                        doCreateSystemTopic(future, topicName, partition);
+                        return;
+                    }
+                    future.complete(null);
+                });
+    }
+
+    private void initLedgerDeletionSystemTopic() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_TOPIC.getPartitionedTopicName(),
+                this.ledgerDeletionParallelism));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_RETRY_TOPIC.getPartitionedTopicName(), 1));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_DLQ_TOPIC.getPartitionedTopicName(), 1));
+
+        FutureUtil.waitForAll(futures).whenComplete((res, e) -> {
+            initReaderFuture();
+            initWriterFuture();
+        });
+    }
+
+    private void readMorePendingDeleteLedger(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader) {
+        reader.readNextAsync().whenComplete((msg, ex) -> {
+            if (ex == null) {
+                handleMessage(reader, msg).exceptionally(e -> {
+                    log.warn("Delete ledger {} failed.", msg.getValue(), e);
+                    return null;
+                });
+                readMorePendingDeleteLedger(reader);
+            } else {
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                if (cause instanceof PulsarClientException.AlreadyClosedException) {
+                    log.error("Failed to read pending delete ledger: reader already closed, recreating it.", ex);
+                    reader.closeAsync();
+                    initReaderFuture();
+                } else {
+                    log.warn("Failed to read pending delete ledger, retrying.", ex);
+                    readMorePendingDeleteLedger(reader);
+                }
+            }
+        });
+    }
+
+    private void initReaderFuture() {
+        this.readerFuture = ledgerDeletionTopicClient.newReaderAsync().whenComplete((reader, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create reader on ledger deletion system topic", ex);
+                initReaderFuture();
+            } else {
+                readMorePendingDeleteLedger((LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader) reader);
+            }
+        });
+    }
+
+    private void initWriterFuture() {
+        this.writerFuture = ledgerDeletionTopicClient.newWriterAsync().whenComplete((writer, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create writer on ledger deletion system topic", ex);
+                initWriterFuture();
+            }
+        });
+    }
+
+    private String tuneTopicName(String topicName) {
+        if (topicName.contains("/" + TopicDomain.persistent.value())) {
+            return topicName.replaceFirst("/" + TopicDomain.persistent.value(), "");
+        }
+        return topicName;
+    }
+
+    @Override
+    public CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
+                                                          LedgerComponent component, LedgerType type,
+                                                          Map<String, String> properties) {
+        topicName = tuneTopicName(topicName);
+        PendingDeleteLedgerInfo pendingDeleteLedger = null;
+        if (LedgerType.LEDGER == type) {
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        } else if (LedgerType.OFFLOAD_LEDGER == type) {
+            if (!context.getOffloadContext().hasUidMsb()) {
+                CompletableFuture<?> future = new CompletableFuture<>();
+                future.completeExceptionally(
+                        new IllegalArgumentException("The ledger " + ledgerId + " has not been offloaded."));
+                return future;
+            }
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        }
+        return sendMessage(pendingDeleteLedger);
+    }
+
+    @Override
+    public void close() throws Exception {
+        asyncClose().get();
+    }
+
+    @Override
+    public CompletableFuture<?> asyncClose() {
+        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+            return readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isTopicTwoPhaseDeletionEnabled() {
+        return true;
+    }
+
+    private CompletableFuture<?> handleMessage(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader,
+                                               Message<PendingDeleteLedgerInfo> message) {
+        CompletableFuture<?> future = new CompletableFuture<>();
+        deleteInBroker(message).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof PulsarAdminException.NotFoundException) {
+                    deleteLocally(message).whenComplete((res1, ex1) -> {
+                        if (ex1 != null) {
+                            if (ex1 instanceof PendingDeleteLedgerInvalidException) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                            message.getValue().getLedgerId(), ex1.getMessage());
+                                }
+                                reader.ackMessageAsync(message);
+                                future.complete(null);
+                                return;
+                            }
+                            reader.reconsumeLaterAsync(message);
+                            future.completeExceptionally(ex1);
+                            return;
+                        }
+                        reader.ackMessageAsync(message);
+                        future.complete(null);
+                    });
+                } else if (ex instanceof PendingDeleteLedgerInvalidException) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                message.getValue().getLedgerId(), ex.getMessage());
+                    }
+                    reader.ackMessageAsync(message);
+                    future.complete(null);
+                } else if (ex instanceof PulsarAdminException.ServerSideErrorException) {
+

Review Comment:
   Do we need to deal with this exception?
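   One way to make the decision explicit is a small classifier that maps each failure to an action on the message. This is a sketch only: the exception classes are hypothetical stand-ins for `PulsarAdminException.NotFoundException`, `PulsarAdminException.ServerSideErrorException`, and `PendingDeleteLedgerInvalidException`, and treating a server-side (HTTP 5xx) error as transient and retrying it via `reconsumeLaterAsync` is an assumption, not something the PR specifies. It also unwraps `CompletionException`, because a failure that propagates through a dependent stage arrives wrapped and would otherwise slip past every `instanceof` check:

```java
import java.util.concurrent.CompletionException;

// Hypothetical stand-ins for the exception types referenced in handleMessage.
class NotFoundSketchException extends Exception {}
class ServerSideErrorSketchException extends Exception {}
class InvalidPendingDeleteSketchException extends Exception {}

// What to do with a pending-delete message after a delete attempt.
enum DeletionAction { ACK, DELETE_LOCALLY, RECONSUME_LATER }

final class DeletionFailureClassifier {
    private DeletionFailureClassifier() {}

    // Strip CompletionException layers so instanceof checks see the real cause.
    static Throwable unwrap(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    static DeletionAction classify(Throwable failure) {
        if (failure == null) {
            return DeletionAction.ACK; // deleted successfully
        }
        Throwable cause = unwrap(failure);
        if (cause instanceof NotFoundSketchException) {
            return DeletionAction.DELETE_LOCALLY; // topic not found via admin API; fall back as the diff does
        }
        if (cause instanceof InvalidPendingDeleteSketchException) {
            return DeletionAction.ACK; // stale request; nothing left to delete
        }
        if (cause instanceof ServerSideErrorSketchException) {
            return DeletionAction.RECONSUME_LATER; // assumed transient broker-side error
        }
        // Unknown failures are retried too, rather than acked and silently dropped.
        return DeletionAction.RECONSUME_LATER;
    }
}
```

   Keeping each outcome in one place also makes it harder to fall through from a failure path into the ack path.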



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2571,70 +2648,107 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
             advanceCursorsIfNecessary(ledgersToDelete);
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
-            // Update metadata
-            for (LedgerInfo ls : ledgersToDelete) {
-                if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) {
-                    // this info is relevant because the lastMessageId won't be available anymore
-                    log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be "
-                             + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry);
-                }
 
-                invalidateReadHandle(ls.getLedgerId());
-
-                ledgers.remove(ls.getLedgerId());
-                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries());
-                TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize());
-
-                entryCache.invalidateAllEntries(ls.getLedgerId());
-            }
-            for (LedgerInfo ls : offloadedLedgersToDelete) {
-                LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-                newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
-                String driverName = OffloadUtils.getOffloadDriverName(ls,
-                        config.getLedgerOffloader().getOffloadDriverName());
-                Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
-                        config.getLedgerOffloader().getOffloadDriverMetadata());
-                OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
-                ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
-            }
+            // Update metadata
+            // Mark deletable ledgers
+            Set<Long> deletableLedgers =
+                    Stream.concat(ledgersToDelete.stream().filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()),
+                            offloadedLedgersToDelete.stream()).map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            // Mark deletable offloaded ledgers
+            Set<Long> deletableOffloadedLedgers = ledgersToDelete.stream()
+                    .filter(ls -> ls.getOffloadContext().hasUidMsb())
+                    .map(LedgerInfo::getLedgerId).collect(Collectors.toSet());
+
+            CompletableFuture<?> appendDeleteLedgerFuture =
+                    appendPendingDeleteLedger(deletableLedgers, deletableOffloadedLedgers);
+            appendDeleteLedgerFuture.thenAccept(ignore -> {
+                believedDeleteIds.addAll(deletableLedgers);

Review Comment:
   If the `ledgerDeletionService` is not enabled, the `believedDeleteIds` set keeps growing and its entries never get a chance to be removed.
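   A minimal sketch of one possible fix, assuming ids are tracked only while two-phase deletion is enabled and are dropped once the deletion service reports the ledger as deleted. `BelievedDeleteTrackerSketch`, `markPendingDeletion`, and `onLedgerDeleted` are hypothetical names, not methods of `ManagedLedgerImpl`:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of bounded bookkeeping for believedDeleteIds.
class BelievedDeleteTrackerSketch {
    private final Set<Long> believedDeleteIds = ConcurrentHashMap.newKeySet();
    private final boolean twoPhaseDeletionEnabled;

    BelievedDeleteTrackerSketch(boolean twoPhaseDeletionEnabled) {
        this.twoPhaseDeletionEnabled = twoPhaseDeletionEnabled;
    }

    // Called once the pending-delete records have been appended.
    void markPendingDeletion(Set<Long> deletableLedgers) {
        if (!twoPhaseDeletionEnabled) {
            // Without the deletion service nothing would ever remove these ids, so don't track them.
            return;
        }
        believedDeleteIds.addAll(deletableLedgers);
    }

    // Called when the deletion service confirms a ledger is gone.
    void onLedgerDeleted(long ledgerId) {
        believedDeleteIds.remove(ledgerId);
    }

    boolean isBelievedDeleted(long ledgerId) {
        return believedDeleteIds.contains(ledgerId);
    }

    int size() {
        return believedDeleteIds.size();
    }
}
```

   Whether the disabled path should skip tracking, as sketched, or the legacy deletion path should clear the set instead depends on the rest of `internalTrimLedgers`, which is not shown here.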



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.topic.DeleteLedgerPayload;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SystemTopicBasedLedgerDeletionService implements LedgerDeletionService {
+
+    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+
+    private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedLedgerDeletionService.class);
+
+    private final PulsarAdmin pulsarAdmin;
+
+    private final int ledgerDeletionParallelism;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final Map<OffloadPoliciesImpl, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
+
+    private StatsProvider statsProvider = new NullStatsProvider();
+
+    private OpStatsLogger deleteLedgerOpLogger;
+
+    private OpStatsLogger deleteOffloadLedgerOpLogger;
+
+    private final BookKeeper bookKeeper;
+
+    private final PulsarService pulsarService;
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> ledgerDeletionTopicClient;
+
+    private transient CompletableFuture<SystemTopicClient.Reader<PendingDeleteLedgerInfo>> readerFuture;
+
+    private transient CompletableFuture<SystemTopicClient.Writer<PendingDeleteLedgerInfo>> writerFuture;
+
+    public SystemTopicBasedLedgerDeletionService(PulsarService pulsarService, ServiceConfiguration serviceConfiguration)
+            throws PulsarServerException {
+        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
+        this.pulsarService = pulsarService;
+        this.pulsarAdmin = pulsarService.getAdminClient();
+        this.bookKeeper = pulsarService.getBookKeeperClient();
+        this.serviceConfiguration = serviceConfiguration;
+        this.ledgerDeletionParallelism = serviceConfiguration.getLedgerDeletionParallelismOfTopicTwoPhaseDeletion();
+    }
+
+    private SystemTopicClient<PendingDeleteLedgerInfo> getLedgerDeletionTopicClient() throws PulsarClientException {
+        TopicName ledgerDeletionSystemTopic =
+                NamespaceEventsSystemTopicFactory.getSystemTopicName(NamespaceName.SYSTEM_NAMESPACE,
+                        EventType.LEDGER_DELETION);
+        if (ledgerDeletionSystemTopic == null) {
+            throw new PulsarClientException.InvalidTopicNameException(
+                    "Can't create SystemTopicBasedLedgerDeletionService, " + "because the topicName is null!");
+        }
+        return namespaceEventsSystemTopicFactory.createLedgerDeletionSystemTopicClient(ledgerDeletionSystemTopic,
+                serviceConfiguration.getSendDelayOfTopicTwoPhaseDeletionInSeconds(),
+                serviceConfiguration.getReconsumeLaterOfTopicTwoPhaseDeletionInSeconds());
+    }
+
+    @Override
+    public void start() throws PulsarClientException, PulsarAdminException {
+        this.ledgerDeletionTopicClient = getLedgerDeletionTopicClient();
+        initStatsLogger();
+        initLedgerDeletionSystemTopic();
+    }
+
+    private void initStatsLogger() {
+        Configuration configuration = new ClientConfiguration();
+        if (serviceConfiguration.isBookkeeperClientExposeStatsToPrometheus()) {
+            configuration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
+                    serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
+            configuration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, serviceConfiguration.getClusterName());
+            this.statsProvider = new PrometheusMetricsProvider();
+        }
+        this.statsProvider.start(configuration);
+        StatsLogger statsLogger = statsProvider.getStatsLogger("pulsar_ledger_deletion");
+        this.deleteLedgerOpLogger = statsLogger.getOpStatsLogger("delete_ledger");
+        this.deleteOffloadLedgerOpLogger = statsLogger.getOpStatsLogger("delete_offload_ledger");
+    }
+
+    private CompletableFuture<Void> initSystemTopic(String topicName, int partition) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        doCreateSystemTopic(future, topicName, partition);
+        return future;
+    }
+
+    private void doCreateSystemTopic(CompletableFuture<Void> future, String topicName, int partition) {
+        this.pulsarAdmin.topics()
+                .createPartitionedTopicAsync(topicName, partition).whenComplete((res, e) -> {
+                    if (e != null && !(e instanceof PulsarAdminException.ConflictException)) {
+                        log.error("Initial system topic " + topicName + "failed.", e);
+                        doCreateSystemTopic(future, topicName, partition);
+                        return;
+                    }
+                    future.complete(null);
+                });
+    }
+
+    private void initLedgerDeletionSystemTopic() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_TOPIC.getPartitionedTopicName(),
+                this.ledgerDeletionParallelism));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_RETRY_TOPIC.getPartitionedTopicName(), 1));
+        futures.add(initSystemTopic(SystemTopicNames.LEDGER_DELETION_DLQ_TOPIC.getPartitionedTopicName(), 1));
+
+        FutureUtil.waitForAll(futures).whenComplete((res, e) -> {
+            initReaderFuture();
+            initWriterFuture();
+        });
+    }
+
+    private void readMorePendingDeleteLedger(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader) {
+        reader.readNextAsync().whenComplete((msg, ex) -> {
+            if (ex == null) {
+                handleMessage(reader, msg).exceptionally(e -> {
+                    log.warn("Delete ledger {} failed.", msg.getValue(), e);
+                    return null;
+                });
+                readMorePendingDeleteLedger(reader);
+            } else {
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                if (cause instanceof PulsarClientException.AlreadyClosedException) {
+                    log.error("Read more pending delete ledger exception, close the read now!", ex);
+                    reader.closeAsync();
+                    initReaderFuture();
+                } else {
+                    log.warn("Read more pending delete ledger exception, read again.", ex);
+                    readMorePendingDeleteLedger(reader);
+                }
+            }
+        });
+    }
+
+    private void initReaderFuture() {
+        this.readerFuture = ledgerDeletionTopicClient.newReaderAsync().whenComplete((reader, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create reader on ledger deletion system topic", ex);
+                initReaderFuture();
+            } else {
+                readMorePendingDeleteLedger((LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader) reader);
+            }
+        });
+    }
+
+    private void initWriterFuture() {
+        this.writerFuture = ledgerDeletionTopicClient.newWriterAsync().whenComplete((writer, ex) -> {
+            if (ex != null) {
+                log.error("Failed to create writer on ledger deletion system topic", ex);
+                initWriterFuture();
+            }
+        });
+    }
+
+    private String tuneTopicName(String topicName) {
+        if (topicName.contains("/" + TopicDomain.persistent.value())) {
+            return topicName.replaceFirst("/" + TopicDomain.persistent.value(), "");
+        }
+        return topicName;
+    }
+
+    @Override
+    public CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
+                                                          LedgerComponent component, LedgerType type,
+                                                          Map<String, String> properties) {
+        topicName = tuneTopicName(topicName);
+        PendingDeleteLedgerInfo pendingDeleteLedger = null;
+        if (LedgerType.LEDGER == type) {
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        } else if (LedgerType.OFFLOAD_LEDGER == type) {
+            if (!context.getOffloadContext().hasUidMsb()) {
+                CompletableFuture<?> future = new CompletableFuture<>();
+                future.completeExceptionally(
+                        new IllegalArgumentException("The ledger " + ledgerId + " didn't offload."));
+                return future;
+            }
+            pendingDeleteLedger =
+                    new PendingDeleteLedgerInfo(topicName, component, type, ledgerId, context, properties);
+        }
+        return sendMessage(pendingDeleteLedger);
+    }
+
+    @Override
+    public void close() throws Exception {

Review Comment:
   We need to stop the `statsProvider` on close.
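   A sketch of what that could look like, keeping the existing `asyncClose()` shape (close the reader unless its future failed) and stopping the stats provider once that completes, even if closing the reader failed. `Stoppable`, `AsyncCloseable` and `DeletionServiceCloseSketch` are hypothetical stand-ins for BookKeeper's `StatsProvider` and the system-topic reader, not the PR's classes:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the stats provider (BookKeeper's StatsProvider in the PR).
interface Stoppable {
    void stop();
}

// Hypothetical stand-in for the system-topic reader.
interface AsyncCloseable {
    CompletableFuture<Void> closeAsync();
}

class DeletionServiceCloseSketch {
    private final Stoppable statsProvider;
    private final CompletableFuture<AsyncCloseable> readerFuture;

    DeletionServiceCloseSketch(Stoppable statsProvider, CompletableFuture<AsyncCloseable> readerFuture) {
        this.statsProvider = statsProvider;
        this.readerFuture = readerFuture;
    }

    CompletableFuture<Void> asyncClose() {
        CompletableFuture<Void> closeReader;
        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
            closeReader = readerFuture.thenCompose(AsyncCloseable::closeAsync);
        } else {
            closeReader = CompletableFuture.completedFuture(null);
        }
        // Stop the stats provider after the reader is closed, even if closing it failed.
        return closeReader.whenComplete((ignore, ex) -> statsProvider.stop());
    }

    void close() throws Exception {
        asyncClose().get();
    }
}
```

   Stopping in `whenComplete` rather than `thenRun` means a failed reader close does not leave the stats provider running.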



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedLedgerDeletionService.java:
##########
@@ -0,0 +1,583 @@
+    @Override
+    public void close() throws Exception {
+        asyncClose().get();
+    }
+
+    @Override
+    public CompletableFuture<?> asyncClose() {
+        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+            return readerFuture.thenCompose(SystemTopicClient.Reader::closeAsync);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isTopicTwoPhaseDeletionEnabled() {
+        return true;
+    }
+
+    private CompletableFuture<?> handleMessage(LedgerDeletionSystemTopicClient.PendingDeleteLedgerReader reader,
+                                               Message<PendingDeleteLedgerInfo> message) {
+        CompletableFuture<?> future = new CompletableFuture<>();
+        deleteInBroker(message).whenComplete((res, ex) -> {
+            if (ex != null) {
+                if (ex instanceof PulsarAdminException.NotFoundException) {
+                    deleteLocally(message).whenComplete((res1, ex1) -> {
+                        if (ex1 != null) {
+                            if (ex1 instanceof PendingDeleteLedgerInvalidException) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                            message.getValue().getLedgerId(), ex1.getMessage());
+                                }
+                                reader.ackMessageAsync(message);
+                                future.complete(null);
+                                return;
+                            }
+                            reader.reconsumeLaterAsync(message);
+                            future.completeExceptionally(ex1);
+                        }
+                        reader.ackMessageAsync(message);
+                        future.complete(null);
+                    });
+                } else if (ex instanceof PendingDeleteLedgerInvalidException) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received invalid pending delete ledger {}, invalid reason: {}",
+                                message.getValue().getLedgerId(), ex.getMessage());
+                    }
+                    reader.ackMessageAsync(message);
+                    future.complete(null);
+                } else if (ex instanceof PulsarAdminException.ServerSideErrorException) {
+
+                } else {
+                    reader.reconsumeLaterAsync(message);
+                    future.completeExceptionally(ex);
+                }
+                return;
+            }
+            reader.ackMessageAsync(message);
+            future.complete(null);
+        });
+        return future;
+    }
+
+    private CompletableFuture<?> deleteInBroker(Message<PendingDeleteLedgerInfo> message) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            PendingDeleteLedgerInfo pendingDeleteLedger = message.getValue();
+            //Now support managed_ledger two phase deletion.
+            if (LedgerComponent.MANAGED_LEDGER == pendingDeleteLedger.getLedgerComponent()) {
+                Long ledgerId = pendingDeleteLedger.getLedgerId();
+                String topicName = pendingDeleteLedger.getTopicName();
+                String ledgerType = pendingDeleteLedger.getLedgerType().name();
+                String ledgerComponent = pendingDeleteLedger.getLedgerComponent().name();
+                DeleteLedgerPayload deleteLedgerPayload =
+                        new DeleteLedgerPayload(ledgerId, topicName, ledgerType, ledgerComponent);
+                if (LedgerType.OFFLOAD_LEDGER == pendingDeleteLedger.getLedgerType()) {
+                    deleteLedgerPayload.setOffloadContext(buildOffloadContext(pendingDeleteLedger));
+                }
+                pulsarAdmin.topics().deleteLedgerAsync(deleteLedgerPayload).whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        future.completeExceptionally(ex);
+                        return;
+                    }
+                    future.complete(null);
+                });
+            } else if (LedgerComponent.MANAGED_CURSOR == pendingDeleteLedger.getLedgerComponent()) {
+                future.complete(null);

Review Comment:
   Please add a `@TODO` flag here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org