You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cb...@apache.org on 2023/04/24 09:59:07 UTC

[pulsar] 01/04: Revert "[improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117)"

This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 49539c20bea1d8b10b8465be5ee632c1c4bdc7ce
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Mon Apr 24 11:46:27 2023 +0200

    Revert "[improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117)"
    
    This reverts commit e1d63990644700bf61b3d7af1ef6d4d62145c2bb.
---
 .../bucket/BookkeeperBucketSnapshotStorage.java    | 52 +++++++++++-----------
 .../BookkeeperBucketSnapshotStorageTest.java       | 43 ------------------
 2 files changed, 26 insertions(+), 69 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 18a4c322f7b..9c30ccf1c0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import javax.validation.constraints.NotNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
@@ -49,8 +48,6 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
     private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
 
-    private final Map<Long, CompletableFuture<LedgerHandle>> ledgerHandleFutureCache = new ConcurrentHashMap<>();
-
     public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.config = pulsar.getConfig();
@@ -69,30 +66,45 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
 
     @Override
     public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
-        return getLedgerHandle(bucketId).thenCompose(ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0)
-                .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+        return openLedger(bucketId).thenCompose(ledgerHandle -> {
+            CompletableFuture<SnapshotMetadata> snapshotFuture =
+                    getLedgerEntry(ledgerHandle, 0, 0)
+                            .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()));
+
+            snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+            return snapshotFuture;
+        });
     }
 
     @Override
     public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
                                                                              long lastSegmentEntryId) {
-        return getLedgerHandle(bucketId).thenCompose(
-                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
-                        .thenApply(this::parseSnapshotSegmentEntries));
+        return openLedger(bucketId).thenCompose(ledgerHandle -> {
+            CompletableFuture<List<SnapshotSegment>> parseFuture =
+                    getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
+                            .thenApply(this::parseSnapshotSegmentEntries);
+
+            parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+            return parseFuture;
+        });
     }
 
     @Override
     public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
-        return getLedgerHandle(bucketId).thenCompose(
-                ledgerHandle -> CompletableFuture.completedFuture(ledgerHandle.getLength()));
+        return openLedger(bucketId).thenCompose(ledgerHandle -> {
+            CompletableFuture<Long> lengthFuture =
+                    CompletableFuture.completedFuture(ledgerHandle.getLength());
+
+            lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+            return lengthFuture;
+        });
     }
 
     @Override
     public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
-        CompletableFuture<LedgerHandle> ledgerHandleFuture = ledgerHandleFutureCache.remove(bucketId);
-        if (ledgerHandleFuture != null) {
-            ledgerHandleFuture.whenComplete((lh, ex) -> closeLedger(lh));
-        }
         return deleteLedger(bucketId);
     }
 
@@ -166,18 +178,6 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
         return future;
     }
 
-    private CompletableFuture<LedgerHandle> getLedgerHandle(Long ledgerId) {
-        CompletableFuture<LedgerHandle> ledgerHandleCompletableFuture =
-                ledgerHandleFutureCache.computeIfAbsent(ledgerId, k -> openLedger(ledgerId));
-        // remove future of completed exceptionally
-        ledgerHandleCompletableFuture.whenComplete((__, ex) -> {
-            if (ex != null) {
-                ledgerHandleFutureCache.remove(ledgerId, ledgerHandleCompletableFuture);
-            }
-        });
-        return ledgerHandleCompletableFuture;
-    }
-
     private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
         final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
         bookKeeper.asyncOpenLedger(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index 7cb6b8d5865..a628b58e10d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ExecutionException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
 import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -205,46 +204,4 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
         Assert.assertTrue(bucketSnapshotLength > 0L);
     }
 
-    @Test
-    public void testConcurrencyGet() throws ExecutionException, InterruptedException {
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
-                        .setMinScheduleTimestamp(System.currentTimeMillis())
-                        .setMaxScheduleTimestamp(System.currentTimeMillis())
-                        .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build();
-
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
-                        .addMetadataList(segmentMetadata)
-                        .build();
-        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
-
-        long timeMillis = System.currentTimeMillis();
-        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
-                DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
-                        .setTimestamp(timeMillis).build();
-        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
-                DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
-        bucketSnapshotSegments.add(snapshotSegment);
-        bucketSnapshotSegments.add(snapshotSegment);
-
-        CompletableFuture<Long> future =
-                bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
-                        bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
-        Long bucketId = future.get();
-        Assert.assertNotNull(bucketId);
-
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-        for (int i = 0; i < 100; i++) {
-            CompletableFuture<Void> future0 = CompletableFuture.runAsync(() -> {
-                List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> list =
-                        bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join();
-                Assert.assertTrue(list.size() > 0);
-            });
-            futures.add(future0);
-        }
-
-        FutureUtil.waitForAll(futures).join();
-    }
-
 }