You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cb...@apache.org on 2023/04/24 09:59:06 UTC

[pulsar] branch branch-3.0 updated (89e4d46f4a5 -> 254f09b70d8)

This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 89e4d46f4a5 [fix] [broker] Make `LeastResourceUsageWithWeight` thread safe (#20159)
     new 49539c20bea Revert "[improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117)"
     new f3572574b13 Revert "[improve][broker] Optimize delayed metadata index bitmap (#20136)"
     new 610c5f15143 Revert "[improve][broker] Make timer execute immediately after load index (#20126)"
     new 254f09b70d8 Revert "[improve] [broker] Close temporary open ledger in BookkeeperBucketSnapshotStorage (#20111)"

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bucket/BookkeeperBucketSnapshotStorage.java    | 33 +++-----------
 .../bucket/BucketDelayedDeliveryTracker.java       | 11 ++---
 .../broker/delayed/bucket/ImmutableBucket.java     | 53 +++++++++-------------
 .../broker/delayed/bucket/MutableBucket.java       | 11 ++---
 .../BookkeeperBucketSnapshotStorageTest.java       | 43 ------------------
 5 files changed, 36 insertions(+), 115 deletions(-)


[pulsar] 01/04: Revert "[improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117)"

Posted by cb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 49539c20bea1d8b10b8465be5ee632c1c4bdc7ce
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Mon Apr 24 11:46:27 2023 +0200

    Revert "[improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117)"
    
    This reverts commit e1d63990644700bf61b3d7af1ef6d4d62145c2bb.
---
 .../bucket/BookkeeperBucketSnapshotStorage.java    | 52 +++++++++++-----------
 .../BookkeeperBucketSnapshotStorageTest.java       | 43 ------------------
 2 files changed, 26 insertions(+), 69 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 18a4c322f7b..9c30ccf1c0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import javax.validation.constraints.NotNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
@@ -49,8 +48,6 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
     private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
 
-    private final Map<Long, CompletableFuture<LedgerHandle>> ledgerHandleFutureCache = new ConcurrentHashMap<>();
-
     public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.config = pulsar.getConfig();
@@ -69,30 +66,45 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
 
     @Override
     public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
-        return getLedgerHandle(bucketId).thenCompose(ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0)
-                .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+        return openLedger(bucketId).thenCompose(ledgerHandle -> {
+            CompletableFuture<SnapshotMetadata> snapshotFuture =
+                    getLedgerEntry(ledgerHandle, 0, 0)
+                            .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()));
+
+            snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+            return snapshotFuture;
+        });
     }
 
     @Override
     public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
                                                                              long lastSegmentEntryId) {
-        return getLedgerHandle(bucketId).thenCompose(
-                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
-                        .thenApply(this::parseSnapshotSegmentEntries));
+        return openLedger(bucketId).thenCompose(ledgerHandle -> {
+            CompletableFuture<List<SnapshotSegment>> parseFuture =
+                    getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
+                            .thenApply(this::parseSnapshotSegmentEntries);
+
+            parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+            return parseFuture;
+        });
     }
 
     @Override
     public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
-        return getLedgerHandle(bucketId).thenCompose(
-                ledgerHandle -> CompletableFuture.completedFuture(ledgerHandle.getLength()));
+        return openLedger(bucketId).thenCompose(ledgerHandle -> {
+            CompletableFuture<Long> lengthFuture =
+                    CompletableFuture.completedFuture(ledgerHandle.getLength());
+
+            lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+            return lengthFuture;
+        });
     }
 
     @Override
     public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
-        CompletableFuture<LedgerHandle> ledgerHandleFuture = ledgerHandleFutureCache.remove(bucketId);
-        if (ledgerHandleFuture != null) {
-            ledgerHandleFuture.whenComplete((lh, ex) -> closeLedger(lh));
-        }
         return deleteLedger(bucketId);
     }
 
@@ -166,18 +178,6 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
         return future;
     }
 
-    private CompletableFuture<LedgerHandle> getLedgerHandle(Long ledgerId) {
-        CompletableFuture<LedgerHandle> ledgerHandleCompletableFuture =
-                ledgerHandleFutureCache.computeIfAbsent(ledgerId, k -> openLedger(ledgerId));
-        // remove future of completed exceptionally
-        ledgerHandleCompletableFuture.whenComplete((__, ex) -> {
-            if (ex != null) {
-                ledgerHandleFutureCache.remove(ledgerId, ledgerHandleCompletableFuture);
-            }
-        });
-        return ledgerHandleCompletableFuture;
-    }
-
     private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
         final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         bookKeeper.asyncOpenLedger(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index 7cb6b8d5865..a628b58e10d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ExecutionException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
 import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -205,46 +204,4 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
         Assert.assertTrue(bucketSnapshotLength > 0L);
     }
 
-    @Test
-    public void testConcurrencyGet() throws ExecutionException, InterruptedException {
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
-                        .setMinScheduleTimestamp(System.currentTimeMillis())
-                        .setMaxScheduleTimestamp(System.currentTimeMillis())
-                        .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build();
-
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
-                        .addMetadataList(segmentMetadata)
-                        .build();
-        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
-
-        long timeMillis = System.currentTimeMillis();
-        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
-                DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
-                        .setTimestamp(timeMillis).build();
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
-        bucketSnapshotSegments.add(snapshotSegment);
-        bucketSnapshotSegments.add(snapshotSegment);
-
-        CompletableFuture<Long> future =
-                bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
-                        bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
-        Long bucketId = future.get();
-        Assert.assertNotNull(bucketId);
-
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-        for (int i = 0; i < 100; i++) {
-            CompletableFuture<Void> future0 = CompletableFuture.runAsync(() -> {
-                List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> list =
-                        bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join();
-                Assert.assertTrue(list.size() > 0);
-            });
-            futures.add(future0);
-        }
-
-        FutureUtil.waitForAll(futures).join();
-    }
-
 }


[pulsar] 03/04: Revert "[improve][broker] Make timer execute immediately after load index (#20126)"

Posted by cb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 610c5f151436557d51b56e06a97ba422553592d7
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Mon Apr 24 11:49:13 2023 +0200

    Revert "[improve][broker] Make timer execute immediately after load index (#20126)"
    
    This reverts commit 49480ea558e647169e8df01bfd2e871a5386e19e.
---
 .../broker/delayed/bucket/BucketDelayedDeliveryTracker.java       | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index b4d1745e22f..f57248acbb7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -541,7 +541,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
 
     @Override
     public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
-        if (!checkPendingLoadDone()) {
+        if (!checkPendingOpDone()) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.",
                         dispatcher.getName());
@@ -628,11 +628,11 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
                         if (timeout != null) {
                             timeout.cancel();
                         }
-                        timeout = timer.newTimeout(this, 0, TimeUnit.MILLISECONDS);
+                        timeout = timer.newTimeout(this, tickTimeMillis, TimeUnit.MILLISECONDS);
                     }
                 });
 
-                if (!checkPendingLoadDone() || loadFuture.isCompletedExceptionally()) {
+                if (!checkPendingOpDone() || loadFuture.isCompletedExceptionally()) {
                     break;
                 }
             }
@@ -651,7 +651,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
         return positions;
     }
 
-    private synchronized boolean checkPendingLoadDone() {
+    private synchronized boolean checkPendingOpDone() {
         if (pendingLoad == null || pendingLoad.isDone()) {
             pendingLoad = null;
             return true;


[pulsar] 02/04: Revert "[improve][broker] Optimize delayed metadata index bitmap (#20136)"

Posted by cb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f3572574b132dfb6d5a67011f7fb68096e69e1c7
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Mon Apr 24 11:48:03 2023 +0200

    Revert "[improve][broker] Optimize delayed metadata index bitmap (#20136)"
    
    This reverts commit ff59240165c73a9c3a3dcca20702ab44b0b18d33.
---
 .../bucket/BucketDelayedDeliveryTracker.java       |  3 --
 .../broker/delayed/bucket/ImmutableBucket.java     | 53 +++++++++-------------
 .../broker/delayed/bucket/MutableBucket.java       | 11 ++---
 3 files changed, 25 insertions(+), 42 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index b17387e276e..b4d1745e22f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -485,9 +485,6 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
                                     });
                                 });
                             }
-
-                            // optimize bm
-                            delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
                             immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
 
                             afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 57de5c84fcd..82e98cefa5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -20,10 +20,11 @@ package org.apache.pulsar.broker.delayed.bucket;
 
 import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
 import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE;
