You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/05/07 23:07:35 UTC

[pulsar] 03/03: [improve][broker] Optimization protobuf code in the bucket delayed tracker (#20158)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0743c774fd6621a8ddcdf50b7f28dc8f68a6e492
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Apr 21 09:51:55 2023 +0800

    [improve][broker] Optimization protobuf code in the bucket delayed tracker (#20158)
    
    (cherry picked from commit a20d5e96acc91f8099845e8924e84cb0b5632f3f)
---
 pulsar-broker/pom.xml                              |  2 +
 .../bucket/BookkeeperBucketSnapshotStorage.java    | 24 +++---
 .../pulsar/broker/delayed/bucket/Bucket.java       |  7 +-
 .../bucket/BucketDelayedDeliveryTracker.java       | 13 ++-
 .../delayed/bucket/BucketSnapshotStorage.java      |  4 +-
 .../bucket/CombinedSegmentDelayedIndexQueue.java   | 22 +++--
 .../broker/delayed/bucket/DelayedIndexQueue.java   | 12 ++-
 .../broker/delayed/bucket/ImmutableBucket.java     | 19 ++---
 .../broker/delayed/bucket/MutableBucket.java       | 29 ++++---
 .../TripleLongPriorityDelayedIndexQueue.java       | 25 ++++--
 ...oto => DelayedMessageIndexBucketMetadata.proto} | 11 +--
 ...roto => DelayedMessageIndexBucketSegment.proto} | 12 +--
 .../BookkeeperBucketSnapshotStorageTest.java       | 95 +++++++++++-----------
 .../broker/delayed/MockBucketSnapshotStorage.java  | 14 ++--
 .../delayed/bucket/DelayedIndexQueueTest.java      | 70 +++++++---------
 15 files changed, 177 insertions(+), 182 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 31b335e1aea..25b31b4f2b7 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -566,6 +566,7 @@
           <excludes>
             <exclude>**/ResourceUsage.proto</exclude>
             <exclude>**/TransactionPendingAck.proto</exclude>
+            <exclude>**/DelayedMessageIndexBucketSegment.proto</exclude>
           </excludes>
         </configuration>
         <executions>
@@ -610,6 +611,7 @@
           <sources>
             <source>${project.basedir}/src/main/proto/TransactionPendingAck.proto</source>
             <source>${project.basedir}/src/main/proto/ResourceUsage.proto</source>
+            <source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto</source>
           </sources>
           <targetSourcesSubDir>generated-sources/lightproto/java</targetSourcesSubDir>
           <targetTestSourcesSubDir>generated-sources/lightproto/java</targetTestSourcesSubDir>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 18a4c322f7b..040bbbc586f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.delayed.bucket;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
+import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -36,8 +36,8 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
@@ -126,7 +126,8 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
 
     private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
         try {
-            return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
+            ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
+            return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
         } catch (InvalidProtocolBufferException e) {
             throw new BucketSnapshotSerializationException(e);
         }
@@ -134,15 +135,14 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
 
     private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entryEnumeration) {
         List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
-        try {
-            while (entryEnumeration.hasMoreElements()) {
-                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
-                snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
-            }
-            return snapshotMetadataList;
-        } catch (IOException e) {
-            throw new BucketSnapshotSerializationException(e);
+        while (entryEnumeration.hasMoreElements()) {
+            LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+            SnapshotSegment snapshotSegment = new SnapshotSegment();
+            ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
+            snapshotSegment.parseFrom(entryBuffer, entryBuffer.readableBytes());
+            snapshotMetadataList.add(snapshotSegment);
         }
+        return snapshotMetadataList;
     }
 
     @NotNull
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index 4d7d3aa512b..a1693b1553d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -31,7 +31,8 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.roaringbitmap.RoaringBitmap;
@@ -132,8 +133,8 @@ abstract class Bucket {
     }
 
     CompletableFuture<Long> asyncSaveBucketSnapshot(
-            ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
-            List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments) {
+            ImmutableBucket bucket, SnapshotMetadata snapshotMetadata,
+            List<SnapshotSegment> bucketSnapshotSegments) {
         final String bucketKey = bucket.bucketKey();
         final String cursorName = Codec.decode(cursor.getName());
         final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index b17387e276e..c90064c9137 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -53,8 +53,8 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -286,8 +286,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
                     // Put indexes back into the shared queue and downgrade to memory mode
                     synchronized (BucketDelayedDeliveryTracker.this) {
                         immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
-                            for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment :
-                                    snapshotSegments) {
+                            for (SnapshotSegment snapshotSegment : snapshotSegments) {
                                 for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) {
                                     sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(),
                                             delayedIndex.getLedgerId(), delayedIndex.getEntryId());
@@ -450,7 +449,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
                 return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed"));
             }
 
-            List<CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>> getRemainFutures =
+            List<CompletableFuture<List<SnapshotSegment>>> getRemainFutures =
                     buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList();
 
             return FutureUtil.waitForAll(getRemainFutures)
@@ -601,11 +600,11 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
                             bucket.asyncDeleteBucketSnapshot(stats);
                             return;
                         }
-                        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
+                        DelayedIndex
                                 lastDelayedIndex = indexList.get(indexList.size() - 1);
                         this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
                                 lastDelayedIndex.getEntryId(), bucket);
-                        for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
+                        for (DelayedIndex index : indexList) {
                             sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
                                     index.getEntryId());
                         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
index 51c89bed47a..7464ef9cd3f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.broker.delayed.bucket;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
 
 public interface BucketSnapshotStorage {
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
index 5655a268782..006938e9ed2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
@@ -24,8 +24,8 @@ import java.util.Objects;
 import java.util.PriorityQueue;
 import javax.annotation.concurrent.NotThreadSafe;
 import lombok.AllArgsConstructor;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
 
 @NotThreadSafe
 class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
@@ -40,8 +40,8 @@ class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
     }
 
     private static final Comparator<Node> COMPARATOR_NODE = (node1, node2) -> DelayedIndexQueue.COMPARATOR.compare(
-            node1.segmentList.get(node1.segmentListCursor).getIndexes(node1.segmentCursor),
-            node2.segmentList.get(node2.segmentListCursor).getIndexes(node2.segmentCursor));
+            node1.segmentList.get(node1.segmentListCursor).getIndexeAt(node1.segmentCursor),
+            node2.segmentList.get(node2.segmentListCursor).getIndexeAt(node2.segmentCursor));
 
     private final PriorityQueue<Node> kpq;
 
@@ -77,7 +77,7 @@ class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
         Objects.requireNonNull(node);
 
         SnapshotSegment snapshotSegment = node.segmentList.get(node.segmentListCursor);
-        DelayedIndex delayedIndex = snapshotSegment.getIndexes(node.segmentCursor);
+        DelayedIndex delayedIndex = snapshotSegment.getIndexeAt(node.segmentCursor);
         if (!needAdvanceCursor) {
             return delayedIndex;
         }
@@ -104,4 +104,16 @@ class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
 
         return delayedIndex;
     }
+
+    @Override
+    public void popToObject(DelayedIndex delayedIndex) {
+        DelayedIndex value = getValue(true);
+        delayedIndex.copyFrom(value);
+    }
+
+    @Override
+    public long peekTimestamp() {
+        DelayedIndex value = getValue(false);
+        return value.getTimestamp();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
index dee476c376e..f1209a3137a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
@@ -20,10 +20,10 @@ package org.apache.pulsar.broker.delayed.bucket;
 
 import java.util.Comparator;
 import java.util.Objects;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
 
 interface DelayedIndexQueue {
-    Comparator<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> COMPARATOR = (o1, o2) ->  {
+    Comparator<DelayedIndex> COMPARATOR = (o1, o2) ->  {
         if (!Objects.equals(o1.getTimestamp(), o2.getTimestamp())) {
             return Long.compare(o1.getTimestamp(), o2.getTimestamp());
         } else if (!Objects.equals(o1.getLedgerId(), o2.getLedgerId())) {
@@ -35,7 +35,11 @@ interface DelayedIndexQueue {
 
     boolean isEmpty();
 
-    DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek();
+    DelayedIndex peek();
 
-    DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop();
+    DelayedIndex pop();
+
+    void popToObject(DelayedIndex delayedIndex);
+
+    long peekTimestamp();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 57de5c84fcd..0932f51f350 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -32,9 +32,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.mutable.MutableLong;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.roaringbitmap.InvalidRoaringFormat;
 import org.roaringbitmap.RoaringBitmap;
@@ -43,7 +43,7 @@ import org.roaringbitmap.RoaringBitmap;
 class ImmutableBucket extends Bucket {
 
     @Setter
-    private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;
+    private List<SnapshotSegment> snapshotSegments;
 
     boolean merging = false;
 
@@ -55,7 +55,7 @@ class ImmutableBucket extends Bucket {
         super(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId);
     }
 
-    public Optional<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getSnapshotSegments() {
+    public Optional<List<SnapshotSegment>> getSnapshotSegments() {
         return Optional.ofNullable(snapshotSegments);
     }
 
@@ -84,7 +84,7 @@ class ImmutableBucket extends Bucket {
                         }
                     }), BucketSnapshotPersistenceException.class, MaxRetryTimes)
                     .thenApply(snapshotMetadata -> {
-                        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata> metadataList =
+                        List<SnapshotSegmentMetadata> metadataList =
                                 snapshotMetadata.getMetadataListList();
 
                         // Skip all already reach schedule time snapshot segments
@@ -125,10 +125,9 @@ class ImmutableBucket extends Bucket {
                             return Collections.emptyList();
                         }
 
-                        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
+                        SnapshotSegment snapshotSegment =
                                 bucketSnapshotSegments.get(0);
-                        List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
-                                snapshotSegment.getIndexesList();
+                        List<DelayedIndex> indexList = snapshotSegment.getIndexesList();
                         this.setCurrentSegmentEntryId(nextSegmentEntryId);
                         if (isRecover) {
                             this.asyncUpdateSnapshotLength();
@@ -171,7 +170,7 @@ class ImmutableBucket extends Bucket {
         setNumberBucketDelayedMessages(numberMessages.getValue());
     }
 
-    CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() {
+    CompletableFuture<List<SnapshotSegment>> getRemainSnapshotSegment() {
         int nextSegmentEntryId = currentSegmentEntryId + 1;
         if (nextSegmentEntryId > lastSegmentEntryId) {
             return CompletableFuture.completedFuture(Collections.emptyList());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index f404d5d02c1..b7e9e68f1bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -30,10 +30,10 @@ import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
 import org.roaringbitmap.RoaringBitmap;
@@ -75,14 +75,16 @@ class MutableBucket extends Bucket implements AutoCloseable {
         List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
         List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
         Map<Long, RoaringBitmap> bitMap = new HashMap<>();
-        SnapshotSegment.Builder snapshotSegmentBuilder = SnapshotSegment.newBuilder();
+        SnapshotSegment snapshotSegment = new SnapshotSegment();
         SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder();
 
         List<Long> firstScheduleTimestamps = new ArrayList<>();
         long currentTimestampUpperLimit = 0;
         long currentFirstTimestamp = 0L;
         while (!delayedIndexQueue.isEmpty()) {
-            DelayedIndex delayedIndex = delayedIndexQueue.peek();
+            DelayedIndex delayedIndex = snapshotSegment.addIndexe();
+            delayedIndexQueue.popToObject(delayedIndex);
+
             long timestamp = delayedIndex.getTimestamp();
             if (currentTimestampUpperLimit == 0) {
                 currentFirstTimestamp = timestamp;
@@ -100,16 +102,13 @@ class MutableBucket extends Bucket implements AutoCloseable {
                 sharedQueue.add(timestamp, ledgerId, entryId);
             }
 
-            delayedIndexQueue.pop();
             numMessages++;
 
             bitMap.computeIfAbsent(ledgerId, k -> new RoaringBitmap()).add(entryId, entryId + 1);
 
-            snapshotSegmentBuilder.addIndexes(delayedIndex);
-
-            if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
+            if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit
                     || (maxIndexesPerBucketSnapshotSegment != -1
-                    && snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
+                    && snapshotSegment.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
                 segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
                 segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
                 currentTimestampUpperLimit = 0;
@@ -129,8 +128,8 @@ class MutableBucket extends Bucket implements AutoCloseable {
                 segmentMetadataList.add(segmentMetadataBuilder.build());
                 segmentMetadataBuilder.clear();
 
-                bucketSnapshotSegments.add(snapshotSegmentBuilder.build());
-                snapshotSegmentBuilder.clear();
+                bucketSnapshotSegments.add(snapshotSegment);
+                snapshotSegment = new SnapshotSegment();
             }
         }
 
@@ -153,8 +152,8 @@ class MutableBucket extends Bucket implements AutoCloseable {
 
         // Add the first snapshot segment last message to snapshotSegmentLastMessageTable
         checkArgument(!bucketSnapshotSegments.isEmpty());
-        SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0);
-        DelayedIndex lastDelayedIndex = snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1);
+        SnapshotSegment firstSnapshotSegment = bucketSnapshotSegments.get(0);
+        DelayedIndex lastDelayedIndex = firstSnapshotSegment.getIndexeAt(firstSnapshotSegment.getIndexesCount() - 1);
         Pair<ImmutableBucket, DelayedIndex> result = Pair.of(bucket, lastDelayedIndex);
 
         CompletableFuture<Long> future = asyncSaveBucketSnapshot(bucket,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
index b8d54bd78b4..4faee3b17f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.delayed.bucket;
 
 import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
 import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
 
 @NotThreadSafe
@@ -41,17 +41,28 @@ class TripleLongPriorityDelayedIndexQueue implements DelayedIndexQueue {
     }
 
     @Override
-    public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek() {
-        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
-                DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setTimestamp(queue.peekN1())
-                        .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3()).build();
+    public DelayedIndex peek() {
+        DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(queue.peekN1())
+                .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3());
         return delayedIndex;
     }
 
     @Override
-    public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop() {
-        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek = peek();
+    public DelayedIndex pop() {
+        DelayedIndex peek = peek();
         queue.pop();
         return peek;
     }
+
+    @Override
+    public void popToObject(DelayedIndex delayedIndex) {
+        delayedIndex.setTimestamp(queue.peekN1())
+                .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3());
+        queue.pop();
+    }
+
+    @Override
+    public long peekTimestamp() {
+        return queue.peekN1();
+    }
 }
diff --git a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto
similarity index 85%
copy from pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
copy to pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto
index 6996b860c52..01b770c567d 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto
@@ -21,12 +21,7 @@ syntax = "proto2";
 package pulsar.delay;
 option java_package = "org.apache.pulsar.broker.delayed.proto";
 option optimize_for = SPEED;
-
-message DelayedIndex {
-    required uint64 timestamp = 1;
-    required uint64 ledger_id = 2;
-    required uint64 entry_id = 3;
-}
+option java_multiple_files = true;
 
 message SnapshotSegmentMetadata {
     map<uint64, bytes> delayed_index_bit_map = 1;
@@ -34,10 +29,6 @@ message SnapshotSegmentMetadata {
     required uint64 min_schedule_timestamp = 3;
 }
 
-message SnapshotSegment {
-    repeated DelayedIndex indexes = 1;
-}
-
 message SnapshotMetadata {
     repeated SnapshotSegmentMetadata metadata_list = 1;
 }
diff --git a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
similarity index 80%
rename from pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
rename to pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
index 6996b860c52..633d6a8f161 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
@@ -21,6 +21,7 @@ syntax = "proto2";
 package pulsar.delay;
 option java_package = "org.apache.pulsar.broker.delayed.proto";
 option optimize_for = SPEED;
+option java_multiple_files = true;
 
 message DelayedIndex {
     required uint64 timestamp = 1;
@@ -28,16 +29,7 @@ message DelayedIndex {
     required uint64 entry_id = 3;
 }
 
-message SnapshotSegmentMetadata {
-    map<uint64, bytes> delayed_index_bit_map = 1;
-    required uint64 max_schedule_timestamp = 2;
-    required uint64 min_schedule_timestamp = 3;
-}
-
 message SnapshotSegment {
     repeated DelayedIndex indexes = 1;
-}
-
-message SnapshotMetadata {
-    repeated SnapshotSegmentMetadata metadata_list = 1;
+    map<uint64, bytes> delayed_index_bit_map = 2;
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index 7cb6b8d5865..d26f38fa2bc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -29,7 +29,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -60,9 +63,8 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
 
     @Test
     public void testCreateSnapshot() throws ExecutionException, InterruptedException {
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
-        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        SnapshotMetadata snapshotMetadata = SnapshotMetadata.newBuilder().build();
+        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
         CompletableFuture<Long> future =
                 bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
                         bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
@@ -72,24 +74,23 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
 
     @Test
     public void testGetSnapshot() throws ExecutionException, InterruptedException {
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+        SnapshotSegmentMetadata segmentMetadata =
+                SnapshotSegmentMetadata.newBuilder()
                         .setMinScheduleTimestamp(System.currentTimeMillis())
                         .setMaxScheduleTimestamp(System.currentTimeMillis())
                         .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build();
 
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
+        SnapshotMetadata snapshotMetadata =
+                SnapshotMetadata.newBuilder()
                         .addMetadataList(segmentMetadata)
                         .build();
-        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
 
         long timeMillis = System.currentTimeMillis();
-        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
-                DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
-                        .setTimestamp(timeMillis).build();
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
+        DelayedIndex delayedIndex = new DelayedIndex().setLedgerId(100L).setEntryId(10L)
+                        .setTimestamp(timeMillis);
+        SnapshotSegment snapshotSegment = new SnapshotSegment();
+        snapshotSegment.addIndexe().copyFrom(delayedIndex);
         bucketSnapshotSegments.add(snapshotSegment);
         bucketSnapshotSegments.add(snapshotSegment);
 
@@ -99,13 +100,13 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
         Long bucketId = future.get();
         Assert.assertNotNull(bucketId);
 
-        CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> bucketSnapshotSegment =
+        CompletableFuture<List<SnapshotSegment>> bucketSnapshotSegment =
                 bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3);
 
-        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments = bucketSnapshotSegment.get();
+        List<SnapshotSegment> snapshotSegments = bucketSnapshotSegment.get();
         Assert.assertEquals(2, snapshotSegments.size());
-        for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment segment : snapshotSegments) {
-            for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : segment.getIndexesList()) {
+        for (SnapshotSegment segment : snapshotSegments) {
+            for (DelayedIndex index : segment.getIndexesList()) {
                 Assert.assertEquals(100L, index.getLedgerId());
                 Assert.assertEquals(10L, index.getEntryId());
                 Assert.assertEquals(timeMillis, index.getTimestamp());
@@ -121,17 +122,17 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
         map.put(100L, ByteString.copyFrom("test1", StandardCharsets.UTF_8));
         map.put(200L, ByteString.copyFrom("test2", StandardCharsets.UTF_8));
 
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+        SnapshotSegmentMetadata segmentMetadata =
+                SnapshotSegmentMetadata.newBuilder()
                         .setMaxScheduleTimestamp(timeMillis)
                         .setMinScheduleTimestamp(timeMillis)
                         .putAllDelayedIndexBitMap(map).build();
 
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
+        SnapshotMetadata snapshotMetadata =
+                SnapshotMetadata.newBuilder()
                         .addMetadataList(segmentMetadata)
                         .build();
-        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
 
         CompletableFuture<Long> future =
                 bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
@@ -139,10 +140,10 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
         Long bucketId = future.get();
         Assert.assertNotNull(bucketId);
 
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata bucketSnapshotMetadata =
+        SnapshotMetadata bucketSnapshotMetadata =
                 bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId).get();
 
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata metadata =
+        SnapshotSegmentMetadata metadata =
                 bucketSnapshotMetadata.getMetadataList(0);
 
         Assert.assertEquals(timeMillis, metadata.getMaxScheduleTimestamp());
@@ -152,9 +153,9 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
 
     @Test
     public void testDeleteSnapshot() throws ExecutionException, InterruptedException {
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
-        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        SnapshotMetadata snapshotMetadata =
+                SnapshotMetadata.newBuilder().build();
+        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
         CompletableFuture<Long> future =
                 bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
                         bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
@@ -173,24 +174,22 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
 
     @Test
     public void testGetBucketSnapshotLength() throws ExecutionException, InterruptedException {
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+        SnapshotSegmentMetadata segmentMetadata =
+                SnapshotSegmentMetadata.newBuilder()
                         .setMinScheduleTimestamp(System.currentTimeMillis())
                         .setMaxScheduleTimestamp(System.currentTimeMillis())
                         .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build();
 
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
+        SnapshotMetadata snapshotMetadata =
+                SnapshotMetadata.newBuilder()
                         .addMetadataList(segmentMetadata)
                         .build();
-        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
 
         long timeMillis = System.currentTimeMillis();
-        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
-                DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
-                        .setTimestamp(timeMillis).build();
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
+        DelayedIndex delayedIndex = new DelayedIndex().setLedgerId(100L).setEntryId(10L).setTimestamp(timeMillis);
+        SnapshotSegment snapshotSegment = new SnapshotSegment();
+        snapshotSegment.addIndexe().copyFrom(delayedIndex);
         bucketSnapshotSegments.add(snapshotSegment);
         bucketSnapshotSegments.add(snapshotSegment);
 
@@ -207,24 +206,22 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
 
     @Test
     public void testConcurrencyGet() throws ExecutionException, InterruptedException {
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+        SnapshotSegmentMetadata segmentMetadata =
+                SnapshotSegmentMetadata.newBuilder()
                         .setMinScheduleTimestamp(System.currentTimeMillis())
                         .setMaxScheduleTimestamp(System.currentTimeMillis())
                         .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build();
 
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
+        SnapshotMetadata snapshotMetadata =
+                SnapshotMetadata.newBuilder()
                         .addMetadataList(segmentMetadata)
                         .build();
-        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+        List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
 
         long timeMillis = System.currentTimeMillis();
-        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
-                DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
-                        .setTimestamp(timeMillis).build();
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
+        DelayedIndex delayedIndex = new DelayedIndex().setLedgerId(100L).setEntryId(10L).setTimestamp(timeMillis);
+        SnapshotSegment snapshotSegment = new SnapshotSegment();
+        snapshotSegment.addIndexe().copyFrom(delayedIndex);
         bucketSnapshotSegments.add(snapshotSegment);
         bucketSnapshotSegments.add(snapshotSegment);
 
@@ -237,7 +234,7 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         for (int i = 0; i < 100; i++) {
             CompletableFuture<Void> future0 = CompletableFuture.runAsync(() -> {
-                List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> list =
+                List<SnapshotSegment> list =
                         bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join();
                 Assert.assertTrue(list.size() > 0);
             });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
index 9e924bdeda3..dc1c0e09ca2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
@@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
@@ -139,13 +139,9 @@ public class MockBucketSnapshotStorage implements BucketSnapshotStorage {
             long lastEntryId = Math.min(lastSegmentEntryId, this.bucketSnapshots.get(bucketId).size());
             for (int i = (int) firstSegmentEntryId; i <= lastEntryId ; i++) {
                 ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(i);
-                SnapshotSegment snapshotSegment;
-                try {
-                    snapshotSegment = SnapshotSegment.parseFrom(byteBuf.nioBuffer());
-                    snapshotSegments.add(snapshotSegment);
-                } catch (InvalidProtocolBufferException e) {
-                    throw new RuntimeException(e);
-                }
+                SnapshotSegment snapshotSegment = new SnapshotSegment();
+                snapshotSegment.parseFrom(byteBuf, byteBuf.readableBytes());
+                snapshotSegments.add(snapshotSegment);
             }
             return snapshotSegments;
         }, executorService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
index 8f87f0d49a2..5dc3dcc7cb9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
@@ -23,8 +23,8 @@ import java.util.ArrayList;
 import java.util.List;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
 import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -35,27 +35,19 @@ public class DelayedIndexQueueTest {
     @Test
     public void testCompare() {
         DelayedIndex delayedIndex =
-                DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
-                        .build();
+                new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
         DelayedIndex delayedIndex2 =
-                DelayedIndex.newBuilder().setTimestamp(2).setLedgerId(2L).setEntryId(2L)
-                        .build();
+                new DelayedIndex().setTimestamp(2).setLedgerId(2L).setEntryId(2L);
         Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
 
         delayedIndex =
-                DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
-                        .build();
+                new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
         delayedIndex2 =
-                DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(2L).setEntryId(2L)
-                        .build();
+                new DelayedIndex().setTimestamp(1).setLedgerId(2L).setEntryId(2L);
         Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
 
-        delayedIndex =
-                DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
-                        .build();
-        delayedIndex2 =
-                DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(2L)
-                        .build();
+        delayedIndex = new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
+        delayedIndex2 = new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(2L);
         Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
     }
 
@@ -63,21 +55,19 @@ public class DelayedIndexQueueTest {
     public void testCombinedSegmentDelayedIndexQueue() {
         List<DelayedIndex> listA = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            DelayedIndex delayedIndex =
-                    DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L)
-                            .build();
+            DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(i).setLedgerId(1L).setEntryId(1L);
             listA.add(delayedIndex);
         }
-        SnapshotSegment snapshotSegmentA1 = SnapshotSegment.newBuilder().addAllIndexes(listA).build();
+        SnapshotSegment snapshotSegmentA1 = new SnapshotSegment();
+        snapshotSegmentA1.addAllIndexes(listA);
 
         List<DelayedIndex> listA2 = new ArrayList<>();
         for (int i = 10; i < 20; i++) {
-            DelayedIndex delayedIndex =
-                    DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L)
-                            .build();
+            DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(i).setLedgerId(1L).setEntryId(1L);
             listA2.add(delayedIndex);
         }
-        SnapshotSegment snapshotSegmentA2 = SnapshotSegment.newBuilder().addAllIndexes(listA2).build();
+        SnapshotSegment snapshotSegmentA2 = new SnapshotSegment();
+        snapshotSegmentA2.addAllIndexes(listA2);
 
         List<SnapshotSegment> segmentListA = new ArrayList<>();
         segmentListA.add(snapshotSegmentA1);
@@ -85,36 +75,32 @@ public class DelayedIndexQueueTest {
 
         List<DelayedIndex> listB = new ArrayList<>();
         for (int i = 0; i < 9; i++) {
-            DelayedIndex delayedIndex =
-                    DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L)
-                            .build();
+            DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(1L);
 
-            DelayedIndex delayedIndex2 =
-                    DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L)
-                            .build();
+            DelayedIndex delayedIndex2 = new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(2L);
             listB.add(delayedIndex);
             listB.add(delayedIndex2);
         }
 
-        SnapshotSegment snapshotSegmentB = SnapshotSegment.newBuilder().addAllIndexes(listB).build();
+        SnapshotSegment snapshotSegmentB = new SnapshotSegment();
+        snapshotSegmentB.addAllIndexes(listB);
         List<SnapshotSegment> segmentListB = new ArrayList<>();
         segmentListB.add(snapshotSegmentB);
-        segmentListB.add(SnapshotSegment.newBuilder().build());
+        segmentListB.add(new SnapshotSegment());
 
         List<DelayedIndex> listC = new ArrayList<>();
         for (int i = 10; i < 30; i+=2) {
             DelayedIndex delayedIndex =
-                    DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L)
-                            .build();
+                    new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(1L);
 
             DelayedIndex delayedIndex2 =
-                    DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L)
-                            .build();
+                    new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(2L);
             listC.add(delayedIndex);
             listC.add(delayedIndex2);
         }
 
-        SnapshotSegment snapshotSegmentC = SnapshotSegment.newBuilder().addAllIndexes(listC).build();
+        SnapshotSegment snapshotSegmentC = new SnapshotSegment();
+        snapshotSegmentC.addAllIndexes(listC);
         List<SnapshotSegment> segmentListC = new ArrayList<>();
         segmentListC.add(snapshotSegmentC);
 
@@ -123,11 +109,14 @@ public class DelayedIndexQueueTest {
 
         int count = 0;
         while (!delayedIndexQueue.isEmpty()) {
-            DelayedIndex pop = delayedIndexQueue.pop();
+            DelayedIndex pop = new DelayedIndex();
+            delayedIndexQueue.popToObject(pop);
             log.info("{} , {}, {}", pop.getTimestamp(), pop.getLedgerId(), pop.getEntryId());
             count++;
             if (!delayedIndexQueue.isEmpty()) {
                 DelayedIndex peek = delayedIndexQueue.peek();
+                long timestamp = delayedIndexQueue.peekTimestamp();
+                Assert.assertEquals(timestamp, peek.getTimestamp());
                 Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
             }
         }
@@ -147,10 +136,13 @@ public class DelayedIndexQueueTest {
 
         int count = 0;
         while (!delayedIndexQueue.isEmpty()) {
-            DelayedIndex pop = delayedIndexQueue.pop();
+            DelayedIndex pop = new DelayedIndex();
+            delayedIndexQueue.popToObject(pop);
             count++;
             if (!delayedIndexQueue.isEmpty()) {
                 DelayedIndex peek = delayedIndexQueue.peek();
+                long timestamp = delayedIndexQueue.peekTimestamp();
+                Assert.assertEquals(timestamp, peek.getTimestamp());
                 Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
             }
         }