You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2023/04/17 02:31:55 UTC

[pulsar] branch branch-3.0 updated (c8d2bba651b -> d05871213ad)

This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from c8d2bba651b [improve][broker] PIP-192 Fix getLastMessageId for compressed payload(And add compression and maxBatchSize for the load balance system topic) (#20087)
     new 0a3a02bb4ed [fix][broker] Fix avoid future of clear delayed message can't complete (#20075)
     new d05871213ad [fix][broker] Ensure previous delayed index be removed from snapshotSegmentLastIndexTable & Make load operate asynchronous (#20086)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../BucketDelayedDeliveryTrackerFactory.java       |  28 ++++++
 .../pulsar/broker/delayed/bucket/Bucket.java       |   3 +-
 .../bucket/BucketDelayedDeliveryTracker.java       | 110 ++++++++++++---------
 .../PersistentDispatcherMultipleConsumers.java     |  30 ++----
 .../broker/service/persistent/PersistentTopic.java |  26 ++---
 .../bucket/BucketDelayedDeliveryTrackerTest.java   |  30 ++++--
 .../persistent/BucketDelayedDeliveryTest.java      |  48 +++++++++
 7 files changed, 187 insertions(+), 88 deletions(-)


[pulsar] 01/02: [fix][broker] Fix avoid future of clear delayed message can't complete (#20075)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0a3a02bb4edb5d36ee3a09766c3c3d671a9044f3
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Sat Apr 15 13:42:07 2023 +0800

    [fix][broker] Fix avoid future of clear delayed message can't complete (#20075)
    
    (cherry picked from commit 6152cc82ac8a344cfc2539c56f15cec6f4a1cff7)
---
 .../BucketDelayedDeliveryTrackerFactory.java       | 28 +++++++++++++
 .../pulsar/broker/delayed/bucket/Bucket.java       |  3 +-
 .../bucket/BucketDelayedDeliveryTracker.java       |  4 +-
 .../PersistentDispatcherMultipleConsumers.java     | 17 ++++----
 .../broker/service/persistent/PersistentTopic.java | 26 +++++++-----
 .../persistent/BucketDelayedDeliveryTest.java      | 48 ++++++++++++++++++++++
 6 files changed, 102 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index f0feb8b27d6..6a00bfd1995 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -21,13 +21,19 @@ package org.apache.pulsar.broker.delayed;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
 import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.common.util.FutureUtil;
 
 public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {
 
@@ -72,6 +78,28 @@ public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrack
                 delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
     }
 
+    /**
+     * Clean up residual snapshot data.
+     * If tracker has not been created or has been closed, then we can't clean up the snapshot with `tracker.clear`,
+     * this method can clean up the residual snapshots without creating a tracker.
+     */
+    public CompletableFuture<Void> cleanResidualSnapshots(ManagedCursor cursor) {
+        Map<String, String> cursorProperties = cursor.getCursorProperties();
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
+        cursorProperties.forEach((k, v) -> {
+            if (k != null && v != null && k.startsWith(BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX)) {
+                CompletableFuture<Void> future = sequencer.sequential(() -> {
+                    return cursor.removeCursorProperty(k)
+                            .thenCompose(__ -> bucketSnapshotStorage.deleteBucketSnapshot(Long.parseLong(v)));
+                });
+                futures.add(future);
+            }
+        });
+
+        return FutureUtil.waitForAll(futures);
+    }
+
     @Override
     public void close() throws Exception {
         if (bucketSnapshotStorage != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index 5b7023be503..4d7d3aa512b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.broker.delayed.bucket;
 
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
 import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
+import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -41,7 +41,6 @@ import org.roaringbitmap.RoaringBitmap;
 @AllArgsConstructor
 abstract class Bucket {
 
-    static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket";
     static final String DELIMITER = "_";
     static final int MaxRetryTimes = 3;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index a34bd51af98..6ead1e207b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.delayed.bucket;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
 import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
 import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes;
 import com.google.common.annotations.VisibleForTesting;
@@ -66,6 +66,8 @@ import org.roaringbitmap.RoaringBitmap;
 @ThreadSafe
 public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {
 
+    public static final String DELAYED_BUCKET_KEY_PREFIX = CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket";
+
     static final CompletableFuture<Long> NULL_LONG_PROMISE = CompletableFuture.completedFuture(null);
 
     static final int AsyncOperationTimeoutSeconds = 60;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4ac755860fc..c60b4562bf1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -49,6 +49,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
@@ -1098,19 +1099,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             return CompletableFuture.completedFuture(null);
         }
 
-        if (delayedDeliveryTracker.isEmpty() && topic.getBrokerService()
-                .getDelayedDeliveryTrackerFactory() instanceof BucketDelayedDeliveryTrackerFactory) {
-            synchronized (this) {
-                if (delayedDeliveryTracker.isEmpty()) {
-                    delayedDeliveryTracker = Optional
-                            .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
-                }
-            }
-        }
-
         if (delayedDeliveryTracker.isPresent()) {
             return this.delayedDeliveryTracker.get().clear();
         } else {
+            DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory =
+                    topic.getBrokerService().getDelayedDeliveryTrackerFactory();
+            if (delayedDeliveryTrackerFactory instanceof BucketDelayedDeliveryTrackerFactory
+                    bucketDelayedDeliveryTrackerFactory) {
+                return bucketDelayedDeliveryTrackerFactory.cleanResidualSnapshots(cursor);
+            }
             return CompletableFuture.completedFuture(null);
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0f5e6043981..fe181bb1c01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -81,6 +81,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
+import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -1169,15 +1170,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
 
         Dispatcher dispatcher = persistentSubscription.getDispatcher();
-        final Dispatcher temporaryDispatcher;
         if (dispatcher == null) {
-            log.info("[{}][{}] Dispatcher is null, try to create temporary dispatcher to clear delayed message", topic,
-                    subscriptionName);
-            dispatcher = temporaryDispatcher =
-                    new PersistentDispatcherMultipleConsumers(this, persistentSubscription.cursor,
-                            persistentSubscription);
-        } else {
-            temporaryDispatcher = null;
+            DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory =
+                    brokerService.getDelayedDeliveryTrackerFactory();
+            if (delayedDeliveryTrackerFactory instanceof BucketDelayedDeliveryTrackerFactory
+                    bucketDelayedDeliveryTrackerFactory) {
+                ManagedCursor cursor = persistentSubscription.getCursor();
+                bucketDelayedDeliveryTrackerFactory.cleanResidualSnapshots(cursor).whenComplete((__, ex) -> {
+                    if (ex != null) {
+                        unsubscribeFuture.completeExceptionally(ex);
+                    } else {
+                        asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                    }
+                });
+            }
+            return;
         }
 
         dispatcher.clearDelayedMessages().whenComplete((__, ex) -> {
@@ -1186,9 +1193,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             } else {
                 asyncDeleteCursor(subscriptionName, unsubscribeFuture);
             }
-            if (temporaryDispatcher != null) {
-                temporaryDispatcher.close();
-            }
         });
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 5480a2e7a70..0a82b2b4c3c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -305,4 +305,52 @@ public class BucketDelayedDeliveryTest extends DelayedDeliveryTest {
         assertTrue(namespaceMetric.isPresent());
         assertEquals(6, namespaceMetric.get().value);
     }
+
+    @Test
+    public void testDelete() throws Exception {
+        String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelete");
+
+        @Cleanup
+        Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < 1000; i++) {
+            producer.newMessage()
+                    .value("msg")
+                    .deliverAfter(1, TimeUnit.HOURS)
+                    .send();
+        }
+
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopicReference(topic).get().getSubscription("sub").getDispatcher();
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1000));
+
+        Map<String, String> cursorProperties =
+                ((PersistentDispatcherMultipleConsumers) dispatcher).getCursor().getCursorProperties();
+        List<Long> bucketIds = cursorProperties.entrySet().stream()
+                .filter(x -> x.getKey().startsWith(CURSOR_INTERNAL_PROPERTY_PREFIX + "delayed.bucket")).map(
+                        x -> Long.valueOf(x.getValue())).toList();
+
+        assertTrue(bucketIds.size() > 0);
+
+        admin.topics().delete(topic, true);
+
+        for (Long bucketId : bucketIds) {
+            try {
+                LedgerHandle ledgerHandle =
+                        pulsarTestContext.getBookKeeperClient()
+                                .openLedger(bucketId, BookKeeper.DigestType.CRC32C, new byte[]{});
+                Assert.fail("Should fail");
+            } catch (BKException.BKNoSuchLedgerExistsException e) {
+                // ignore it
+            }
+        }
+    }
 }


[pulsar] 02/02: [fix][broker] Ensure previous delayed index be removed from snapshotSegmentLastIndexTable & Make load operate asynchronous (#20086)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d05871213adc351d4c718c2a6fb0909b01d279ff
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Sun Apr 16 16:05:44 2023 +0800

    [fix][broker] Ensure previous delayed index be removed from snapshotSegmentLastIndexTable & Make load operate asynchronous (#20086)
    
    (cherry picked from commit c4aec6661e795c46181dc1fa79282065fa875768)
---
 .../bucket/BucketDelayedDeliveryTracker.java       | 106 ++++++++++++---------
 .../PersistentDispatcherMultipleConsumers.java     |  13 +--
 .../bucket/BucketDelayedDeliveryTrackerTest.java   |  30 ++++--
 3 files changed, 85 insertions(+), 64 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 6ead1e207b0..6678c6df254 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.delayed.bucket;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
 import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
-import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Range;
@@ -84,7 +83,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
 
     private final int maxNumBuckets;
 
-    private long numberDelayedMessages;
+    private volatile long numberDelayedMessages;
 
     @Getter
     @VisibleForTesting
@@ -102,6 +101,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
 
     private final BucketDelayedMessageIndexStats stats;
 
+    private CompletableFuture<Void> pendingLoad = null;
+
     public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
                                  Timer timer, long tickTimeMillis,
                                  boolean isDelayedDeliveryDeliverAtTimeStrict,
@@ -269,7 +270,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
                     if (ex == null) {
                         immutableBucket.setSnapshotSegments(null);
                         immutableBucket.asyncUpdateSnapshotLength();
-                        log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
+                        log.info("[{}] Create bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
                                 immutableBucket.bucketKey());
 
                         stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
@@ -529,17 +530,25 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
     }
 
     @Override
-    public synchronized long getNumberOfDelayedMessages() {
+    public long getNumberOfDelayedMessages() {
         return numberDelayedMessages;
     }
 
     @Override
-    public synchronized long getBufferMemoryUsage() {
+    public long getBufferMemoryUsage() {
         return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity();
     }
 
     @Override
     public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
+        if (!checkPendingOpDone()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.",
+                        dispatcher.getName());
+            }
+            return Collections.emptyNavigableSet();
+        }
+
         long cutoffTime = getCutoffTime();
 
         lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue);
@@ -558,6 +567,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
 
             ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId);
             if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
+                // All message of current snapshot segment are scheduled, try load next snapshot segment
                 if (bucket.merging) {
                     log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}",
                             dispatcher.getName(), bucket.bucketKey());
