You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by cb...@apache.org on 2023/04/24 09:59:07 UTC
[pulsar] 01/04: Revert "[improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117)"
This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 49539c20bea1d8b10b8465be5ee632c1c4bdc7ce
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Mon Apr 24 11:46:27 2023 +0200
Revert "[improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117)"
This reverts commit e1d63990644700bf61b3d7af1ef6d4d62145c2bb.
---
.../bucket/BookkeeperBucketSnapshotStorage.java | 52 +++++++++++-----------
.../BookkeeperBucketSnapshotStorageTest.java | 43 ------------------
2 files changed, 26 insertions(+), 69 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 18a4c322f7b..9c30ccf1c0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
@@ -49,8 +48,6 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
private final ServiceConfiguration config;
private BookKeeper bookKeeper;
- private final Map<Long, CompletableFuture<LedgerHandle>> ledgerHandleFutureCache = new ConcurrentHashMap<>();
-
public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
this.pulsar = pulsar;
this.config = pulsar.getConfig();
@@ -69,30 +66,45 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
@Override
public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
- return getLedgerHandle(bucketId).thenCompose(ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0)
- .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+ return openLedger(bucketId).thenCompose(ledgerHandle -> {
+ CompletableFuture<SnapshotMetadata> snapshotFuture =
+ getLedgerEntry(ledgerHandle, 0, 0)
+ .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()));
+
+ snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+ return snapshotFuture;
+ });
}
@Override
public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
long lastSegmentEntryId) {
- return getLedgerHandle(bucketId).thenCompose(
- ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
- .thenApply(this::parseSnapshotSegmentEntries));
+ return openLedger(bucketId).thenCompose(ledgerHandle -> {
+ CompletableFuture<List<SnapshotSegment>> parseFuture =
+ getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
+ .thenApply(this::parseSnapshotSegmentEntries);
+
+ parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+ return parseFuture;
+ });
}
@Override
public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
- return getLedgerHandle(bucketId).thenCompose(
- ledgerHandle -> CompletableFuture.completedFuture(ledgerHandle.getLength()));
+ return openLedger(bucketId).thenCompose(ledgerHandle -> {
+ CompletableFuture<Long> lengthFuture =
+ CompletableFuture.completedFuture(ledgerHandle.getLength());
+
+ lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+ return lengthFuture;
+ });
}
@Override
public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
- CompletableFuture<LedgerHandle> ledgerHandleFuture = ledgerHandleFutureCache.remove(bucketId);
- if (ledgerHandleFuture != null) {
- ledgerHandleFuture.whenComplete((lh, ex) -> closeLedger(lh));
- }
return deleteLedger(bucketId);
}
@@ -166,18 +178,6 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
return future;
}
- private CompletableFuture<LedgerHandle> getLedgerHandle(Long ledgerId) {
- CompletableFuture<LedgerHandle> ledgerHandleCompletableFuture =
- ledgerHandleFutureCache.computeIfAbsent(ledgerId, k -> openLedger(ledgerId));
- // remove future of completed exceptionally
- ledgerHandleCompletableFuture.whenComplete((__, ex) -> {
- if (ex != null) {
- ledgerHandleFutureCache.remove(ledgerId, ledgerHandleCompletableFuture);
- }
- });
- return ledgerHandleCompletableFuture;
- }
-
private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
bookKeeper.asyncOpenLedger(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index 7cb6b8d5865..a628b58e10d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
-import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -205,46 +204,4 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase
Assert.assertTrue(bucketSnapshotLength > 0L);
}
- @Test
- public void testConcurrencyGet() throws ExecutionException, InterruptedException {
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
- .setMinScheduleTimestamp(System.currentTimeMillis())
- .setMaxScheduleTimestamp(System.currentTimeMillis())
- .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build();
-
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
- .addMetadataList(segmentMetadata)
- .build();
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
-
- long timeMillis = System.currentTimeMillis();
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
- DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
- .setTimestamp(timeMillis).build();
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
- DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
- bucketSnapshotSegments.add(snapshotSegment);
- bucketSnapshotSegments.add(snapshotSegment);
-
- CompletableFuture<Long> future =
- bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
- bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME);
- Long bucketId = future.get();
- Assert.assertNotNull(bucketId);
-
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- CompletableFuture<Void> future0 = CompletableFuture.runAsync(() -> {
- List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> list =
- bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join();
- Assert.assertTrue(list.size() > 0);
- });
- futures.add(future0);
- }
-
- FutureUtil.waitForAll(futures).join();
- }
-
}