-import java.io.IOException;
+import com.google.protobuf.ByteString;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
@@ -36,8 +37,8 @@ import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotF
 import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
 import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.roaringbitmap.InvalidRoaringFormat;
 import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 @Slf4j
 class ImmutableBucket extends Bucket {
@@ -97,7 +98,7 @@ class ImmutableBucket extends Bucket {
                         this.setLastSegmentEntryId(metadataList.size());
                         this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
                         List<Long> firstScheduleTimestamps = metadataList.stream().map(
-                                SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
+                                        SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
                         this.setFirstScheduleTimestamps(firstScheduleTimestamps);
 
                         return nextSnapshotEntryIndex + 1;
@@ -138,37 +139,25 @@ class ImmutableBucket extends Bucket {
         });
     }
 
-    /**
-     * Recover delayed index bit map and message numbers.
-     * @throws InvalidRoaringFormat invalid bitmap serialization format
-     */
     private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
-                                                    List<SnapshotSegmentMetadata> segmentMetaList) {
-        delayedIndexBitMap.clear(); // cleanup dirty bm
-        final var numberMessages = new MutableLong(0);
-        for (int i = startSnapshotIndex; i < segmentMetaList.size(); i++) {
-            for (final var entry : segmentMetaList.get(i).getDelayedIndexBitMapMap().entrySet()) {
-                final var ledgerId = entry.getKey();
-                final var bs = entry.getValue();
-                final var sbm = new RoaringBitmap();
-                try {
-                    sbm.deserialize(bs.asReadOnlyByteBuffer());
-                } catch (IOException e) {
-                    throw new InvalidRoaringFormat(e.getMessage());
+                                                    List<SnapshotSegmentMetadata> segmentMetadata) {
+        this.delayedIndexBitMap.clear();
+        MutableLong numberMessages = new MutableLong(0);
+        for (int i = startSnapshotIndex; i < segmentMetadata.size(); i++) {
+            Map<Long, ByteString> bitByteStringMap = segmentMetadata.get(i).getDelayedIndexBitMapMap();
+            bitByteStringMap.forEach((leaderId, bitSetString) -> {
+                boolean exist = this.delayedIndexBitMap.containsKey(leaderId);
+                RoaringBitmap bitSet =
+                        new ImmutableRoaringBitmap(bitSetString.asReadOnlyByteBuffer()).toRoaringBitmap();
+                numberMessages.add(bitSet.getCardinality());
+                if (!exist) {
+                    this.delayedIndexBitMap.put(leaderId, bitSet);
+                } else {
+                    this.delayedIndexBitMap.get(leaderId).or(bitSet);
                 }
-                numberMessages.add(sbm.getCardinality());
-                delayedIndexBitMap.compute(ledgerId, (lId, bm) -> {
-                    if (bm == null) {
-                        return sbm;
-                    }
-                    bm.or(sbm);
-                    return bm;
-                });
-            }
+            });
         }
-        // optimize bm
-        delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
-        setNumberBucketDelayedMessages(numberMessages.getValue());
+        this.setNumberBucketDelayedMessages(numberMessages.getValue());
     }
 
     CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() {
@@ -204,7 +193,7 @@ class ImmutableBucket extends Bucket {
                         stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.delete);
                     } else {
                         log.info("[{}] Delete bucket snapshot finish, bucketId: {}, bucketKey: {}",
-                                dispatcherName, bucketId, bucketKey);
+                                 dispatcherName, bucketId, bucketKey);
 
                         stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete,
                                 System.currentTimeMillis() - deleteStartTime);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index f404d5d02c1..e49ebe9606e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -116,13 +116,10 @@ class MutableBucket extends Bucket implements AutoCloseable {
 
                 Iterator<Map.Entry<Long, RoaringBitmap>> iterator = bitMap.entrySet().iterator();
                 while (iterator.hasNext()) {
-                    final var entry = iterator.next();
-                    final var lId = entry.getKey();
-                    final var bm = entry.getValue();
-                    bm.runOptimize();
-                    final var array = new byte[bm.serializedSizeInBytes()];
-                    bm.serialize(ByteBuffer.wrap(array));
-                    segmentMetadataBuilder.putDelayedIndexBitMap(lId, ByteString.copyFrom(array));
+                    Map.Entry<Long, RoaringBitmap> entry = iterator.next();
+                    byte[] array = new byte[entry.getValue().serializedSizeInBytes()];
+                    entry.getValue().serialize(ByteBuffer.wrap(array));
+                    segmentMetadataBuilder.putDelayedIndexBitMap(entry.getKey(), ByteString.copyFrom(array));
                     iterator.remove();
                 }
 


[pulsar] 04/04: Revert "[improve] [broker] Close temporary open ledger in BookkeeperBucketSnapshotStorage (#20111)"

Posted by cb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 254f09b70d80640095f2fe6d12ed79f046fb3241
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Mon Apr 24 11:50:50 2023 +0200

    Revert "[improve] [broker] Close temporary open ledger in BookkeeperBucketSnapshotStorage (#20111)"
    
    This reverts commit 1d1a3ef864a65c995ceda4b7875ed934c2574298.
---
 .../bucket/BookkeeperBucketSnapshotStorage.java    | 33 +++++-----------------
 1 file changed, 7 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 9c30ccf1c0b..e7d4f9301dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -66,41 +66,22 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
 
     @Override
     public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
-        return openLedger(bucketId).thenCompose(ledgerHandle -> {
-            CompletableFuture<SnapshotMetadata> snapshotFuture =
-                    getLedgerEntry(ledgerHandle, 0, 0)
-                            .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()));
-
-            snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-            return snapshotFuture;
-        });
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
     }
 
     @Override
     public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
                                                                              long lastSegmentEntryId) {
-        return openLedger(bucketId).thenCompose(ledgerHandle -> {
-            CompletableFuture<List<SnapshotSegment>> parseFuture =
-                    getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
-                            .thenApply(this::parseSnapshotSegmentEntries);
-
-            parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-            return parseFuture;
-        });
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
     }
 
     @Override
     public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
-        return openLedger(bucketId).thenCompose(ledgerHandle -> {
-            CompletableFuture<Long> lengthFuture =
-                    CompletableFuture.completedFuture(ledgerHandle.getLength());
-
-            lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-            return lengthFuture;
-        });
+        return openLedger(bucketId).thenApply(LedgerHandle::getLength);
     }
 
     @Override