@@ -569,26 +579,19 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
                     log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}",
                             dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);
                 }
-                // All message of current snapshot segment are scheduled, load next snapshot segment
-                // TODO make it asynchronous and not blocking this process
-                try {
-                    boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
-
-                    if (!createFutureDone) {
-                        log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}",
-                                dispatcher.getName(), bucket.bucketKey());
-                        break;
-                    }
-
-                    if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) {
-                        immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
-                        bucket.asyncDeleteBucketSnapshot(stats);
-                        continue;
-                    }
+                boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
+                if (!createFutureDone) {
+                    log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}",
+                            dispatcher.getName(), bucket.bucketKey());
+                    break;
+                }
 
-                    long loadStartTime = System.currentTimeMillis();
-                    stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
-                    bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
+                long loadStartTime = System.currentTimeMillis();
+                stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
+                CompletableFuture<Void> loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry()
+                        .thenAccept(indexList -> {
+                    synchronized (BucketDelayedDeliveryTracker.this) {
+                        this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
                         if (CollectionUtils.isEmpty(indexList)) {
                             immutableBuckets.asMapOfRanges()
                                     .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
@@ -603,31 +606,36 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
                             sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
                                     index.getEntryId());
                         }
-                    }).whenComplete((__, ex) -> {
-                        if (ex != null) {
-                            // Back bucket state
-                            bucket.setCurrentSegmentEntryId(preSegmentEntryId);
-
-                            log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}",
-                                    dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex);
-
-                            stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
-                        } else {
-                            log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
-                                    dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);
-
-                            stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
-                                    System.currentTimeMillis() - loadStartTime);
+                    }
+                }).whenComplete((__, ex) -> {
+                    if (ex != null) {
+                        // Back bucket state
+                        bucket.setCurrentSegmentEntryId(preSegmentEntryId);
+
+                        log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}",
+                                dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex);
+
+                        stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
+                    } else {
+                        log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
+                                dispatcher.getName(), bucket.bucketKey(),
+                                (preSegmentEntryId == bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId + 1);
+
+                        stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
+                                System.currentTimeMillis() - loadStartTime);
+                    }
+                    synchronized (this) {
+                        if (timeout != null) {
+                            timeout.cancel();
                         }
-                    }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    // Ignore exception to reload this segment on the next schedule.
-                    log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{}",
-                            dispatcher.getName(), bucket.bucketKey(), e);
+                        timeout = timer.newTimeout(this, tickTimeMillis, TimeUnit.MILLISECONDS);
+                    }
+                });
+
+                if (!checkPendingOpDone() || loadFuture.isCompletedExceptionally()) {
                     break;
                 }
             }
