You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2023/04/21 08:06:30 UTC
[pulsar] branch master updated: [improve][broker] Move bitmap from lastMutableBucket to ImmutableBucket (#20156)
This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e5a833a2dcb [improve][broker] Move bitmap from lastMutableBucket to ImmutableBucket (#20156)
e5a833a2dcb is described below
commit e5a833a2dcb7ce13ada4ca94714cc045a02de276
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Apr 21 16:06:22 2023 +0800
[improve][broker] Move bitmap from lastMutableBucket to ImmutableBucket (#20156)
---
.../bucket/BucketDelayedDeliveryTracker.java | 3 +-
.../broker/delayed/bucket/MutableBucket.java | 41 +++++++++++++++-------
2 files changed, 31 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index c90064c9137..67a7de1f013 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -168,7 +168,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
}
try {
- FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 2, TimeUnit.SECONDS);
+ FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("[{}] Failed to recover delayed message index bucket snapshot.", dispatcher.getName(), e);
if (e instanceof InterruptedException) {
@@ -343,6 +343,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
// If (ledgerId < startLedgerId || existBucket), the message index belongs to a previous bucket range,
// so it is added to sharedBucketPriorityQueue directly
sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
+ lastMutableBucket.putIndexBit(ledgerId, entryId);
} else {
checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index b7e9e68f1bd..1173a401a89 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed.bucket;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -74,6 +74,8 @@ class MutableBucket extends Bucket implements AutoCloseable {
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
+ Map<Long, RoaringBitmap> immutableBucketBitMap = new HashMap<>();
+
Map<Long, RoaringBitmap> bitMap = new HashMap<>();
SnapshotSegment snapshotSegment = new SnapshotSegment();
SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder();
@@ -82,18 +84,20 @@ class MutableBucket extends Bucket implements AutoCloseable {
long currentTimestampUpperLimit = 0;
long currentFirstTimestamp = 0L;
while (!delayedIndexQueue.isEmpty()) {
- DelayedIndex delayedIndex = snapshotSegment.addIndexe();
- delayedIndexQueue.popToObject(delayedIndex);
-
- long timestamp = delayedIndex.getTimestamp();
+ final long timestamp = delayedIndexQueue.peekTimestamp();
if (currentTimestampUpperLimit == 0) {
currentFirstTimestamp = timestamp;
firstScheduleTimestamps.add(currentFirstTimestamp);
currentTimestampUpperLimit = timestamp + timeStepPerBucketSnapshotSegment - 1;
}
- long ledgerId = delayedIndex.getLedgerId();
- long entryId = delayedIndex.getEntryId();
+ DelayedIndex delayedIndex = snapshotSegment.addIndexe();
+ delayedIndexQueue.popToObject(delayedIndex);
+
+ final long ledgerId = delayedIndex.getLedgerId();
+ final long entryId = delayedIndex.getEntryId();
+
+ removeIndexBit(ledgerId, entryId);
checkArgument(ledgerId >= startLedgerId && ledgerId <= endLedgerId);
@@ -102,10 +106,10 @@ class MutableBucket extends Bucket implements AutoCloseable {
sharedQueue.add(timestamp, ledgerId, entryId);
}
- numMessages++;
-
bitMap.computeIfAbsent(ledgerId, k -> new RoaringBitmap()).add(entryId, entryId + 1);
+ numMessages++;
+
if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegment.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
@@ -119,9 +123,17 @@ class MutableBucket extends Bucket implements AutoCloseable {
final var lId = entry.getKey();
final var bm = entry.getValue();
bm.runOptimize();
- final var array = new byte[bm.serializedSizeInBytes()];
- bm.serialize(ByteBuffer.wrap(array));
- segmentMetadataBuilder.putDelayedIndexBitMap(lId, ByteString.copyFrom(array));
+ ByteBuffer byteBuffer = ByteBuffer.allocate(bm.serializedSizeInBytes());
+ bm.serialize(byteBuffer);
+ byteBuffer.flip();
+ segmentMetadataBuilder.putDelayedIndexBitMap(lId, UnsafeByteOperations.unsafeWrap(byteBuffer));
+ immutableBucketBitMap.compute(lId, (__, bm0) -> {
+ if (bm0 == null) {
+ return bm;
+ }
+ bm0.or(bm);
+ return bm0;
+ });
iterator.remove();
}
@@ -133,6 +145,10 @@ class MutableBucket extends Bucket implements AutoCloseable {
}
}
+ // optimize bm
+ immutableBucketBitMap.values().forEach(RoaringBitmap::runOptimize);
+ this.delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
+
SnapshotMetadata bucketSnapshotMetadata = SnapshotMetadata.newBuilder()
.addAllMetadataList(segmentMetadataList)
.build();
@@ -145,6 +161,7 @@ class MutableBucket extends Bucket implements AutoCloseable {
bucket.setNumberBucketDelayedMessages(numMessages);
bucket.setLastSegmentEntryId(lastSegmentEntryId);
bucket.setFirstScheduleTimestamps(firstScheduleTimestamps);
+ bucket.setDelayedIndexBitMap(immutableBucketBitMap);
// Skip first segment, because it has already been loaded
List<SnapshotSegment> snapshotSegments = bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size());