You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/05/07 23:07:35 UTC
[pulsar] 03/03: [improve][broker] Optimization protobuf code in the bucket delayed tracker (#20158)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0743c774fd6621a8ddcdf50b7f28dc8f68a6e492
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Apr 21 09:51:55 2023 +0800
[improve][broker] Optimization protobuf code in the bucket delayed tracker (#20158)
(cherry picked from commit a20d5e96acc91f8099845e8924e84cb0b5632f3f)
---
pulsar-broker/pom.xml | 2 +
.../bucket/BookkeeperBucketSnapshotStorage.java | 24 +++---
.../pulsar/broker/delayed/bucket/Bucket.java | 7 +-
.../bucket/BucketDelayedDeliveryTracker.java | 13 ++-
.../delayed/bucket/BucketSnapshotStorage.java | 4 +-
.../bucket/CombinedSegmentDelayedIndexQueue.java | 22 +++--
.../broker/delayed/bucket/DelayedIndexQueue.java | 12 ++-
.../broker/delayed/bucket/ImmutableBucket.java | 19 ++---
.../broker/delayed/bucket/MutableBucket.java | 29 ++++---
.../TripleLongPriorityDelayedIndexQueue.java | 25 ++++--
...oto => DelayedMessageIndexBucketMetadata.proto} | 11 +--
...roto => DelayedMessageIndexBucketSegment.proto} | 12 +--
.../BookkeeperBucketSnapshotStorageTest.java | 95 +++++++++++-----------
.../broker/delayed/MockBucketSnapshotStorage.java | 14 ++--
.../delayed/bucket/DelayedIndexQueueTest.java | 70 +++++++---------
15 files changed, 177 insertions(+), 182 deletions(-)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 31b335e1aea..25b31b4f2b7 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -566,6 +566,7 @@
<excludes>
<exclude>**/ResourceUsage.proto</exclude>
<exclude>**/TransactionPendingAck.proto</exclude>
+ <exclude>**/DelayedMessageIndexBucketSegment.proto</exclude>
</excludes>
</configuration>
<executions>
@@ -610,6 +611,7 @@
<sources>
<source>${project.basedir}/src/main/proto/TransactionPendingAck.proto</source>
<source>${project.basedir}/src/main/proto/ResourceUsage.proto</source>
+ <source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto</source>
</sources>
<targetSourcesSubDir>generated-sources/lightproto/java</targetSourcesSubDir>
<targetTestSourcesSubDir>generated-sources/lightproto/java</targetTestSourcesSubDir>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 18a4c322f7b..040bbbc586f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed.bucket;
import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
+import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@@ -36,8 +36,8 @@ import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
@@ -126,7 +126,8 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
try {
- return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
+ ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
+ return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
} catch (InvalidProtocolBufferException e) {
throw new BucketSnapshotSerializationException(e);
}
@@ -134,15 +135,14 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entryEnumeration) {
List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
- try {
- while (entryEnumeration.hasMoreElements()) {
- LedgerEntry ledgerEntry = entryEnumeration.nextElement();
- snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
- }
- return snapshotMetadataList;
- } catch (IOException e) {
- throw new BucketSnapshotSerializationException(e);
+ while (entryEnumeration.hasMoreElements()) {
+ LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+ SnapshotSegment snapshotSegment = new SnapshotSegment();
+ ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
+ snapshotSegment.parseFrom(entryBuffer, entryBuffer.readableBytes());
+ snapshotMetadataList.add(snapshotSegment);
}
+ return snapshotMetadataList;
}
@NotNull
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
index 4d7d3aa512b..a1693b1553d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java
@@ -31,7 +31,8 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.RoaringBitmap;
@@ -132,8 +133,8 @@ abstract class Bucket {
}
CompletableFuture<Long> asyncSaveBucketSnapshot(
- ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments) {
+ ImmutableBucket bucket, SnapshotMetadata snapshotMetadata,
+ List<SnapshotSegment> bucketSnapshotSegments) {
final String bucketKey = bucket.bucketKey();
final String cursorName = Codec.decode(cursor.getName());
final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index b17387e276e..c90064c9137 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -53,8 +53,8 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
@@ -286,8 +286,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
// Put indexes back into the shared queue and downgrade to memory mode
synchronized (BucketDelayedDeliveryTracker.this) {
immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
- for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment :
- snapshotSegments) {
+ for (SnapshotSegment snapshotSegment : snapshotSegments) {
for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) {
sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(),
delayedIndex.getLedgerId(), delayedIndex.getEntryId());
@@ -450,7 +449,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed"));
}
- List<CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>> getRemainFutures =
+ List<CompletableFuture<List<SnapshotSegment>>> getRemainFutures =
buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList();
return FutureUtil.waitForAll(getRemainFutures)
@@ -601,11 +600,11 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
bucket.asyncDeleteBucketSnapshot(stats);
return;
}
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
+ DelayedIndex
lastDelayedIndex = indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), bucket);
- for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
+ for (DelayedIndex index : indexList) {
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
index 51c89bed47a..7464ef9cd3f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotStorage.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.broker.delayed.bucket;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
public interface BucketSnapshotStorage {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
index 5655a268782..006938e9ed2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/CombinedSegmentDelayedIndexQueue.java
@@ -24,8 +24,8 @@ import java.util.Objects;
import java.util.PriorityQueue;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.AllArgsConstructor;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
@NotThreadSafe
class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
@@ -40,8 +40,8 @@ class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
}
private static final Comparator<Node> COMPARATOR_NODE = (node1, node2) -> DelayedIndexQueue.COMPARATOR.compare(
- node1.segmentList.get(node1.segmentListCursor).getIndexes(node1.segmentCursor),
- node2.segmentList.get(node2.segmentListCursor).getIndexes(node2.segmentCursor));
+ node1.segmentList.get(node1.segmentListCursor).getIndexeAt(node1.segmentCursor),
+ node2.segmentList.get(node2.segmentListCursor).getIndexeAt(node2.segmentCursor));
private final PriorityQueue<Node> kpq;
@@ -77,7 +77,7 @@ class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
Objects.requireNonNull(node);
SnapshotSegment snapshotSegment = node.segmentList.get(node.segmentListCursor);
- DelayedIndex delayedIndex = snapshotSegment.getIndexes(node.segmentCursor);
+ DelayedIndex delayedIndex = snapshotSegment.getIndexeAt(node.segmentCursor);
if (!needAdvanceCursor) {
return delayedIndex;
}
@@ -104,4 +104,16 @@ class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
return delayedIndex;
}
+
+ @Override
+ public void popToObject(DelayedIndex delayedIndex) {
+ DelayedIndex value = getValue(true);
+ delayedIndex.copyFrom(value);
+ }
+
+ @Override
+ public long peekTimestamp() {
+ DelayedIndex value = getValue(false);
+ return value.getTimestamp();
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
index dee476c376e..f1209a3137a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueue.java
@@ -20,10 +20,10 @@ package org.apache.pulsar.broker.delayed.bucket;
import java.util.Comparator;
import java.util.Objects;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
interface DelayedIndexQueue {
- Comparator<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> COMPARATOR = (o1, o2) -> {
+ Comparator<DelayedIndex> COMPARATOR = (o1, o2) -> {
if (!Objects.equals(o1.getTimestamp(), o2.getTimestamp())) {
return Long.compare(o1.getTimestamp(), o2.getTimestamp());
} else if (!Objects.equals(o1.getLedgerId(), o2.getLedgerId())) {
@@ -35,7 +35,11 @@ interface DelayedIndexQueue {
boolean isEmpty();
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek();
+ DelayedIndex peek();
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop();
+ DelayedIndex pop();
+
+ void popToObject(DelayedIndex delayedIndex);
+
+ long peekTimestamp();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 57de5c84fcd..0932f51f350 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -32,9 +32,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.InvalidRoaringFormat;
import org.roaringbitmap.RoaringBitmap;
@@ -43,7 +43,7 @@ import org.roaringbitmap.RoaringBitmap;
class ImmutableBucket extends Bucket {
@Setter
- private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;
+ private List<SnapshotSegment> snapshotSegments;
boolean merging = false;
@@ -55,7 +55,7 @@ class ImmutableBucket extends Bucket {
super(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId);
}
- public Optional<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getSnapshotSegments() {
+ public Optional<List<SnapshotSegment>> getSnapshotSegments() {
return Optional.ofNullable(snapshotSegments);
}
@@ -84,7 +84,7 @@ class ImmutableBucket extends Bucket {
}
}), BucketSnapshotPersistenceException.class, MaxRetryTimes)
.thenApply(snapshotMetadata -> {
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata> metadataList =
+ List<SnapshotSegmentMetadata> metadataList =
snapshotMetadata.getMetadataListList();
// Skip all already reach schedule time snapshot segments
@@ -125,10 +125,9 @@ class ImmutableBucket extends Bucket {
return Collections.emptyList();
}
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
+ SnapshotSegment snapshotSegment =
bucketSnapshotSegments.get(0);
- List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
- snapshotSegment.getIndexesList();
+ List<DelayedIndex> indexList = snapshotSegment.getIndexesList();
this.setCurrentSegmentEntryId(nextSegmentEntryId);
if (isRecover) {
this.asyncUpdateSnapshotLength();
@@ -171,7 +170,7 @@ class ImmutableBucket extends Bucket {
setNumberBucketDelayedMessages(numberMessages.getValue());
}
- CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() {
+ CompletableFuture<List<SnapshotSegment>> getRemainSnapshotSegment() {
int nextSegmentEntryId = currentSegmentEntryId + 1;
if (nextSegmentEntryId > lastSegmentEntryId) {
return CompletableFuture.completedFuture(Collections.emptyList());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index f404d5d02c1..b7e9e68f1bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -30,10 +30,10 @@ import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
@@ -75,14 +75,16 @@ class MutableBucket extends Bucket implements AutoCloseable {
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
Map<Long, RoaringBitmap> bitMap = new HashMap<>();
- SnapshotSegment.Builder snapshotSegmentBuilder = SnapshotSegment.newBuilder();
+ SnapshotSegment snapshotSegment = new SnapshotSegment();
SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder();
List<Long> firstScheduleTimestamps = new ArrayList<>();
long currentTimestampUpperLimit = 0;
long currentFirstTimestamp = 0L;
while (!delayedIndexQueue.isEmpty()) {
- DelayedIndex delayedIndex = delayedIndexQueue.peek();
+ DelayedIndex delayedIndex = snapshotSegment.addIndexe();
+ delayedIndexQueue.popToObject(delayedIndex);
+
long timestamp = delayedIndex.getTimestamp();
if (currentTimestampUpperLimit == 0) {
currentFirstTimestamp = timestamp;
@@ -100,16 +102,13 @@ class MutableBucket extends Bucket implements AutoCloseable {
sharedQueue.add(timestamp, ledgerId, entryId);
}
- delayedIndexQueue.pop();
numMessages++;
bitMap.computeIfAbsent(ledgerId, k -> new RoaringBitmap()).add(entryId, entryId + 1);
- snapshotSegmentBuilder.addIndexes(delayedIndex);
-
- if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
+ if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit
|| (maxIndexesPerBucketSnapshotSegment != -1
- && snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
+ && snapshotSegment.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
currentTimestampUpperLimit = 0;
@@ -129,8 +128,8 @@ class MutableBucket extends Bucket implements AutoCloseable {
segmentMetadataList.add(segmentMetadataBuilder.build());
segmentMetadataBuilder.clear();
- bucketSnapshotSegments.add(snapshotSegmentBuilder.build());
- snapshotSegmentBuilder.clear();
+ bucketSnapshotSegments.add(snapshotSegment);
+ snapshotSegment = new SnapshotSegment();
}
}
@@ -153,8 +152,8 @@ class MutableBucket extends Bucket implements AutoCloseable {
// Add the first snapshot segment last message to snapshotSegmentLastMessageTable
checkArgument(!bucketSnapshotSegments.isEmpty());
- SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0);
- DelayedIndex lastDelayedIndex = snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1);
+ SnapshotSegment firstSnapshotSegment = bucketSnapshotSegments.get(0);
+ DelayedIndex lastDelayedIndex = firstSnapshotSegment.getIndexeAt(firstSnapshotSegment.getIndexesCount() - 1);
Pair<ImmutableBucket, DelayedIndex> result = Pair.of(bucket, lastDelayedIndex);
CompletableFuture<Long> future = asyncSaveBucketSnapshot(bucket,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
index b8d54bd78b4..4faee3b17f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/TripleLongPriorityDelayedIndexQueue.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed.bucket;
import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
@NotThreadSafe
@@ -41,17 +41,28 @@ class TripleLongPriorityDelayedIndexQueue implements DelayedIndexQueue {
}
@Override
- public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek() {
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setTimestamp(queue.peekN1())
- .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3()).build();
+ public DelayedIndex peek() {
+ DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(queue.peekN1())
+ .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3());
return delayedIndex;
}
@Override
- public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop() {
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek = peek();
+ public DelayedIndex pop() {
+ DelayedIndex peek = peek();
queue.pop();
return peek;
}
+
+ @Override
+ public void popToObject(DelayedIndex delayedIndex) {
+ delayedIndex.setTimestamp(queue.peekN1())
+ .setLedgerId(queue.peekN2()).setEntryId(queue.peekN3());
+ queue.pop();
+ }
+
+ @Override
+ public long peekTimestamp() {
+ return queue.peekN1();
+ }
}
diff --git a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto
similarity index 85%
copy from pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
copy to pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto
index 6996b860c52..01b770c567d 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketMetadata.proto
@@ -21,12 +21,7 @@ syntax = "proto2";
package pulsar.delay;
option java_package = "org.apache.pulsar.broker.delayed.proto";
option optimize_for = SPEED;
-
-message DelayedIndex {
- required uint64 timestamp = 1;
- required uint64 ledger_id = 2;
- required uint64 entry_id = 3;
-}
+option java_multiple_files = true;
message SnapshotSegmentMetadata {
map<uint64, bytes> delayed_index_bit_map = 1;
@@ -34,10 +29,6 @@ message SnapshotSegmentMetadata {
required uint64 min_schedule_timestamp = 3;
}
-message SnapshotSegment {
- repeated DelayedIndex indexes = 1;
-}
-
message SnapshotMetadata {
repeated SnapshotSegmentMetadata metadata_list = 1;
}
diff --git a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
similarity index 80%
rename from pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
rename to pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
index 6996b860c52..633d6a8f161 100644
--- a/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSnapshotFormat.proto
+++ b/pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto
@@ -21,6 +21,7 @@ syntax = "proto2";
package pulsar.delay;
option java_package = "org.apache.pulsar.broker.delayed.proto";
option optimize_for = SPEED;
+option java_multiple_files = true;
message DelayedIndex {
required uint64 timestamp = 1;
@@ -28,16 +29,7 @@ message DelayedIndex {
required uint64 entry_id = 3;
}
-message SnapshotSegmentMetadata {
- map<uint64, bytes> delayed_index_bit_map = 1;
- required uint64 max_schedule_timestamp = 2;
- required uint64 min_schedule_timestamp = 3;
-}
-
message SnapshotSegment {
repeated DelayedIndex indexes = 1;
-}
-
-message SnapshotMetadata {
- repeated SnapshotSegmentMetadata metadata_list = 1;
+ map<uint64, bytes> delayed_index_bit_map = 2;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index 7cb6b8d5865..d26f38fa2bc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -29,7 +29,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -60,9 +63,8 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
@Test
public void testCreateSnapshot() throws ExecutionException, InterruptedException {
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ SnapshotMetadata snapshotMetadata = SnapshotMetadata.newBuilder().build();
+ List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
@@ -72,24 +74,23 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
@Test
public void testGetSnapshot() throws ExecutionException, InterruptedException {
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+ SnapshotSegmentMetadata segmentMetadata =
+ SnapshotSegmentMetadata.newBuilder()
.setMinScheduleTimestamp(System.currentTimeMillis())
.setMaxScheduleTimestamp(System.currentTimeMillis())
.putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build();
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
+ SnapshotMetadata snapshotMetadata =
+ SnapshotMetadata.newBuilder()
.addMetadataList(segmentMetadata)
.build();
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
long timeMillis = System.currentTimeMillis();
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
- .setTimestamp(timeMillis).build();
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
+ DelayedIndex delayedIndex = new DelayedIndex().setLedgerId(100L).setEntryId(10L)
+ .setTimestamp(timeMillis);
+ SnapshotSegment snapshotSegment = new SnapshotSegment();
+ snapshotSegment.addIndexe().copyFrom(delayedIndex);
bucketSnapshotSegments.add(snapshotSegment);
bucketSnapshotSegments.add(snapshotSegment);
@@ -99,13 +100,13 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
Long bucketId = future.get();
Assert.assertNotNull(bucketId);
- CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> bucketSnapshotSegment =
+ CompletableFuture<List<SnapshotSegment>> bucketSnapshotSegment =
bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3);
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments = bucketSnapshotSegment.get();
+ List<SnapshotSegment> snapshotSegments = bucketSnapshotSegment.get();
Assert.assertEquals(2, snapshotSegments.size());
- for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment segment : snapshotSegments) {
- for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : segment.getIndexesList()) {
+ for (SnapshotSegment segment : snapshotSegments) {
+ for (DelayedIndex index : segment.getIndexesList()) {
Assert.assertEquals(100L, index.getLedgerId());
Assert.assertEquals(10L, index.getEntryId());
Assert.assertEquals(timeMillis, index.getTimestamp());
@@ -121,17 +122,17 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
map.put(100L, ByteString.copyFrom("test1", StandardCharsets.UTF_8));
map.put(200L, ByteString.copyFrom("test2", StandardCharsets.UTF_8));
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+ SnapshotSegmentMetadata segmentMetadata =
+ SnapshotSegmentMetadata.newBuilder()
.setMaxScheduleTimestamp(timeMillis)
.setMinScheduleTimestamp(timeMillis)
.putAllDelayedIndexBitMap(map).build();
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
+ SnapshotMetadata snapshotMetadata =
+ SnapshotMetadata.newBuilder()
.addMetadataList(segmentMetadata)
.build();
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
@@ -139,10 +140,10 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
Long bucketId = future.get();
Assert.assertNotNull(bucketId);
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata bucketSnapshotMetadata =
+ SnapshotMetadata bucketSnapshotMetadata =
bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId).get();
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata metadata =
+ SnapshotSegmentMetadata metadata =
bucketSnapshotMetadata.getMetadataList(0);
Assert.assertEquals(timeMillis, metadata.getMaxScheduleTimestamp());
@@ -152,9 +153,9 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
@Test
public void testDeleteSnapshot() throws ExecutionException, InterruptedException {
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ SnapshotMetadata snapshotMetadata =
+ SnapshotMetadata.newBuilder().build();
+ List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
@@ -173,24 +174,22 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
@Test
public void testGetBucketSnapshotLength() throws ExecutionException, InterruptedException {
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+ SnapshotSegmentMetadata segmentMetadata =
+ SnapshotSegmentMetadata.newBuilder()
.setMinScheduleTimestamp(System.currentTimeMillis())
.setMaxScheduleTimestamp(System.currentTimeMillis())
.putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build();
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
+ SnapshotMetadata snapshotMetadata =
+ SnapshotMetadata.newBuilder()
.addMetadataList(segmentMetadata)
.build();
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
long timeMillis = System.currentTimeMillis();
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
- .setTimestamp(timeMillis).build();
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
+ DelayedIndex delayedIndex = new DelayedIndex().setLedgerId(100L).setEntryId(10L).setTimestamp(timeMillis);
+ SnapshotSegment snapshotSegment = new SnapshotSegment();
+ snapshotSegment.addIndexe().copyFrom(delayedIndex);
bucketSnapshotSegments.add(snapshotSegment);
bucketSnapshotSegments.add(snapshotSegment);
@@ -207,24 +206,22 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
@Test
public void testConcurrencyGet() throws ExecutionException, InterruptedException {
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+ SnapshotSegmentMetadata segmentMetadata =
+ SnapshotSegmentMetadata.newBuilder()
.setMinScheduleTimestamp(System.currentTimeMillis())
.setMaxScheduleTimestamp(System.currentTimeMillis())
.putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build();
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
+ SnapshotMetadata snapshotMetadata =
+ SnapshotMetadata.newBuilder()
.addMetadataList(segmentMetadata)
.build();
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
+ List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
long timeMillis = System.currentTimeMillis();
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
- .setTimestamp(timeMillis).build();
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
+ DelayedIndex delayedIndex = new DelayedIndex().setLedgerId(100L).setEntryId(10L).setTimestamp(timeMillis);
+ SnapshotSegment snapshotSegment = new SnapshotSegment();
+ snapshotSegment.addIndexe().copyFrom(delayedIndex);
bucketSnapshotSegments.add(snapshotSegment);
bucketSnapshotSegments.add(snapshotSegment);
@@ -237,7 +234,7 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
CompletableFuture<Void> future0 = CompletableFuture.runAsync(() -> {
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> list =
+ List<SnapshotSegment> list =
bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join();
Assert.assertTrue(list.size() > 0);
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
index 9e924bdeda3..dc1c0e09ca2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
@@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
@@ -139,13 +139,9 @@ public class MockBucketSnapshotStorage implements BucketSnapshotStorage {
long lastEntryId = Math.min(lastSegmentEntryId, this.bucketSnapshots.get(bucketId).size());
for (int i = (int) firstSegmentEntryId; i <= lastEntryId ; i++) {
ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(i);
- SnapshotSegment snapshotSegment;
- try {
- snapshotSegment = SnapshotSegment.parseFrom(byteBuf.nioBuffer());
- snapshotSegments.add(snapshotSegment);
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
- }
+ SnapshotSegment snapshotSegment = new SnapshotSegment();
+ snapshotSegment.parseFrom(byteBuf, byteBuf.readableBytes());
+ snapshotSegments.add(snapshotSegment);
}
return snapshotSegments;
}, executorService);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
index 8f87f0d49a2..5dc3dcc7cb9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/DelayedIndexQueueTest.java
@@ -23,8 +23,8 @@ import java.util.ArrayList;
import java.util.List;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
-import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -35,27 +35,19 @@ public class DelayedIndexQueueTest {
@Test
public void testCompare() {
DelayedIndex delayedIndex =
- DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
- .build();
+ new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
DelayedIndex delayedIndex2 =
- DelayedIndex.newBuilder().setTimestamp(2).setLedgerId(2L).setEntryId(2L)
- .build();
+ new DelayedIndex().setTimestamp(2).setLedgerId(2L).setEntryId(2L);
Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
delayedIndex =
- DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
- .build();
+ new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
delayedIndex2 =
- DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(2L).setEntryId(2L)
- .build();
+ new DelayedIndex().setTimestamp(1).setLedgerId(2L).setEntryId(2L);
Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
- delayedIndex =
- DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(1L)
- .build();
- delayedIndex2 =
- DelayedIndex.newBuilder().setTimestamp(1).setLedgerId(1L).setEntryId(2L)
- .build();
+ delayedIndex = new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(1L);
+ delayedIndex2 = new DelayedIndex().setTimestamp(1).setLedgerId(1L).setEntryId(2L);
Assert.assertTrue(COMPARATOR.compare(delayedIndex, delayedIndex2) < 0);
}
@@ -63,21 +55,19 @@ public class DelayedIndexQueueTest {
public void testCombinedSegmentDelayedIndexQueue() {
List<DelayedIndex> listA = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- DelayedIndex delayedIndex =
- DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L)
- .build();
+ DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(i).setLedgerId(1L).setEntryId(1L);
listA.add(delayedIndex);
}
- SnapshotSegment snapshotSegmentA1 = SnapshotSegment.newBuilder().addAllIndexes(listA).build();
+ SnapshotSegment snapshotSegmentA1 = new SnapshotSegment();
+ snapshotSegmentA1.addAllIndexes(listA);
List<DelayedIndex> listA2 = new ArrayList<>();
for (int i = 10; i < 20; i++) {
- DelayedIndex delayedIndex =
- DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(1L).setEntryId(1L)
- .build();
+ DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(i).setLedgerId(1L).setEntryId(1L);
listA2.add(delayedIndex);
}
- SnapshotSegment snapshotSegmentA2 = SnapshotSegment.newBuilder().addAllIndexes(listA2).build();
+ SnapshotSegment snapshotSegmentA2 = new SnapshotSegment();
+ snapshotSegmentA2.addAllIndexes(listA2);
List<SnapshotSegment> segmentListA = new ArrayList<>();
segmentListA.add(snapshotSegmentA1);
@@ -85,36 +75,32 @@ public class DelayedIndexQueueTest {
List<DelayedIndex> listB = new ArrayList<>();
for (int i = 0; i < 9; i++) {
- DelayedIndex delayedIndex =
- DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L)
- .build();
+ DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(1L);
- DelayedIndex delayedIndex2 =
- DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L)
- .build();
+ DelayedIndex delayedIndex2 = new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(2L);
listB.add(delayedIndex);
listB.add(delayedIndex2);
}
- SnapshotSegment snapshotSegmentB = SnapshotSegment.newBuilder().addAllIndexes(listB).build();
+ SnapshotSegment snapshotSegmentB = new SnapshotSegment();
+ snapshotSegmentB.addAllIndexes(listB);
List<SnapshotSegment> segmentListB = new ArrayList<>();
segmentListB.add(snapshotSegmentB);
- segmentListB.add(SnapshotSegment.newBuilder().build());
+ segmentListB.add(new SnapshotSegment());
List<DelayedIndex> listC = new ArrayList<>();
for (int i = 10; i < 30; i+=2) {
DelayedIndex delayedIndex =
- DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(1L)
- .build();
+ new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(1L);
DelayedIndex delayedIndex2 =
- DelayedIndex.newBuilder().setTimestamp(i).setLedgerId(2L).setEntryId(2L)
- .build();
+ new DelayedIndex().setTimestamp(i).setLedgerId(2L).setEntryId(2L);
listC.add(delayedIndex);
listC.add(delayedIndex2);
}
- SnapshotSegment snapshotSegmentC = SnapshotSegment.newBuilder().addAllIndexes(listC).build();
+ SnapshotSegment snapshotSegmentC = new SnapshotSegment();
+ snapshotSegmentC.addAllIndexes(listC);
List<SnapshotSegment> segmentListC = new ArrayList<>();
segmentListC.add(snapshotSegmentC);
@@ -123,11 +109,14 @@ public class DelayedIndexQueueTest {
int count = 0;
while (!delayedIndexQueue.isEmpty()) {
- DelayedIndex pop = delayedIndexQueue.pop();
+ DelayedIndex pop = new DelayedIndex();
+ delayedIndexQueue.popToObject(pop);
log.info("{} , {}, {}", pop.getTimestamp(), pop.getLedgerId(), pop.getEntryId());
count++;
if (!delayedIndexQueue.isEmpty()) {
DelayedIndex peek = delayedIndexQueue.peek();
+ long timestamp = delayedIndexQueue.peekTimestamp();
+ Assert.assertEquals(timestamp, peek.getTimestamp());
Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
}
}
@@ -147,10 +136,13 @@ public class DelayedIndexQueueTest {
int count = 0;
while (!delayedIndexQueue.isEmpty()) {
- DelayedIndex pop = delayedIndexQueue.pop();
+ DelayedIndex pop = new DelayedIndex();
+ delayedIndexQueue.popToObject(pop);
count++;
if (!delayedIndexQueue.isEmpty()) {
DelayedIndex peek = delayedIndexQueue.peek();
+ long timestamp = delayedIndexQueue.peekTimestamp();
+ Assert.assertEquals(timestamp, peek.getTimestamp());
Assert.assertTrue(COMPARATOR.compare(peek, pop) >= 0);
}
}