-            snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
 
             positions.add(new PositionImpl(ledgerId, entryId));
 
@@ -643,6 +651,14 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
         return positions;
     }
 
+    private synchronized boolean checkPendingOpDone() {
+        if (pendingLoad == null || pendingLoad.isDone()) {
+            pendingLoad = null;
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public boolean shouldPauseAllDeliveries() {
         return false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c60b4562bf1..81adda053e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -46,7 +46,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsExcep
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
@@ -1089,7 +1088,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     @Override
-    public synchronized long getNumberOfDelayedMessages() {
+    public long getNumberOfDelayedMessages() {
         return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
     }
 
@@ -1169,15 +1168,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
 
     public long getDelayedTrackerMemoryUsage() {
-        if (delayedDeliveryTracker.isEmpty()) {
-            return 0;
-        }
-
-        if (delayedDeliveryTracker.get() instanceof AbstractDelayedDeliveryTracker) {
-            return delayedDeliveryTracker.get().getBufferMemoryUsage();
-        }
-
-        return 0;
+        return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
     }
 
     public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
index 95234d688f6..39b3992fbd1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
@@ -39,6 +39,7 @@ import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -197,9 +198,11 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes
         });
 
         assertTrue(tracker.hasMessageAvailable());
-        Set<PositionImpl> scheduledMessages = tracker.getScheduledMessages(100);
-
-        assertEquals(scheduledMessages.size(), 1);
+        Set<PositionImpl> scheduledMessages = new TreeSet<>();
+        Awaitility.await().untilAsserted(() -> {
+            scheduledMessages.addAll(tracker.getScheduledMessages(100));
+            assertEquals(scheduledMessages.size(), 1);
+        });
 
         tracker.addMessage(101, 101, 101 * 10);
 
@@ -216,12 +219,15 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes
         clockTime.set(100 * 10);
 
         assertTrue(tracker2.hasMessageAvailable());
-        scheduledMessages = tracker2.getScheduledMessages(70);
+        Set<PositionImpl> scheduledMessages2 = new TreeSet<>();
 
-        assertEquals(scheduledMessages.size(), 70);
+        Awaitility.await().untilAsserted(() -> {
+            scheduledMessages2.addAll(tracker2.getScheduledMessages(70));
+            assertEquals(scheduledMessages2.size(), 70);
+        });
 
         int i = 31;
-        for (PositionImpl scheduledMessage : scheduledMessages) {
+        for (PositionImpl scheduledMessage : scheduledMessages2) {
             assertEquals(scheduledMessage, PositionImpl.get(i, i));
             i++;
         }
@@ -298,7 +304,11 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes
 
         clockTime.set(110 * 10);
 
-        NavigableSet<PositionImpl> scheduledMessages = tracker2.getScheduledMessages(110);
+        NavigableSet<PositionImpl> scheduledMessages = new TreeSet<>();
+        Awaitility.await().untilAsserted(() -> {
+            scheduledMessages.addAll(tracker2.getScheduledMessages(110));
+            assertEquals(scheduledMessages.size(), 110);
+        });
         for (int i = 1; i <= 110; i++) {
             PositionImpl position = scheduledMessages.pollFirst();
             assertEquals(position, PositionImpl.get(i, i));
@@ -370,7 +380,11 @@ public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTes
 
         assertEquals(tracker2.getScheduledMessages(100).size(), 0);
 
-        assertEquals(tracker2.getScheduledMessages(100).size(), delayedMessagesInSnapshotValue);
+        Set<PositionImpl> scheduledMessages = new TreeSet<>();
+        Awaitility.await().untilAsserted(() -> {
+            scheduledMessages.addAll(tracker2.getScheduledMessages(100));
+            assertEquals(scheduledMessages.size(), delayedMessagesInSnapshotValue);
+        });
 
         assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty());
         assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty());