You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2023/04/21 08:06:30 UTC

[pulsar] branch master updated: [improve][broker] Move bitmap from lastMutableBucket to ImmutableBucket (#20156)

This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e5a833a2dcb [improve][broker] Move bitmap from lastMutableBucket to ImmutableBucket (#20156)
e5a833a2dcb is described below

commit e5a833a2dcb7ce13ada4ca94714cc045a02de276
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Apr 21 16:06:22 2023 +0800

    [improve][broker] Move bitmap from lastMutableBucket to ImmutableBucket (#20156)
---
 .../bucket/BucketDelayedDeliveryTracker.java       |  3 +-
 .../broker/delayed/bucket/MutableBucket.java       | 41 +++++++++++++++-------
 2 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index c90064c9137..67a7de1f013 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -168,7 +168,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
         }
 
         try {
-            FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 2, TimeUnit.SECONDS);
+            FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 5, TimeUnit.SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             log.error("[{}] Failed to recover delayed message index bucket snapshot.", dispatcher.getName(), e);
             if (e instanceof InterruptedException) {
@@ -343,6 +343,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
             // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range,
             // enter sharedBucketPriorityQueue directly
             sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId);
+            lastMutableBucket.putIndexBit(ledgerId, entryId);
         } else {
             checkArgument(ledgerId >= lastMutableBucket.endLedgerId);
             lastMutableBucket.addMessage(ledgerId, entryId, deliverAt);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index b7e9e68f1bd..1173a401a89 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.delayed.bucket;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -74,6 +74,8 @@ class MutableBucket extends Bucket implements AutoCloseable {
 
         List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
         List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
+        Map<Long, RoaringBitmap> immutableBucketBitMap = new HashMap<>();
+
         Map<Long, RoaringBitmap> bitMap = new HashMap<>();
         SnapshotSegment snapshotSegment = new SnapshotSegment();
         SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder();
@@ -82,18 +84,20 @@ class MutableBucket extends Bucket implements AutoCloseable {
         long currentTimestampUpperLimit = 0;
         long currentFirstTimestamp = 0L;
         while (!delayedIndexQueue.isEmpty()) {
-            DelayedIndex delayedIndex = snapshotSegment.addIndexe();
-            delayedIndexQueue.popToObject(delayedIndex);
-
-            long timestamp = delayedIndex.getTimestamp();
+            final long timestamp = delayedIndexQueue.peekTimestamp();
             if (currentTimestampUpperLimit == 0) {
                 currentFirstTimestamp = timestamp;
                 firstScheduleTimestamps.add(currentFirstTimestamp);
                 currentTimestampUpperLimit = timestamp + timeStepPerBucketSnapshotSegment - 1;
             }
 
-            long ledgerId = delayedIndex.getLedgerId();
-            long entryId = delayedIndex.getEntryId();
+            DelayedIndex delayedIndex = snapshotSegment.addIndexe();
+            delayedIndexQueue.popToObject(delayedIndex);
+
+            final long ledgerId = delayedIndex.getLedgerId();
+            final long entryId = delayedIndex.getEntryId();
+
+            removeIndexBit(ledgerId, entryId);
 
             checkArgument(ledgerId >= startLedgerId && ledgerId <= endLedgerId);
 
@@ -102,10 +106,10 @@ class MutableBucket extends Bucket implements AutoCloseable {
                 sharedQueue.add(timestamp, ledgerId, entryId);
             }
 
-            numMessages++;
-
             bitMap.computeIfAbsent(ledgerId, k -> new RoaringBitmap()).add(entryId, entryId + 1);
 
+            numMessages++;
+
             if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit
                     || (maxIndexesPerBucketSnapshotSegment != -1
                     && snapshotSegment.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
@@ -119,9 +123,17 @@ class MutableBucket extends Bucket implements AutoCloseable {
                     final var lId = entry.getKey();
                     final var bm = entry.getValue();
                     bm.runOptimize();
-                    final var array = new byte[bm.serializedSizeInBytes()];
-                    bm.serialize(ByteBuffer.wrap(array));
-                    segmentMetadataBuilder.putDelayedIndexBitMap(lId, ByteString.copyFrom(array));
+                    ByteBuffer byteBuffer = ByteBuffer.allocate(bm.serializedSizeInBytes());
+                    bm.serialize(byteBuffer);
+                    byteBuffer.flip();
+                    segmentMetadataBuilder.putDelayedIndexBitMap(lId, UnsafeByteOperations.unsafeWrap(byteBuffer));
+                    immutableBucketBitMap.compute(lId, (__, bm0) -> {
+                        if (bm0 == null) {
+                            return bm;
+                        }
+                        bm0.or(bm);
+                        return bm0;
+                    });
                     iterator.remove();
                 }
 
@@ -133,6 +145,10 @@ class MutableBucket extends Bucket implements AutoCloseable {
             }
         }
 
+        // optimize bm
+        immutableBucketBitMap.values().forEach(RoaringBitmap::runOptimize);
+        this.delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
+
         SnapshotMetadata bucketSnapshotMetadata = SnapshotMetadata.newBuilder()
                 .addAllMetadataList(segmentMetadataList)
                 .build();
@@ -145,6 +161,7 @@ class MutableBucket extends Bucket implements AutoCloseable {
         bucket.setNumberBucketDelayedMessages(numMessages);
         bucket.setLastSegmentEntryId(lastSegmentEntryId);
         bucket.setFirstScheduleTimestamps(firstScheduleTimestamps);
+        bucket.setDelayedIndexBitMap(immutableBucketBitMap);
 
         // Skip first segment, because it has already been loaded
         List<SnapshotSegment> snapshotSegments = bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size());