Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/15 16:26:43 UTC

[GitHub] [pulsar] coderzc opened a new pull request, #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

coderzc opened a new pull request, #17677:
URL: https://github.com/apache/pulsar/pull/17677

   Master Issue: #16763 
   
   ### Motivation
   
   #16763 
   
   ### Modifications
   
   Implement `BookkeeperBucketSnapshotStorage` and add unit tests.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051776722


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())

Review Comment:
   If adding an entry fails, do we need to clean up the ledger that was already created?
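
One way to picture the cleanup being asked about is the sketch below. It is not the PR's code: `LedgerOps`, `SnapshotWriteSketch`, and `InMemoryLedgerOps` are hypothetical names standing in for the BookKeeper calls, and entries are written one after another for simplicity. If any step after ledger creation fails, the ledger is deleted on a best-effort basis and the caller still sees the failure.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the ledger operations; this is not the BookKeeper API.
interface LedgerOps {
    CompletableFuture<Long> createLedger();
    CompletableFuture<Void> addEntry(long ledgerId, byte[] data);
    CompletableFuture<Void> closeLedger(long ledgerId);
    CompletableFuture<Void> deleteLedger(long ledgerId);
}

final class SnapshotWriteSketch {
    // Creates a ledger, appends every entry, and closes it. If a step after
    // creation fails, the ledger is deleted (best effort) before the returned
    // future is failed with the error that caused the failure.
    static CompletableFuture<Long> writeSnapshot(LedgerOps ops, List<byte[]> entries) {
        return ops.createLedger().thenCompose(ledgerId -> {
            CompletableFuture<Void> writes = CompletableFuture.completedFuture(null);
            for (byte[] entry : entries) {
                writes = writes.thenCompose(ignore -> ops.addEntry(ledgerId, entry));
            }
            CompletableFuture<Long> result = new CompletableFuture<>();
            writes.thenCompose(ignore -> ops.closeLedger(ledgerId))
                    .whenComplete((ignore, error) -> {
                        if (error == null) {
                            result.complete(ledgerId);
                            return;
                        }
                        // Delete errors are ignored here; the write error is reported.
                        ops.deleteLedger(ledgerId)
                                .whenComplete((v, deleteError) -> result.completeExceptionally(error));
                    });
            return result;
        });
    }
}

// In-memory fake used only to exercise the sketch.
final class InMemoryLedgerOps implements LedgerOps {
    final Set<Long> liveLedgers = new HashSet<>();
    private final int failingAddIndex; // 0-based index of the add call that fails; -1 for none
    private long nextId = 1;
    private int adds = 0;

    InMemoryLedgerOps(int failingAddIndex) {
        this.failingAddIndex = failingAddIndex;
    }

    @Override
    public CompletableFuture<Long> createLedger() {
        long id = nextId++;
        liveLedgers.add(id);
        return CompletableFuture.completedFuture(id);
    }

    @Override
    public CompletableFuture<Void> addEntry(long ledgerId, byte[] data) {
        if (adds++ == failingAddIndex) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("add failed"));
            return failed;
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> closeLedger(long ledgerId) {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> deleteLedger(long ledgerId) {
        liveLedgers.remove(ledgerId);
        return CompletableFuture.completedFuture(null);
    }
}
```

Whether to delete immediately or leave orphaned ledgers to a later cleanup pass is a design choice this sketch does not settle.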





[GitHub] [pulsar] zymap commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1027584351


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        try {
+            return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);

Review Comment:
   Why not declare a specific exception type for the snapshot storage? An explicit exception would help us decide how to handle these failures in the next step.
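
A minimal sketch of what such a dedicated exception could look like follows. The names `BucketSnapshotException`, `BucketSnapshotParseException`, and `SnapshotParsingSketch` are hypothetical and not taken from the PR, and a decimal-number parser stands in for `SnapshotMetadata.parseFrom`. The types are unchecked so they can be thrown from `CompletableFuture` callbacks.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletionException;

// Hypothetical base type for snapshot storage failures (name is illustrative).
class BucketSnapshotException extends RuntimeException {
    BucketSnapshotException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical subtype raised when a stored entry cannot be decoded.
class BucketSnapshotParseException extends BucketSnapshotException {
    BucketSnapshotParseException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class SnapshotParsingSketch {
    // Stand-in for protobuf parsing: decodes a decimal number from UTF-8 bytes
    // and reports bad input with the dedicated exception type.
    static long parseEntry(byte[] entry) {
        try {
            return Long.parseLong(new String(entry, StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            throw new BucketSnapshotParseException("Corrupted snapshot entry", e);
        }
    }

    // Lets a caller branch on the failure type, unwrapping the
    // CompletionException that CompletableFuture stages add.
    static String classify(Throwable error) {
        Throwable cause = error;
        if (error instanceof CompletionException && error.getCause() != null) {
            cause = error.getCause();
        }
        if (cause instanceof BucketSnapshotParseException) {
            return "corrupted";
        }
        if (cause instanceof BucketSnapshotException) {
            return "storage-failure";
        }
        return "other";
    }
}
```

With a typed exception, a caller could, for example, drop a corrupted snapshot but retry other storage failures.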





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1020071318


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(

Review Comment:
   ok





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1020062115


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(

Review Comment:
   The `LedgerHandle` is closed inside `getLedgerEntry`.
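
The read-then-close behavior described here can be sketched roughly as follows. `ReadThenCloseSketch` and its supplier parameters are hypothetical, not the PR's helper. The point is only that the close step runs whether or not the read succeeded, and that the read's outcome is what the caller sees.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ReadThenCloseSketch {
    // Runs the read, then the close, regardless of the read's outcome.
    // Close errors are ignored in this sketch; the read result or read
    // error is what completes the returned future.
    static <T> CompletableFuture<T> readThenClose(Supplier<CompletableFuture<T>> read,
                                                  Supplier<CompletableFuture<Void>> close) {
        CompletableFuture<T> result = new CompletableFuture<>();
        read.get().whenComplete((value, readError) ->
                close.get().whenComplete((ignore, closeError) -> {
                    if (readError != null) {
                        result.completeExceptionally(readError);
                    } else {
                        result.complete(value);
                    }
                }));
        return result;
    }
}
```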





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1025947870


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()

Review Comment:
   I'm not sure whether creating a large number of ledgers would cause problems, but I think it is acceptable for this first (0-to-1) implementation.
   
   @codelipenghui Could you please help confirm?





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1021055417


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(

Review Comment:
   The `defaultBkClient` uses `statsLogger` to record operation stats for `managedLedger`, so I don't think we should use the `defaultBkClient` here.
   https://github.com/apache/pulsar/blob/6fec66b12b04a37e4c2b05d78d4e33b380c270df/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java#L84-L89





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1020079684


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
+        try {
+            result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    private CompletableFuture<List<SnapshotSegment>> parseSnapshotSegmentEntries(
+            Enumeration<LedgerEntry> entryEnumeration) {
+        CompletableFuture<List<SnapshotSegment>> result = new CompletableFuture<>();
+        List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
+        try {
+            while (entryEnumeration.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+                snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+            }
+            result.complete(snapshotMetadataList);
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    @NotNull
+    private CompletableFuture<LedgerHandle> createLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncCreateLedger(
+                config.getManagedLedgerDefaultEnsembleSize(),
+                config.getManagedLedgerDefaultWriteQuorum(),
+                config.getManagedLedgerDefaultAckQuorum(),
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to create ledger", rc, -1));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null, null);
+        return future;
+    }
+
+    private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
+        final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncOpenLedger(
+                ledgerId,
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncClose((rc, handle, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId());
+                future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId()));
+            } else {
+                future.complete(null);
+            }
+        }, null);
+        return future;
+    }
+
+    private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncAddEntry(data,
+                (rc, handle, entryId, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId()));
+                    } else {
+                        future.complete(null);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntry(LedgerHandle ledger,

Review Comment:
   `addEntry` does not close the ledger, but `getLedgerEntry` does. I suggest renaming this method to `getLedgerEntryThenCloseLedger` so the side effect is visible at the call site.
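
   A minimal sketch of the naming idea, using only the JDK. `FakeLedger`, `read()`, and `close()` are hypothetical stand-ins for illustration, not the BookKeeper `LedgerHandle` API and not the code in this PR:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReadThenCloseSketch {

    // Hypothetical stand-in for a ledger handle with async read and close.
    static final class FakeLedger {
        final AtomicBoolean closed = new AtomicBoolean(false);

        CompletableFuture<String> read() {
            return CompletableFuture.completedFuture("entry-0");
        }

        CompletableFuture<Void> close() {
            closed.set(true);
            return CompletableFuture.completedFuture(null);
        }
    }

    // The name states the side effect: the ledger is closed once the read
    // finishes, whether the read succeeded or failed.
    static CompletableFuture<String> readThenCloseLedger(FakeLedger ledger) {
        return ledger.read().whenComplete((entry, ex) -> ledger.close());
    }

    public static void main(String[] args) {
        FakeLedger ledger = new FakeLedger();
        String entry = readThenCloseLedger(ledger).join();
        System.out.println(entry + " closed=" + ledger.closed.get());
    }
}
```

   With the close folded into the helper, callers do not need to remember to release the handle after a read, which is why the name should say so.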





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051933840


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {
+                            if (ex != null) {
+                                deleteLedger(ledgerId);
+                            }
+                        }));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();

Review Comment:
   Why call `pulsar.getBookKeeperClient();` here?





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1025944991


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(

Review Comment:
   @coderzc @codelipenghui Each BK client creates its own additional threads, see:
   
   https://github.com/apache/bookkeeper/blob/a3401a2139a554a8fbe52c8a398eeb4e51a8dad1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java#L426-L435
   
   Maybe we should not try to fix this problem in this PR, and instead solve it for all of them (`ManagedLedger`, `Schema`, `Delay message`) together in a new DISCUSS thread.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051956051


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -61,7 +61,11 @@ public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMet
                 .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
                         .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
                         .thenCompose(__ -> closeLedger(ledgerHandle))
-                        .thenApply(__ -> ledgerHandle.getId()));
+                        .thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {

Review Comment:
   Good idea.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051863374


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())

Review Comment:
   I added cleanup logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051928007


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {
+                            if (ex != null) {
+                                deleteLedger(ledgerId);

Review Comment:
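   If `ex != null`, then `ledgerId` is `null`: `whenComplete` passes a `null` result when the stage completes exceptionally, so `deleteLedger(ledgerId)` never receives the ID of the ledger that needs cleaning up.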
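
   A small JDK-only sketch of the behaviour and of one possible fix, which is to take the ID from the captured handle rather than from the callback's result. `FakeHandle` and the `deleted` list are hypothetical stand-ins for `LedgerHandle` and `deleteLedger`; this is not the code in the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WhenCompleteCleanupSketch {

    // Hypothetical stand-in for LedgerHandle; only the ID matters here.
    static final class FakeHandle {
        private final long id;

        FakeHandle(long id) {
            this.id = id;
        }

        long getId() {
            return id;
        }
    }

    // Returns the result value that whenComplete observes for a failed stage.
    static Long resultSeenOnFailure() {
        List<Long> seen = new ArrayList<>();
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("add entry failed"));
        failed.whenComplete((ledgerId, ex) -> seen.add(ledgerId));
        return seen.get(0);
    }

    // Cleanup that reads the ID from the captured handle, not the callback result.
    static List<Long> cleanupUsingHandle(FakeHandle handle) {
        List<Long> deleted = new ArrayList<>();
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("add entry failed"));
        failed.whenComplete((ignored, ex) -> {
            if (ex != null) {
                // Stands in for deleteLedger(handle.getId()).
                deleted.add(handle.getId());
            }
        });
        return deleted;
    }

    public static void main(String[] args) {
        System.out.println(resultSeenOnFailure());
        System.out.println(cleanupUsingHandle(new FakeHandle(42L)));
    }
}
```

   In the diff above, `ledgerHandle` is already in scope inside the outer `thenCompose` lambda, so the cleanup callback could presumably use `ledgerHandle.getId()` directly.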





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1021267409


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(

Review Comment:
   With this newly declared bk-client, there are now three bk-clients in Pulsar. I still think it is too expensive to dedicate a separate bk-client to the delayed message bucket. Maybe a separate discussion should be held later to resolve this issue.
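
   To make the cost concern concrete, here is a rough JDK-only sketch of handing one lazily created client to several components instead of creating one per component. `ExpensiveClient` and `SharedClient` are hypothetical names for illustration; nothing here is Pulsar or BookKeeper API:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SharedClientSketch {

    // Hypothetical stand-in for a client that is costly to create
    // (for example, one that starts its own threads and connections).
    static final class ExpensiveClient {
        static final AtomicInteger CREATED = new AtomicInteger();

        ExpensiveClient() {
            CREATED.incrementAndGet();
        }
    }

    // Creates the client on first use and returns the same instance afterwards.
    static final class SharedClient implements Supplier<ExpensiveClient> {
        private ExpensiveClient client;

        @Override
        public synchronized ExpensiveClient get() {
            if (client == null) {
                client = new ExpensiveClient();
            }
            return client;
        }
    }

    public static void main(String[] args) {
        SharedClient shared = new SharedClient();
        ExpensiveClient forLedgers = shared.get();
        ExpensiveClient forSnapshots = shared.get();
        // Both components hold the same instance, so only one client was created.
        System.out.println(forLedgers == forSnapshots);
    }
}
```

   Whether sharing is appropriate depends on isolation requirements between the components, which is the kind of trade-off the proposed separate discussion would need to settle.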





[GitHub] [pulsar] zymap commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
zymap commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1025965472


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
+        try {
+            result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    private CompletableFuture<List<SnapshotSegment>> parseSnapshotSegmentEntries(
+            Enumeration<LedgerEntry> entryEnumeration) {
+        CompletableFuture<List<SnapshotSegment>> result = new CompletableFuture<>();
+        List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
+        try {
+            while (entryEnumeration.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+                snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+            }
+            result.complete(snapshotMetadataList);
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    @NotNull
+    private CompletableFuture<LedgerHandle> createLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncCreateLedger(
+                config.getManagedLedgerDefaultEnsembleSize(),
+                config.getManagedLedgerDefaultWriteQuorum(),
+                config.getManagedLedgerDefaultAckQuorum(),
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to create ledger", rc, -1));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null, null);

Review Comment:
   Please add ledger metadata so that we can tell which component is using this ledger through the BookKeeper shell tools.
   Several components create ledgers, so each ledger needs metadata that identifies its owner. See `LedgerMetadataUtils` for how to add it.
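
A minimal sketch of what such metadata could look like, assuming it is a plain `Map<String, byte[]>`. The class name, the keys, and the values below are illustrative assumptions, not the constants that `LedgerMetadataUtils` actually defines:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: the class name, keys, and values are assumptions
// chosen for illustration, not the real LedgerMetadataUtils constants.
final class BucketLedgerMetadataSketch {

    private BucketLedgerMetadataSketch() {
    }

    // Builds custom metadata that labels a ledger as owned by the delayed-message bucket.
    static Map<String, byte[]> buildMetadata(String bucketKey) {
        Map<String, byte[]> metadata = new HashMap<>();
        metadata.put("application", "pulsar".getBytes(StandardCharsets.UTF_8));
        metadata.put("component", "delayed-index-bucket".getBytes(StandardCharsets.UTF_8));
        metadata.put("pulsar/delayed-index-bucket", bucketKey.getBytes(StandardCharsets.UTF_8));
        return metadata;
    }
}
```

A map like this could be passed as the last argument of `asyncCreateLedger`, in place of the trailing `null` in the call above.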



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(

Review Comment:
   > I still think it is too expensive to use a single bk-client for the delay message

   I am also wondering about that.
   
   If you want to use BookKeeper storage, you need a client. If more services end up needing an internal BookKeeper client, we could create a single internal client and share it among the components.
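
One way the shared-client idea could be sketched is a small reference-counted holder that creates the client lazily and closes it when the last component releases it. `SharedClientHolder` and its methods are hypothetical names for illustration only; nothing like this exists in the PR:

```java
import java.util.function.Supplier;

// Hypothetical holder: lazily creates one client and hands the same
// instance to every component that asks for it.
final class SharedClientHolder<T extends AutoCloseable> {

    private final Supplier<T> factory;
    private T client;
    private int refCount;

    SharedClientHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    // Returns the shared client, creating it on first use.
    synchronized T acquire() {
        if (client == null) {
            client = factory.get();
        }
        refCount++;
        return client;
    }

    // Closes the shared client once the last user has released it.
    synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() called without a matching acquire()");
        }
        refCount--;
        if (refCount == 0) {
            T toClose = client;
            client = null;
            try {
                toClose.close();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close shared client", e);
            }
        }
    }
}
```

Each component would call `acquire()` in its `start()` and `release()` in its `close()`, so only one client exists no matter how many components use it. Whether the extra lifecycle coupling is worth it is exactly the trade-off raised in this thread.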
   





[GitHub] [pulsar] coderzc commented on pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#issuecomment-1319832244

   @poorbarcode @zymap @mattisonchao Please look at it again.




[GitHub] [pulsar] github-actions[bot] commented on pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#issuecomment-1288319140

   The PR has had no activity for 30 days, so it is marked with the Stale label.




[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1021055850


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
+        try {
+            result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    private CompletableFuture<List<SnapshotSegment>> parseSnapshotSegmentEntries(
+            Enumeration<LedgerEntry> entryEnumeration) {
+        CompletableFuture<List<SnapshotSegment>> result = new CompletableFuture<>();
+        List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
+        try {
+            while (entryEnumeration.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+                snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+            }
+            result.complete(snapshotMetadataList);
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    @NotNull
+    private CompletableFuture<LedgerHandle> createLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncCreateLedger(
+                config.getManagedLedgerDefaultEnsembleSize(),
+                config.getManagedLedgerDefaultWriteQuorum(),
+                config.getManagedLedgerDefaultAckQuorum(),
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to create ledger", rc, -1));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null, null);
+        return future;
+    }
+
+    private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
+        final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncOpenLedger(
+                ledgerId,
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncClose((rc, handle, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId());
+                future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId()));
+            } else {
+                future.complete(null);
+            }
+        }, null);
+        return future;
+    }
+
+    private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncAddEntry(data,
+                (rc, handle, entryId, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId()));
+                    } else {
+                        future.complete(null);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntry(LedgerHandle ledger,

Review Comment:
   Ok.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1026222183


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())

Review Comment:
   You are right.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051938857


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
+        try {
+            result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    private CompletableFuture<List<SnapshotSegment>> parseSnapshotSegmentEntries(
+            Enumeration<LedgerEntry> entryEnumeration) {
+        CompletableFuture<List<SnapshotSegment>> result = new CompletableFuture<>();
+        List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
+        try {
+            while (entryEnumeration.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+                snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+            }
+            result.complete(snapshotMetadataList);
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    @NotNull
+    private CompletableFuture<LedgerHandle> createLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncCreateLedger(
+                config.getManagedLedgerDefaultEnsembleSize(),
+                config.getManagedLedgerDefaultWriteQuorum(),
+                config.getManagedLedgerDefaultAckQuorum(),
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to create ledger", rc, -1));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null, null);
+        return future;
+    }
+
+    private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
+        final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncOpenLedger(
+                ledgerId,
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncClose((rc, handle, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId());
+                future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId()));
+            } else {
+                future.complete(null);
+            }
+        }, null);
+        return future;
+    }
+
+    private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncAddEntry(data,
+                (rc, handle, entryId, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId()));
+                    } else {
+                        future.complete(null);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntryThenCloseLedger(LedgerHandle ledger,
+                                                                              long firstEntryId, long lastEntryId) {
+        final CompletableFuture<Enumeration<LedgerEntry>> future = new CompletableFuture<>();
+        ledger.asyncReadEntries(firstEntryId, lastEntryId,
+                (rc, handle, entries, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId()));
+                    } else {
+                        future.complete(entries);
+                    }
+                    closeLedger(handle);

Review Comment:
   I still have concerns here: we call this method `getLedgerEntryThenCloseLedger`, but we don't ensure that the ledger is closed when this method returns.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051955473


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {
+                            if (ex != null) {
+                                deleteLedger(ledgerId);
+                            }
+                        }));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();

Review Comment:
   I will remove this call.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1052184671


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {
+                            if (ex != null) {
+                                deleteLedger(ledgerId);
+                            }
+                        }));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));

Review Comment:
   Yes, but some entries have already been added to this ledger, so we will throw this ledger away, right?
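
To illustrate the "throw the ledger away" idea, here is a minimal sketch of deleting a partially written ledger when any add fails. It does not use the BookKeeper API: `addEntry`, `deleteLedger`, `createSnapshot` and the in-memory `deletedLedgers` list are hypothetical stand-ins, and an empty segment string is an assumed way to simulate a failed add. One detail worth noting is that in a `CompletableFuture.whenComplete` callback the result argument is `null` whenever the exception is non-null, so the cleanup takes the ledger ID from the enclosing scope rather than from the callback's first argument.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class DiscardLedgerOnFailureSketch {

    /** Hypothetical record of deleted ledgers; stands in for real ledger storage. */
    static final List<Long> deletedLedgers = new CopyOnWriteArrayList<>();

    /** Hypothetical add: an empty payload simulates a failed write. */
    static CompletableFuture<Void> addEntry(long ledgerId, String data) {
        if (data.isEmpty()) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("Failed to add entry to ledger " + ledgerId));
            return failed;
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Hypothetical delete: only records which ledger was discarded. */
    static CompletableFuture<Void> deleteLedger(long ledgerId) {
        deletedLedgers.add(ledgerId);
        return CompletableFuture.completedFuture(null);
    }

    /** Writes all segments; if any write fails, the whole ledger is discarded. */
    static CompletableFuture<Long> createSnapshot(long ledgerId, List<String> segments) {
        CompletableFuture<?>[] adds = segments.stream()
                .map(segment -> addEntry(ledgerId, segment))
                .toArray(CompletableFuture[]::new);
        // allOf completes only after every add has finished, so no write is
        // still in flight when the cleanup below runs.
        return CompletableFuture.allOf(adds)
                .thenApply(ignored -> ledgerId)
                .whenComplete((id, error) -> {
                    if (error != null) {
                        // 'id' is null on failure; use the captured ledgerId instead.
                        deleteLedger(ledgerId);
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(createSnapshot(1L, Arrays.asList("a", "b")).join());
        createSnapshot(2L, Arrays.asList("a", "")).exceptionally(error -> -1L).join();
        System.out.println(deletedLedgers);
    }
}
```

In this sketch the caller still sees the original failure, because `whenComplete` passes the exception through; the delete is best-effort cleanup.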





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051938857


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
+        try {
+            result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    private CompletableFuture<List<SnapshotSegment>> parseSnapshotSegmentEntries(
+            Enumeration<LedgerEntry> entryEnumeration) {
+        CompletableFuture<List<SnapshotSegment>> result = new CompletableFuture<>();
+        List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
+        try {
+            while (entryEnumeration.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+                snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+            }
+            result.complete(snapshotMetadataList);
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    @NotNull
+    private CompletableFuture<LedgerHandle> createLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncCreateLedger(
+                config.getManagedLedgerDefaultEnsembleSize(),
+                config.getManagedLedgerDefaultWriteQuorum(),
+                config.getManagedLedgerDefaultAckQuorum(),
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to create ledger", rc, -1));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null, null);
+        return future;
+    }
+
+    private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
+        final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncOpenLedger(
+                ledgerId,
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncClose((rc, handle, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId());
+                future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId()));
+            } else {
+                future.complete(null);
+            }
+        }, null);
+        return future;
+    }
+
+    private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncAddEntry(data,
+                (rc, handle, entryId, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId()));
+                    } else {
+                        future.complete(null);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntryThenCloseLedger(LedgerHandle ledger,
+                                                                              long firstEntryId, long lastEntryId) {
+        final CompletableFuture<Enumeration<LedgerEntry>> future = new CompletableFuture<>();
+        ledger.asyncReadEntries(firstEntryId, lastEntryId,
+                (rc, handle, entries, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId()));
+                    } else {
+                        future.complete(entries);
+                    }
+                    closeLedger(handle);

Review Comment:
   I still have concerns here: we call this method `getLedgerEntryThenCloseLedger`, but we don't ensure that the ledger is closed when this method returns (the close runs asynchronously in the background).
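
As an illustration of this concern, the following is a minimal sketch of a read whose returned future completes only after the close attempt has finished. It deliberately avoids the BookKeeper API: `FakeHandle`, `readAsync`, `closeAsync` and `readThenClose` are hypothetical names used only to show the chaining. In this sketch a close failure is ignored so that the caller sees the outcome of the read; that is an assumption, not a recommendation for the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReadThenCloseSketch {

    /** Hypothetical stand-in for a ledger handle; not the BookKeeper API. */
    static class FakeHandle {
        final AtomicBoolean closed = new AtomicBoolean(false);

        CompletableFuture<String> readAsync() {
            return CompletableFuture.supplyAsync(() -> "entry-0");
        }

        CompletableFuture<Void> closeAsync() {
            return CompletableFuture.runAsync(() -> closed.set(true));
        }
    }

    /**
     * Reads, then closes, and completes the returned future only once the
     * close attempt has finished, whether the read succeeded or failed.
     */
    static CompletableFuture<String> readThenClose(FakeHandle handle) {
        CompletableFuture<String> result = new CompletableFuture<>();
        handle.readAsync().whenComplete((entry, readError) ->
                handle.closeAsync().whenComplete((ignored, closeError) -> {
                    // The close has finished (or failed) before the caller is notified.
                    if (readError != null) {
                        result.completeExceptionally(readError);
                    } else {
                        result.complete(entry);
                    }
                }));
        return result;
    }

    public static void main(String[] args) {
        FakeHandle handle = new FakeHandle();
        String entry = readThenClose(handle).join();
        System.out.println(entry + " closed=" + handle.closed.get());
    }
}
```

With this shape, a caller that chains on the returned future can rely on the handle having been through its close attempt, which is what the method name promises.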





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051935747


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {
+                            if (ex != null) {
+                                deleteLedger(ledgerId);
+                            }
+                        }));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));

Review Comment:
   Does it matter if entries 1 and 2 are added successfully but entries 3 and 4 fail?
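
The partial-failure case asked about above can be sketched with plain JDK futures. This is only an illustration, not the PR's code: `fakeAddEntry` is a hypothetical stand-in for `addEntry()`, and `CompletableFuture.allOf` is used here in place of `FutureUtil.waitForAll`. The point it shows is that the combined future fails even though some entries were already written, so the caller has to treat the whole ledger as unusable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class PartialAddSketch {
    // Hypothetical stand-in for addEntry(): entries 3 and 4 fail, the others succeed.
    static CompletableFuture<Void> fakeAddEntry(int entryNo, List<Integer> written) {
        if (entryNo >= 3) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("add failed: " + entryNo));
        }
        written.add(entryNo);
        return CompletableFuture.completedFuture(null);
    }

    // Returns true when the combined future failed, regardless of how many
    // individual entries were written before the failure.
    static boolean combinedFutureFails(List<Integer> written) {
        List<CompletableFuture<Void>> adds = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            adds.add(fakeAddEntry(i, written));
        }
        CompletableFuture<Void> all =
                CompletableFuture.allOf(adds.toArray(new CompletableFuture[0]));
        try {
            all.join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<Integer> written = new ArrayList<>();
        System.out.println(combinedFutureFails(written) + " " + written);
    }
}
```

In this sketch the partially written entries stay in `written`, which is why some form of cleanup (or tolerance of leftover data) is needed on the failure path.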





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051953582


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -61,7 +61,11 @@ public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMet
                 .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
                         .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
                         .thenCompose(__ -> closeLedger(ledgerHandle))
-                        .thenApply(__ -> ledgerHandle.getId()));
+                        .thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {

Review Comment:
   If we add the exception handling at the end of the chain, do we need to check the exception type? If only closing the ledger failed, or if the ledger creation itself failed, we don't need to delete the ledger.
   
   Or can we just add exception handling for the `addEntry()` method?
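
One possible reading of this suggestion, sketched with plain JDK futures: attach the cleanup to the write step only, so that a failed create or a failed close does not trigger a delete. Every name here (`createLedger`, `addEntries`, `closeLedger`, `deleteLedger`, the fixed id `42L`) is a hypothetical stand-in rather than the PR's implementation. The sketch also uses the ledger id captured by the lambda, because the value argument passed to `whenComplete` is `null` when the stage has failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ScopedCleanupSketch {
    // Records which ledger ids were deleted, so the behaviour can be inspected.
    final List<Long> deleted = new ArrayList<>();

    // Hypothetical stand-ins for the asynchronous ledger operations.
    CompletableFuture<Long> createLedger(boolean fail) {
        return fail
                ? CompletableFuture.failedFuture(new IllegalStateException("create failed"))
                : CompletableFuture.completedFuture(42L);
    }

    CompletableFuture<Void> addEntries(boolean fail) {
        return fail
                ? CompletableFuture.failedFuture(new IllegalStateException("add failed"))
                : CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> closeLedger(boolean fail) {
        return fail
                ? CompletableFuture.failedFuture(new IllegalStateException("close failed"))
                : CompletableFuture.completedFuture(null);
    }

    void deleteLedger(long ledgerId) {
        deleted.add(ledgerId);
    }

    CompletableFuture<Long> createSnapshot(boolean failCreate, boolean failAdd, boolean failClose) {
        return createLedger(failCreate).thenCompose(ledgerId ->
                addEntries(failAdd)
                        // Cleanup is scoped to the write step and uses the captured id.
                        .whenComplete((ignore, ex) -> {
                            if (ex != null) {
                                deleteLedger(ledgerId);
                            }
                        })
                        .thenCompose(ignore -> closeLedger(failClose))
                        .thenApply(ignore -> ledgerId));
    }
}
```

Whether a close failure should also remove the ledger is a design decision this sketch does not settle; it only shows that the scope of the handler decides which failures trigger the delete.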





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1026281558


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
+        try {
+            result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    private CompletableFuture<List<SnapshotSegment>> parseSnapshotSegmentEntries(
+            Enumeration<LedgerEntry> entryEnumeration) {
+        CompletableFuture<List<SnapshotSegment>> result = new CompletableFuture<>();
+        List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
+        try {
+            while (entryEnumeration.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+                snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+            }
+            result.complete(snapshotMetadataList);
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    @NotNull
+    private CompletableFuture<LedgerHandle> createLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncCreateLedger(
+                config.getManagedLedgerDefaultEnsembleSize(),
+                config.getManagedLedgerDefaultWriteQuorum(),
+                config.getManagedLedgerDefaultAckQuorum(),
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to create ledger", rc, -1));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null, null);

Review Comment:
   I added ledger metadata using `bucketKey`.





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1026228488


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
+        try {
+            result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    private CompletableFuture<List<SnapshotSegment>> parseSnapshotSegmentEntries(
+            Enumeration<LedgerEntry> entryEnumeration) {
+        CompletableFuture<List<SnapshotSegment>> result = new CompletableFuture<>();
+        List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
+        try {
+            while (entryEnumeration.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+                snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+            }
+            result.complete(snapshotMetadataList);
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    @NotNull
+    private CompletableFuture<LedgerHandle> createLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncCreateLedger(
+                config.getManagedLedgerDefaultEnsembleSize(),
+                config.getManagedLedgerDefaultWriteQuorum(),
+                config.getManagedLedgerDefaultAckQuorum(),
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to create ledger", rc, -1));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null, null);
+        return future;
+    }
+
+    private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
+        final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncOpenLedger(
+                ledgerId,
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncClose((rc, handle, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId());
+                future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId()));
+            } else {
+                future.complete(null);
+            }
+        }, null);
+        return future;
+    }
+
+    private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncAddEntry(data,
+                (rc, handle, entryId, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId()));
+                    } else {
+                        future.complete(null);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntryThenCloseLedger(LedgerHandle ledger,
+                                                                              long firstEntryId, long lastEntryId) {
+        final CompletableFuture<Enumeration<LedgerEntry>> future = new CompletableFuture<>();
+        ledger.asyncReadEntries(firstEntryId, lastEntryId,
+                (rc, handle, entries, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId()));
+                    } else {
+                        future.complete(entries);
+                    }
+                    closeLedger(handle);

Review Comment:
   This is only a resource release, so I think it should not block the process.
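
The non-blocking release described above can be sketched as follows, under stated assumptions: `closeLedger` and `readThenClose` are hypothetical stand-ins written with plain JDK futures, and the close is deliberately made to fail. The read result is handed to the caller first, and a close failure is only recorded (in real code, logged) without affecting the returned future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ReadThenCloseSketch {
    // Ordered trace of what happened, used instead of a logger in this sketch.
    final List<String> events = new ArrayList<>();

    // Hypothetical close operation that always fails, to show the failure is isolated.
    CompletableFuture<Void> closeLedger() {
        events.add("close");
        return CompletableFuture.failedFuture(new IllegalStateException("close failed"));
    }

    CompletableFuture<String> readThenClose() {
        CompletableFuture<String> result = new CompletableFuture<>();
        // Hand the read result to the caller before releasing the resource.
        result.complete("entries");
        events.add("completed");
        // Fire-and-forget release: a failure is recorded but never propagated.
        closeLedger().exceptionally(ex -> {
            events.add("close failure logged");
            return null;
        });
        return result;
    }
}
```

The trade-off is that the caller never learns about a failed close, so this pattern fits only when the close is a pure resource release, as the comment argues.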





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1026234569


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(

Review Comment:
   > If you want to use bookkeeper storage, you need a client. Maybe if we have more service needs for an internal bookie client, we can create only one internal bookie client and then share it with different components.
   
   +1
   
   Hi @mattisonchao 
   
   > Could you indicate which part is expensive? then we can complete the trade-off.
   
   A separate bookie client used only for the delayed message bucket. See also https://github.com/apache/pulsar/pull/17677#discussion_r1025944991
   





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1021057967


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,

Review Comment:
   I think it can be improved by scanning for and deleting orphaned ledgers.
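
As a rough illustration of that idea, the scan can be thought of as a set difference between the snapshot ledgers that exist and the ones still referenced. This is only a sketch with plain Java collections; `OrphanLedgerScanSketch`, `findOrphans`, and both input sets are hypothetical names, and how the two sets would actually be obtained from BookKeeper and from the cursor is not shown here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

class OrphanLedgerScanSketch {

    // Returns the ledger ids that exist but are no longer referenced.
    // Both inputs are assumptions: a real scan would have to build them
    // from ledger metadata and from whatever records the live snapshots.
    static Set<Long> findOrphans(Set<Long> existingSnapshotLedgers, Set<Long> referencedLedgers) {
        Set<Long> orphans = new TreeSet<>(existingSnapshotLedgers);
        orphans.removeAll(referencedLedgers);
        return orphans;
    }

    public static void main(String[] args) {
        Set<Long> existing = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        Set<Long> referenced = new HashSet<>(Arrays.asList(2L));
        // Each id in the result would be a candidate for deletion.
        System.out.println(findOrphans(existing, referenced));
    }
}
```

A periodic task could then delete each returned id, which would also cover ledgers left behind by a create call that failed part-way.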





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051970510


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {
+                            if (ex != null) {
+                                deleteLedger(ledgerId);
+                            }
+                        }));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));

Review Comment:
   Won't `FutureUtil.waitForAll` make sure that they are not partially successful?





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1051931696


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {
+                            if (ex != null) {
+                                deleteLedger(ledgerId);

Review Comment:
   Ok
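
One detail of the `whenComplete` callback in the hunk above may be worth illustrating: when the chain fails, the value argument (`ledgerId` there) is `null`, so cleanup code generally has to take the id from the captured handle instead. If `deleteLedger` takes a primitive `long`, as its use in `deleteBucketSnapshot` suggests, passing the `null` value would fail on unboxing. The following is a minimal sketch using only `java.util.concurrent`; `FakeHandle`, `create`, and `deleted` are hypothetical stand-ins, not Pulsar or BookKeeper APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

class WhenCompleteCleanupSketch {

    // Hypothetical stand-in for a ledger handle; only the id matters here.
    static final class FakeHandle {
        final long id;

        FakeHandle(long id) {
            this.id = id;
        }
    }

    // Records which ledger id the cleanup path tried to delete (-1 means none).
    static final AtomicLong deleted = new AtomicLong(-1);

    static CompletableFuture<Long> create(FakeHandle handle, boolean failWrite) {
        // Simulates the outcome of writing the snapshot entries.
        CompletableFuture<Void> write = new CompletableFuture<>();
        if (failWrite) {
            write.completeExceptionally(new IllegalStateException("add entry failed"));
        } else {
            write.complete(null);
        }
        return write
                .thenApply(__ -> handle.id)
                .whenComplete((ledgerId, ex) -> {
                    if (ex != null) {
                        // ledgerId is null on this path, so use the captured handle.
                        deleted.set(handle.id);
                    }
                });
    }

    public static void main(String[] args) {
        CompletableFuture<Long> result = create(new FakeHandle(42L), true);
        System.out.println("failed: " + result.isCompletedExceptionally());
        System.out.println("cleanup requested for ledger: " + deleted.get());
    }
}
```

Capturing the handle from the enclosing lambda keeps the cleanup independent of whether the stage produced a value.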





[GitHub] [pulsar] codelipenghui merged pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #17677:
URL: https://github.com/apache/pulsar/pull/17677




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1025944991


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(

Review Comment:
   BK client creates other threads, see: 
   
   https://github.com/apache/bookkeeper/blob/a3401a2139a554a8fbe52c8a398eeb4e51a8dad1/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java#L426-L435
   
   Maybe we should not try to fix this problem in this PR for now; we can solve it for all of them (`ManagedLedger`, `Schema`, `Delayed message`) together in a new DISCUSS thread.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1052184671


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()).whenComplete((ledgerId, ex) -> {
+                            if (ex != null) {
+                                deleteLedger(ledgerId);
+                            }
+                        }));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));

Review Comment:
   Yes, but some entries may already have been added to this ledger.
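
To make the point concrete, here is a small sketch of the behavior, assuming `FutureUtil.waitForAll` behaves like `CompletableFuture.allOf` in this respect. The combined future fails if any add fails, but the adds that already succeeded are not undone. `PartialWriteSketch`, `addEntry`, and the in-memory `ledger` list are hypothetical and only stand in for real ledger writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class PartialWriteSketch {

    // Hypothetical in-memory "ledger": each successful add appends one entry.
    static final List<String> ledger = new ArrayList<>();

    static CompletableFuture<Void> addEntry(String entry, boolean fail) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("add failed: " + entry));
        } else {
            ledger.add(entry);
            future.complete(null);
        }
        return future;
    }

    static CompletableFuture<Void> addAll() {
        ledger.clear();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        futures.add(addEntry("segment-0", false));
        futures.add(addEntry("segment-1", true)); // simulated failure
        futures.add(addEntry("segment-2", false));
        // Completes exceptionally if any component future fails.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> all = addAll();
        System.out.println("combined future failed: " + all.isCompletedExceptionally());
        System.out.println("entries already written: " + ledger);
    }
}
```

So the caller learns that the batch failed, but the ledger still holds the entries that went through, which is why deleting the ledger on failure matters.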





[GitHub] [pulsar] codecov-commenter commented on pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#issuecomment-1330077200

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/17677?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#17677](https://codecov.io/gh/apache/pulsar/pull/17677?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3c9288c) into [master](https://codecov.io/gh/apache/pulsar/commit/a2c15340d3a73401b57574a4032ca375bde502bc?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a2c1534) will **decrease** coverage by `10.11%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/17677/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/17677?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #17677       +/-   ##
   =============================================
   - Coverage     47.23%   37.11%   -10.12%     
   + Complexity    10430     1969     -8461     
   =============================================
     Files           692      209      -483     
     Lines         67766    14421    -53345     
     Branches       7260     1573     -5687     
   =============================================
   - Hits          32009     5353    -26656     
   + Misses        32192     8480    -23712     
   + Partials       3565      588     -2977     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `37.11% <ø> (-10.12%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/17677?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...apache/pulsar/client/impl/AutoClusterFailover.java](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0F1dG9DbHVzdGVyRmFpbG92ZXIuamF2YQ==) | `70.00% <0.00%> (-6.12%)` | :arrow_down: |
   | [.../pulsar/client/impl/ControlledClusterFailover.java](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnRyb2xsZWRDbHVzdGVyRmFpbG92ZXIuamF2YQ==) | `58.87% <0.00%> (-3.74%)` | :arrow_down: |
   | [.../java/org/apache/pulsar/client/impl/ClientCnx.java](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NsaWVudENueC5qYXZh) | `29.97% <0.00%> (-0.20%)` | :arrow_down: |
   | [...ar/client/impl/conf/ProducerConfigurationData.java](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL2NvbmYvUHJvZHVjZXJDb25maWd1cmF0aW9uRGF0YS5qYXZh) | `84.70% <0.00%> (-0.18%)` | :arrow_down: |
   | [...va/org/apache/pulsar/client/impl/ConsumerBase.java](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnN1bWVyQmFzZS5qYXZh) | `21.93% <0.00%> (-0.02%)` | :arrow_down: |
   | [...a/org/apache/pulsar/client/impl/TableViewImpl.java](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1RhYmxlVmlld0ltcGwuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...pache/pulsar/client/impl/TableViewBuilderImpl.java](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1RhYmxlVmlld0J1aWxkZXJJbXBsLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...r/client/impl/UnAckedMessageRedeliveryTracker.java](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1VuQWNrZWRNZXNzYWdlUmVkZWxpdmVyeVRyYWNrZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...e/bookkeeper/mledger/impl/LedgerMetadataUtils.java](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci9pbXBsL0xlZGdlck1ldGFkYXRhVXRpbHMuamF2YQ==) | | |
   | [...rg/apache/pulsar/broker/delayed/bucket/Bucket.java](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9kZWxheWVkL2J1Y2tldC9CdWNrZXQuamF2YQ==) | | |
   | ... and [498 more](https://codecov.io/gh/apache/pulsar/pull/17677/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   




[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1025950548


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
+        try {
+            result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));

Review Comment:
   We can return the future directly to avoid creating another `CompletableFuture`.
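
One possible reading of this suggestion, as a minimal sketch rather than the PR's actual code: build the returned future with `CompletableFuture.completedFuture` / `CompletableFuture.failedFuture` (Java 9+) instead of allocating an empty future and completing it by hand. The `parse` helper below is a hypothetical stand-in for `SnapshotMetadata.parseFrom`, and the class name is illustrative only.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

class ParseFutureSketch {

    // Hypothetical stand-in for SnapshotMetadata.parseFrom(byte[]),
    // which can throw an IOException on malformed input.
    static String parse(byte[] raw) throws IOException {
        if (raw == null || raw.length == 0) {
            throw new IOException("empty entry");
        }
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Returns an already-completed future directly; no intermediate
    // "result" variable that is completed in separate branches.
    static CompletableFuture<String> parseEntry(byte[] raw) {
        try {
            return CompletableFuture.completedFuture(parse(raw));
        } catch (IOException e) {
            return CompletableFuture.failedFuture(e); // requires Java 9+
        }
    }
}
```

Both branches hand back a finished future, so callers can keep chaining with `thenCompose` exactly as before.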



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()

Review Comment:
   I'm not sure whether creating a lot of ledgers would cause problems, but I think it is OK for this initial (0-to-1) implementation.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);

Review Comment:
   Can we use a Lombok annotation for the logger instead?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())

Review Comment:
   This logic is weird. Why can't we use logic like this?
   ```java
   createLedger()
           .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
               .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
               .thenCompose(__ -> closeLedger(ledgerHandle))
               .thenApply(__ -> ledgerHandle.getId()));
   ```
   The advantage is that we avoid creating many intermediate `CompletionStage` objects.
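
To make the shape of the suggested chain concrete, here is a self-contained sketch that runs without BookKeeper. `FakeLedger` and the helper methods are hypothetical in-memory stand-ins for `LedgerHandle`, `createLedger`, `addEntry`, `addSnapshotSegments` and `closeLedger`; only the nesting of the `thenCompose` calls mirrors the suggestion above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FlatChainSketch {

    // Hypothetical in-memory stand-in for a BookKeeper LedgerHandle.
    static final class FakeLedger {
        final long id;
        final List<String> entries = new ArrayList<>();
        boolean closed;

        FakeLedger(long id) {
            this.id = id;
        }
    }

    static CompletableFuture<FakeLedger> createLedger() {
        return CompletableFuture.completedFuture(new FakeLedger(7L));
    }

    static CompletableFuture<Void> addEntry(FakeLedger ledger, String data) {
        ledger.entries.add(data);
        return CompletableFuture.completedFuture(null);
    }

    // Appends the segments one after another, preserving their order.
    static CompletableFuture<Void> addSegments(FakeLedger ledger, List<String> segments) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String segment : segments) {
            chain = chain.thenCompose(__ -> addEntry(ledger, segment));
        }
        return chain;
    }

    static CompletableFuture<Void> closeLedger(FakeLedger ledger) {
        ledger.closed = true;
        return CompletableFuture.completedFuture(null);
    }

    // The ledger stays in scope of one outer lambda, so the inner steps
    // do not need a thenApply(__ -> ledger) just to pass it along.
    static CompletableFuture<Long> createSnapshot(String metadata, List<String> segments) {
        return createLedger().thenCompose(ledger ->
                addEntry(ledger, metadata)
                        .thenCompose(__ -> addSegments(ledger, segments))
                        .thenCompose(__ -> closeLedger(ledger))
                        .thenApply(__ -> ledger.id));
    }
}
```

Compared with the version in the diff, the two `thenApply(__ -> ledgerHandle)` hops disappear because the handle is captured once by the enclosing lambda.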



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);

Review Comment:
   We ignore the result (or exception) of `closeLedger` here.
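
A minimal sketch of one way to address this, assuming it is acceptable for a failed close to fail the whole call: switch from `thenApply` to `thenCompose` so the future returned by `closeLedger` becomes part of the chain. `FakeLedger`, `openLedger` and `closeLedger` below are hypothetical stand-ins, not the BookKeeper API.

```java
import java.util.concurrent.CompletableFuture;

class CloseResultSketch {

    // Hypothetical stand-in for a ledger handle; failOnClose simulates a close error.
    static final class FakeLedger {
        final long length;
        final boolean failOnClose;

        FakeLedger(long length, boolean failOnClose) {
            this.length = length;
            this.failOnClose = failOnClose;
        }
    }

    static CompletableFuture<FakeLedger> openLedger(long length, boolean failOnClose) {
        return CompletableFuture.completedFuture(new FakeLedger(length, failOnClose));
    }

    static CompletableFuture<Void> closeLedger(FakeLedger ledger) {
        if (ledger.failOnClose) {
            return CompletableFuture.failedFuture(new IllegalStateException("close failed"));
        }
        return CompletableFuture.completedFuture(null);
    }

    // The length is read first, then the close future is chained in, so the
    // caller only sees the length once the close has actually succeeded.
    static CompletableFuture<Long> snapshotLength(long length, boolean failOnClose) {
        return openLedger(length, failOnClose).thenCompose(ledger -> {
            long observed = ledger.length;
            return closeLedger(ledger).thenApply(__ -> observed);
        });
    }
}
```

If a close failure should not fail the read, an alternative is to attach an `exceptionally` handler to the close future that only logs; either way the outcome is no longer silently dropped.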



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(

Review Comment:
   >I still think it is too expensive to use a single bk-client for the delay message
   
   Hi @poorbarcode 
   Could you indicate which part is expensive? Then we can weigh the trade-off.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
+        try {
+            result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    private CompletableFuture<List<SnapshotSegment>> parseSnapshotSegmentEntries(
+            Enumeration<LedgerEntry> entryEnumeration) {
+        CompletableFuture<List<SnapshotSegment>> result = new CompletableFuture<>();
+        List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
+        try {
+            while (entryEnumeration.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+                snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+            }
+            result.complete(snapshotMetadataList);
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    @NotNull
+    private CompletableFuture<LedgerHandle> createLedger() {
+        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncCreateLedger(
+                config.getManagedLedgerDefaultEnsembleSize(),
+                config.getManagedLedgerDefaultWriteQuorum(),
+                config.getManagedLedgerDefaultAckQuorum(),
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to create ledger", rc, -1));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null, null);
+        return future;
+    }
+
+    private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
+        final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
+        bookKeeper.asyncOpenLedger(
+                ledgerId,
+                BookKeeper.DigestType.fromApiDigestType(config.getManagedLedgerDigestType()),
+                LedgerPassword,
+                (rc, handle, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId));
+                    } else {
+                        future.complete(handle);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncClose((rc, handle, ctx) -> {
+            if (rc != BKException.Code.OK) {
+                log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId());
+                future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId()));
+            } else {
+                future.complete(null);
+            }
+        }, null);
+        return future;
+    }
+
+    private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        ledgerHandle.asyncAddEntry(data,
+                (rc, handle, entryId, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId()));
+                    } else {
+                        future.complete(null);
+                    }
+                }, null
+        );
+        return future;
+    }
+
+    CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntryThenCloseLedger(LedgerHandle ledger,
+                                                                              long firstEntryId, long lastEntryId) {
+        final CompletableFuture<Enumeration<LedgerEntry>> future = new CompletableFuture<>();
+        ledger.asyncReadEntries(firstEntryId, lastEntryId,
+                (rc, handle, entries, ctx) -> {
+                    if (rc != BKException.Code.OK) {
+                        future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId()));
+                    } else {
+                        future.complete(entries);
+                    }
+                    closeLedger(handle);

Review Comment:
   The result (and any exception) of `closeLedger(handle)` is ignored here.
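
A minimal sketch of one way to avoid dropping the close outcome, not the PR's code. It uses only `java.util.concurrent.CompletableFuture`; the `read` and `close` suppliers are hypothetical stand-ins for the ledger read and `closeLedger(handle)`. The returned future completes only after the close has finished, and a close failure following a successful read is surfaced to the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ReadThenClose {

    /**
     * Runs the read, then always runs the close, and completes the returned
     * future only after both are done. A read failure takes precedence;
     * a close failure after a successful read is propagated, not dropped.
     * Both suppliers are hypothetical stand-ins for the BookKeeper calls.
     */
    static <T> CompletableFuture<T> readThenClose(Supplier<CompletableFuture<T>> read,
                                                  Supplier<CompletableFuture<Void>> close) {
        CompletableFuture<T> result = new CompletableFuture<>();
        read.get().whenComplete((value, readError) ->
                close.get().whenComplete((ignored, closeError) -> {
                    if (readError != null) {
                        result.completeExceptionally(readError);
                    } else if (closeError != null) {
                        result.completeExceptionally(closeError);
                    } else {
                        result.complete(value);
                    }
                }));
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failedClose = new CompletableFuture<>();
        failedClose.completeExceptionally(new IllegalStateException("close failed"));

        CompletableFuture<String> outcome =
                readThenClose(() -> CompletableFuture.completedFuture("entries"), () -> failedClose);
        outcome.whenComplete((value, error) ->
                System.out.println("value=" + value + ", error=" + error));
    }
}
```

Whether a close failure on a read-only handle should fail the whole read, or only be logged, is a design choice for the PR author; the sketch shows the stricter option.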



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
+        try {
+            result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    private CompletableFuture<List<SnapshotSegment>> parseSnapshotSegmentEntries(

Review Comment:
   This method looks synchronous, so wrapping its result in a `CompletableFuture` seems unnecessary.
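
To illustrate the point, here is a small sketch (not the PR's code) contrasting the two styles. `parse` is a hypothetical stand-in for a protobuf `parseFrom` call, assumed to do CPU work only. When the parsing step does no I/O, it can be a plain method chained with `thenApply` instead of a future-returning method chained with `thenCompose`.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

final class SyncParseSketch {

    // Hypothetical stand-in for a parser: pure CPU work, nothing asynchronous.
    static String parse(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Wrapped variant: allocates a future although the work is synchronous.
    static CompletableFuture<String> parseWrapped(byte[] raw) {
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            result.complete(parse(raw));
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    // Needs thenCompose because the parser returns a future.
    static CompletableFuture<String> readWithCompose(CompletableFuture<byte[]> read) {
        return read.thenCompose(SyncParseSketch::parseWrapped);
    }

    // Simpler: a plain method chained with thenApply; an exception thrown by
    // parse still completes the returned future exceptionally.
    static CompletableFuture<String> readWithApply(CompletableFuture<byte[]> read) {
        return read.thenApply(SyncParseSketch::parse);
    }

    public static void main(String[] args) {
        byte[] raw = "snapshot".getBytes(StandardCharsets.UTF_8);
        System.out.println(readWithCompose(CompletableFuture.completedFuture(raw)).join());
        System.out.println(readWithApply(CompletableFuture.completedFuture(raw)).join());
    }
}
```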





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1026228745


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);

Review Comment:
   Ditto





[GitHub] [pulsar] coderzc commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1027594715


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        try {
+            return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);

Review Comment:
   ```suggestion
               throw new BucketSnapshotException(e);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments,
+                                                        String bucketKey) {
+        return createLedger(bucketKey)
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                        .thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
+                        .thenCompose(__ -> closeLedger(ledgerHandle))
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
+                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
+                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        pulsar.getBookKeeperClient();
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
+                pulsar.getConfiguration(),
+                pulsar.getLocalMetadataStore(),
+                pulsar.getIoEventLoopGroup(),
+                Optional.empty(),
+                null
+        );
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (bookKeeper != null) {
+            bookKeeper.close();
+        }
+    }
+
+    private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        List<CompletableFuture<Void>> addFutures = new ArrayList<>();
+        for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
+            addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
+        }
+
+        return FutureUtil.waitForAll(addFutures);
+    }
+
+    private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
+        try {
+            return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entryEnumeration) {
+        List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
+        try {
+            while (entryEnumeration.hasMoreElements()) {
+                LedgerEntry ledgerEntry = entryEnumeration.nextElement();
+                snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
+            }
+            return snapshotMetadataList;
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   ```suggestion
               throw new BucketSnapshotException(e);
   ```
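
A rough sketch of why a domain-specific exception can help callers, under stated assumptions. The `BucketSnapshotException` below is a local, assumed shape (unchecked, wrapping a cause); the actual class in the PR may be declared differently. `parse` and `isSnapshotFailure` are hypothetical helpers for illustration only.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class DomainExceptionSketch {

    /** Assumed shape only: unchecked, wraps the underlying cause. */
    static final class BucketSnapshotException extends RuntimeException {
        BucketSnapshotException(Throwable cause) {
            super(cause);
        }
    }

    // Hypothetical parser that rejects empty input, standing in for a
    // protobuf parse failure.
    static String parse(byte[] raw) {
        try {
            if (raw.length == 0) {
                throw new IOException("empty entry");
            }
            return new String(raw, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new BucketSnapshotException(e);
        }
    }

    // Callers can tell snapshot failures apart from other runtime errors.
    // Dependent stages wrap failures in CompletionException, so unwrap once.
    static boolean isSnapshotFailure(Throwable t) {
        Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                ? t.getCause() : t;
        return cause instanceof BucketSnapshotException;
    }

    public static void main(String[] args) {
        boolean detected = CompletableFuture.completedFuture(new byte[0])
                .thenApply(DomainExceptionSketch::parse)
                .handle((value, error) -> isSnapshotFailure(error))
                .join();
        System.out.println("snapshot failure detected: " + detected);
    }
}
```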





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17677: [feat][broker][PIP-195] Implement BookkeeperBucketSnapshotStorage - part3

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17677:
URL: https://github.com/apache/pulsar/pull/17677#discussion_r1019978133


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
+                        thenCompose(
+                        entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+    }
+
+    @Override
+    public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
+                                                                             long lastSegmentEntryId) {
+        return openLedger(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
+                        this::parseSnapshotSegmentEntries));
+    }
+
+    @Override
+    public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
+        return openLedger(bucketId).thenApply(ledgerHandle -> {
+            long length = ledgerHandle.getLength();
+            closeLedger(ledgerHandle);
+            return length;
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        return deleteLedger(bucketId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.bookKeeper = pulsar.getBookKeeperClientFactory().create(

Review Comment:
   Do we really need a separate BK client?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,

Review Comment:
   If the ledger is created successfully but persisting the cursor properties fails, what should we do with the orphaned ledger?
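
   One possible way to handle this, sketched under assumptions rather than taken from the PR: delete the just-created ledger on a best-effort basis when the follow-up persistence step fails, then surface the original error. The three function parameters below (`createSnapshot`, `persistId`, `deleteSnapshot`) are hypothetical stand-ins for creating the snapshot ledger, writing its id into the cursor properties, and deleting the ledger; they are not the real Pulsar or BookKeeper APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper, not part of the PR: cleans up a ledger that would
// otherwise be orphaned when the id cannot be persisted.
final class OrphanLedgerCleanup {

    static CompletableFuture<Long> createThenPersist(
            Supplier<CompletableFuture<Long>> createSnapshot,          // assumed: returns the new ledger id
            Function<Long, CompletableFuture<Void>> persistId,         // assumed: stores the id in cursor properties
            Function<Long, CompletableFuture<Void>> deleteSnapshot) {  // assumed: deletes the ledger
        return createSnapshot.get().thenCompose(ledgerId -> {
            CompletableFuture<Long> result = new CompletableFuture<>();
            persistId.apply(ledgerId).whenComplete((ignore, persistError) -> {
                if (persistError == null) {
                    result.complete(ledgerId);
                    return;
                }
                // Persistence failed: try to delete the ledger, then fail with
                // the original error. A failed delete is attached as suppressed.
                deleteSnapshot.apply(ledgerId).whenComplete((ignore2, deleteError) -> {
                    if (deleteError != null) {
                        persistError.addSuppressed(deleteError);
                    }
                    result.completeExceptionally(persistError);
                });
            });
            return result;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("persist failed"));
        CompletableFuture<Long> r = createThenPersist(
                () -> CompletableFuture.completedFuture(7L),
                id -> failed,
                id -> CompletableFuture.completedFuture(null));
        System.out.println("failed as expected: " + r.isCompletedExceptionally());
    }
}
```

   Best-effort deletion still leaves an orphan if the broker crashes between the two steps, so a periodic scan for unreferenced ledgers may be needed as well.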



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
+import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
+
+    private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
+    private static final byte[] LedgerPassword = "".getBytes();
+
+    private final PulsarService pulsar;
+    private final ServiceConfiguration config;
+    private BookKeeper bookKeeper;
+
+    public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.config = pulsar.getConfig();
+    }
+
+    @Override
+    public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
+                                                        List<SnapshotSegment> bucketSnapshotSegments) {
+        return createLedger()
+                .thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
+                                .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
+                        .thenApply(__ -> ledgerHandle))
+                .thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
+                        .thenApply(__ -> ledgerHandle.getId()));
+    }
+
+    @Override
+    public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
+        return openLedger(bucketId).thenCompose(

Review Comment:
   Should we close this `LedgerHandle` once the read completes?
   
   The same applies to the methods `getBucketSnapshotSegment` and `getBucketSnapshotLength`.
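
   A minimal sketch of what closing after the read could look like, assuming a small helper that wraps any open-then-read chain. The `Handle` interface and `readAndClose` are hypothetical stand-ins written for illustration; the real `LedgerHandle` API is not used here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper, not part of the PR: closes the handle whether the
// read succeeds or fails, then completes with the read's outcome.
final class CloseAfterRead {

    /** Stand-in for a ledger handle; only the close operation matters here. */
    interface Handle {
        CompletableFuture<Void> closeAsync();
    }

    static <H extends Handle, T> CompletableFuture<T> readAndClose(
            CompletableFuture<H> openFuture,
            Function<H, CompletableFuture<T>> read) {
        return openFuture.thenCompose(handle -> {
            CompletableFuture<T> result = new CompletableFuture<>();
            read.apply(handle).whenComplete((value, readError) ->
                    // Close in both cases; a close failure is ignored in this
                    // sketch (real code would probably log it).
                    handle.closeAsync().whenComplete((ignore, closeError) -> {
                        if (readError != null) {
                            result.completeExceptionally(readError);
                        } else {
                            result.complete(value);
                        }
                    }));
            return result;
        });
    }

    public static void main(String[] args) {
        Handle handle = () -> {
            System.out.println("handle closed");
            return CompletableFuture.completedFuture(null);
        };
        String metadata = readAndClose(
                CompletableFuture.completedFuture(handle),
                h -> CompletableFuture.completedFuture("metadata")).join();
        System.out.println("read: " + metadata);
    }
}
```

   With a helper of this shape, `getBucketSnapshotMetadata`, `getBucketSnapshotSegment` and `getBucketSnapshotLength` could share one close path instead of repeating it.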



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketSnapshotException.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.delayed.bucket;
+
+import org.apache.pulsar.broker.service.BrokerServiceException;
+
+public class BucketSnapshotException extends BrokerServiceException {

Review Comment:
   I suggest making `BucketSnapshotException` extend `PersistenceException` instead of `BrokerServiceException`. If you add a new top-level exception type, the existing exception handling will need to be adapted as well.
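
   To illustrate the point, here is a self-contained sketch with simplified stand-in classes (they only mirror the names under discussion and are not the real Pulsar classes). The hypothetical `classify` method plays the role of existing handling code that already checks for `PersistenceException`; a subclass is picked up by that check without any change.

```java
// Stand-in hierarchy for illustration only; not the real Pulsar classes.
final class ExceptionHierarchySketch {

    static class BrokerServiceException extends Exception {
        BrokerServiceException(String message) {
            super(message);
        }
    }

    static class PersistenceException extends BrokerServiceException {
        PersistenceException(String message) {
            super(message);
        }
    }

    // Extending PersistenceException means existing handlers keep working.
    static class BucketSnapshotException extends PersistenceException {
        BucketSnapshotException(String message) {
            super(message);
        }
    }

    /** Hypothetical pre-existing handler that knows nothing about bucket snapshots. */
    static String classify(Throwable t) {
        if (t instanceof PersistenceException) {
            return "persistence";
        }
        if (t instanceof BrokerServiceException) {
            return "broker";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        // prints "persistence"
        System.out.println(classify(new BucketSnapshotException("snapshot read failed")));
    }
}
```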


