You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/24 00:31:49 UTC

[GitHub] [pulsar] liangyepianzhou opened a new pull request, #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

liangyepianzhou opened a new pull request, #16931:
URL: https://github.com/apache/pulsar/pull/16931

   Master Issue: https://github.com/apache/pulsar/issues/16913
   ### Motivation
   Implement system topic client for snapshot segment topic and index topic to send segment snapshots or indexes.
   The configuration `transactionBufferSegmentedSnapshotEnabled` is used in the Transaction Buffer to determine which `AbortedTxnProcessor` is adopted by this TB.
   ### Modification
   Add a TransactionBufferSnapshotService to handle the system topic service:
   <img width="988" alt="image" src="https://user-images.githubusercontent.com/55571188/191040279-a647b07a-c084-4e2a-8814-090e52486ca4.png"> 
   The abstraction for transaction system topic service:
   <img width="895" alt="image" src="https://user-images.githubusercontent.com/55571188/191043197-9a76672a-6701-44af-94d8-662a15433ba9.png">
   The abstraction for transaction system topic client:
   <img width="778" alt="image" src="https://user-images.githubusercontent.com/55571188/191041224-eeaf318e-fe16-4f89-82ba-831976c073e3.png">
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r999507081


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java:
##########
@@ -39,14 +43,14 @@ public class TxnID implements Serializable {
      *
      * @serial
      */
-    private final long mostSigBits;
+    private long mostSigBits;

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #16931: [feat][broker] Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#issuecomment-1288435798

   please modify the PR description for the new commit, then can be merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r1000839228


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,29 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   Can we write this method like this( Remove the new class `AsyncCallbacks .OpenReadOnlyManagedLedgerCallback` )?
   
   ```
   @Override
   public CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,
                             ManagedLedgerConfig config, Object ctx) {
       if (closed) {
           return FutureUtil.failedFuture(new ManagedLedgerException.ManagedLedgerFactoryClosedException());
       }
       ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
               bookkeeperFactory
                       .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
                               config.getBookKeeperEnsemblePlacementPolicyProperties())),
               store, config, scheduledExecutor, managedLedgerName);
       return roManagedLedger.initialize().thenApply(ignore -> roManagedLedger);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r999249164


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class  TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTopicClientBase<T> {
+
+    protected final SystemTopicTxnBufferSnapshotService<T> systemTopicTxnBufferSnapshotService;
+    protected final Class<T> schemaType;
+
+    public TransactionBufferSnapshotBaseSystemTopicClient(PulsarClient client,
+                                                          TopicName topicName,
+                                                          SystemTopicTxnBufferSnapshotService<T>
+                                                                  systemTopicTxnBufferSnapshotService,
+                                                          Class<T> schemaType) {
+        super(client, topicName);
+        this.systemTopicTxnBufferSnapshotService = systemTopicTxnBufferSnapshotService;
+        this.schemaType = schemaType;
+    }
+
+    protected void removeWriter(Writer<T> writer) {
+        writers.remove(writer);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected void removeReader(Reader<T> reader) {
+        readers.remove(reader);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected static class TransactionBufferSnapshotWriter<T> implements Writer<T> {
+
+        protected final Producer<T> producer;
+        protected final TransactionBufferSnapshotBaseSystemTopicClient<T>
+                transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotWriter(Producer<T> producer,
+                                                  TransactionBufferSnapshotBaseSystemTopicClient<T>
+                                                    transactionBufferSnapshotBaseSystemTopicClient) {
+            this.producer = producer;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public MessageId write(T t, String key)
+                throws PulsarClientException {
+            return producer.newMessage().key(key)
+                    .value(t).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(T t, String key) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(t).sendAsync();
+        }
+
+        @Override
+        public MessageId delete(T t, String key)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(T t, String key) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .sendAsync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            this.producer.close();
+            transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);

Review Comment:
   This code `transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this); ` has already been executed once in `closeAsync`. should we remove this line?
   
   Same for `TransactionBufferSnapshotReader`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java:
##########
@@ -23,64 +23,63 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {
+public class SystemTopicTxnBufferSnapshotService<T> {
 
-    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
+    protected final Map<TopicName, SystemTopicClient<T>> clients;
+    protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
 
-    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+    protected final Class<T> schemaType;
+    protected final EventType systemTopicType;
 
-    public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+    public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
+                                               Class<T> schemaType) {
         this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
+        this.systemTopicType = systemTopicType;
+        this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
     }
 
-    @Override
-    public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
     }
 
-    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
-            TopicName topicName) {
-        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
-                .getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
-        if (systemTopicName == null) {
-            return FutureUtil.failedFuture(
-                    new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, "
-                            + "because the topicName is null!"));
-        }
-        return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
-                (v) -> namespaceEventsSystemTopicFactory
-                        .createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
-    }
-
-    @Override
-    public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
     }
 
-    @Override
-    public void removeClient(TopicName topicName,
-                                          TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
+    public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
         if (transactionBufferSystemTopicClient.getReaders().size() == 0
                 && transactionBufferSystemTopicClient.getWriters().size() == 0) {
             clients.remove(topicName);

Review Comment:
   There are many System topics under each namespace. I think these resources should be released( `gc(SystemTopicClient)` ) on the namespace deleted event. Now such an implementation will have a race condition: 
   
   - close the last `writer`
   - registry a new `writer`
   
   | close the last `writer` | registry a new `writer` | 
   |---|---|
   | check `transactionBufferSystemTopicClient.getWriters().size() == 0` : `true` |  |
   |  | registry new one |
   | remove(topic) |  |



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r1001430459


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java:
##########
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class  TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTopicClientBase<T> {
+
+    protected final SystemTopicTxnBufferSnapshotService<T> systemTopicTxnBufferSnapshotService;
+    protected final Class<T> schemaType;
+
+    public TransactionBufferSnapshotBaseSystemTopicClient(PulsarClient client,
+                                                          TopicName topicName,
+                                                          SystemTopicTxnBufferSnapshotService<T>
+                                                                  systemTopicTxnBufferSnapshotService,
+                                                          Class<T> schemaType) {
+        super(client, topicName);
+        this.systemTopicTxnBufferSnapshotService = systemTopicTxnBufferSnapshotService;
+        this.schemaType = schemaType;
+    }
+
+    protected void removeWriter(Writer<T> writer) {
+        writers.remove(writer);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected void removeReader(Reader<T> reader) {
+        readers.remove(reader);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected static class TransactionBufferSnapshotWriter<T> implements Writer<T> {
+
+        protected final Producer<T> producer;
+        protected final TransactionBufferSnapshotBaseSystemTopicClient<T>
+                transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotWriter(Producer<T> producer,
+                                                  TransactionBufferSnapshotBaseSystemTopicClient<T>
+                                                    transactionBufferSnapshotBaseSystemTopicClient) {
+            this.producer = producer;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public MessageId write(String key, T t)
+                throws PulsarClientException {
+            return producer.newMessage().key(key)
+                    .value(t).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(String key, T t) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(t).sendAsync();
+        }
+
+        @Override
+        public MessageId delete(String key, T t)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(String key, T t) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .sendAsync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            this.closeAsync().join();
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            producer.closeAsync().whenComplete((v, e) -> {
+                // if close fail, also need remove the producer
+                transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
+            });
+            return completableFuture;
+        }
+
+        @Override
+        public SystemTopicClient<T> getSystemTopicClient() {
+            return transactionBufferSnapshotBaseSystemTopicClient;
+        }
+    }
+
+    protected static class TransactionBufferSnapshotReader<T> implements Reader<T> {
+
+        private final org.apache.pulsar.client.api.Reader<T> reader;
+        private final TransactionBufferSnapshotBaseSystemTopicClient<T> transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotReader(
+                org.apache.pulsar.client.api.Reader<T> reader,
+                TransactionBufferSnapshotBaseSystemTopicClient<T> transactionBufferSnapshotBaseSystemTopicClient) {
+            this.reader = reader;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public Message<T> readNext() throws PulsarClientException {
+            return reader.readNext();
+        }
+
+        @Override
+        public CompletableFuture<Message<T>> readNextAsync() {
+            return reader.readNextAsync();
+        }
+
+        @Override
+        public boolean hasMoreEvents() throws PulsarClientException {
+            return reader.hasMessageAvailable();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasMoreEventsAsync() {
+            return reader.hasMessageAvailableAsync();
+        }
+
+        @Override
+        public void close() throws IOException {

Review Comment:
   Should we use `this.closeAsync().join` instead of `this.reader.closeAsync().join()` ?
   
   Same as this comment: https://github.com/apache/pulsar/pull/16931#discussion_r1000817696



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r1001306990


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,29 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   @codelipenghui Use public CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedgercan avoid defining a new inner class.
   But other asynchronous methods of ManagedLedgerFactoryImpl.java all use callback, whether it is necessary for us to keep it consistent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r974860026


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -418,7 +418,33 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName, ManagedLedgerConfig config,
+                                               OpenLedgerCallback callback, Object ctx) {
+        if (closed) {
+            callback.openLedgerFailed(new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
+            return;
+        }
+        ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
+                bookkeeperFactory
+                        .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                                config.getBookKeeperEnsemblePlacementPolicyProperties())),
+                store, config, scheduledExecutor, managedLedgerName);
+
+        roManagedLedger.initialize(new ManagedLedgerInitializeLedgerCallback() {
+            @Override
+            public void initializeComplete() {
+                log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName);
+                callback.openLedgerComplete(roManagedLedger, ctx);
+            }
 
+            @Override
+            public void initializeFailed(ManagedLedgerException e) {
+                log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
+                callback.openLedgerFailed(e, ctx);
+            }
+        }, ctx);
+    }

Review Comment:
   should use `org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl.initializeAndCreateCursor`, ReadOnlyManagedLedger initialize is different from ManagedLedger



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ReadOnlyCursor.java:
##########
@@ -62,6 +62,22 @@ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback,
     void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
                           Object ctx, PositionImpl maxPosition);
 
+    /**
+     * Asynchronously read entry by position from the ManagedLedger.
+     * @param position the position of the entry to read.
+     * @param callback callback  object
+     * @param ctx opaque context
+     */
+    void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx);

Review Comment:
   don't add the method into the cursor, this makes the definition of cursor ambiguous. only use managedLedger `asyncReadEntry` is enough



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -819,7 +819,11 @@ public void start() throws PulsarServerException {
 
             // Register pulsar system namespaces and start transaction meta store service
             if (config.isTransactionCoordinatorEnabled()) {
-                this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
+//                if (transactionBufferSegmentedSnapshotEnabled)
+                this.transactionBufferSnapshotService = new TransactionBufferSnapshotSegmentServiceImpl(getClient());
+//                  else
+                this.transactionBufferSnapshotService = new TransactionBufferSnapshotServiceImpl(getClient());

Review Comment:
   Why assign the value twice?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotBaseService.java:
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
+import org.apache.pulsar.common.naming.TopicName;
+
+public abstract class  SystemTopicBaseTxnBufferSnapshotBaseService<T> implements

Review Comment:
   ```suggestion
   public abstract class  SystemTopicBaseTxnBufferSnapshotService<T> implements
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotIndexSystemTopicClient.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.systopic;
+
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class TransactionBufferSnapshotIndexSystemTopicClient extends
+        TransactionBufferSnapshotBaseSystemTopicClient<TransactionBufferSnapshotIndexes> {
+
+    private final SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
+            transactionBufferSnapshotIndexService;
+
+    public TransactionBufferSnapshotIndexSystemTopicClient(PulsarClient client, TopicName topicName,
+           SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
+                   transactionBufferSnapshotIndexService) {
+        super(client, topicName, transactionBufferSnapshotIndexService);
+        this.transactionBufferSnapshotIndexService = transactionBufferSnapshotIndexService;
+    }
+
+    @Override
+    protected CompletableFuture<Writer<TransactionBufferSnapshotIndexes>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(TransactionBufferSnapshotIndexes.class))
+                .topic(topicName.toString())
+                .createAsync().thenCompose(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new transactionBufferSnapshot writer is created", topicName);
+                    }
+                    return CompletableFuture.completedFuture(
+                            new TransactionBufferSnapshotIndexWriter(producer, this));
+                });
+    }
+
+    @Override
+    protected CompletableFuture<Reader<TransactionBufferSnapshotIndexes>> newReaderAsyncInternal() {
+        return client.newReader(Schema.AVRO(TransactionBufferSnapshotIndexes.class))
+                .topic(topicName.toString())
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .createAsync()
+                .thenCompose(reader -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new transactionBufferSnapshot buffer reader is created", topicName);
+                    }
+                    return CompletableFuture.completedFuture(
+                            new TransactionBufferSnapshotIndexReader(reader, this));
+                });
+    }
+
+    private static class TransactionBufferSnapshotIndexWriter extends
+            TransactionBufferSnapshotBaseWriter<TransactionBufferSnapshotIndexes> {
+
+
+        private TransactionBufferSnapshotIndexWriter(Producer<TransactionBufferSnapshotIndexes> producer,
+                                                     TransactionBufferSnapshotIndexSystemTopicClient
+                                                             transactionBufferSnapshotIndexSystemTopicClient) {
+            super(producer, transactionBufferSnapshotIndexSystemTopicClient);
+        }
+
+        @Override
+        public MessageId write(TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes)
+                throws PulsarClientException {
+            return producer.newMessage().key(transactionBufferSnapshotIndexes.getTopicName())
+                    .value(transactionBufferSnapshotIndexes).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(TransactionBufferSnapshotIndexes
+                                                                       transactionBufferSnapshotIndexes) {
+            return producer.newMessage()

Review Comment:
   also can use abstract method



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotIndexSystemTopicClient.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.systopic;
+
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class TransactionBufferSnapshotIndexSystemTopicClient extends
+        TransactionBufferSnapshotBaseSystemTopicClient<TransactionBufferSnapshotIndexes> {
+
+    private final SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
+            transactionBufferSnapshotIndexService;
+
+    public TransactionBufferSnapshotIndexSystemTopicClient(PulsarClient client, TopicName topicName,
+           SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
+                   transactionBufferSnapshotIndexService) {
+        super(client, topicName, transactionBufferSnapshotIndexService);
+        this.transactionBufferSnapshotIndexService = transactionBufferSnapshotIndexService;
+    }
+
+    @Override
+    protected CompletableFuture<Writer<TransactionBufferSnapshotIndexes>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(TransactionBufferSnapshotIndexes.class))
+                .topic(topicName.toString())
+                .createAsync().thenCompose(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new transactionBufferSnapshot writer is created", topicName);
+                    }
+                    return CompletableFuture.completedFuture(
+                            new TransactionBufferSnapshotIndexWriter(producer, this));
+                });
+    }
+
+    @Override
+    protected CompletableFuture<Reader<TransactionBufferSnapshotIndexes>> newReaderAsyncInternal() {
+        return client.newReader(Schema.AVRO(TransactionBufferSnapshotIndexes.class))
+                .topic(topicName.toString())
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .createAsync()
+                .thenCompose(reader -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new transactionBufferSnapshot buffer reader is created", topicName);
+                    }
+                    return CompletableFuture.completedFuture(
+                            new TransactionBufferSnapshotIndexReader(reader, this));
+                });
+    }
+
+    private static class TransactionBufferSnapshotIndexWriter extends
+            TransactionBufferSnapshotBaseWriter<TransactionBufferSnapshotIndexes> {
+
+
+        private TransactionBufferSnapshotIndexWriter(Producer<TransactionBufferSnapshotIndexes> producer,
+                                                     TransactionBufferSnapshotIndexSystemTopicClient
+                                                             transactionBufferSnapshotIndexSystemTopicClient) {
+            super(producer, transactionBufferSnapshotIndexSystemTopicClient);
+        }
+
+        @Override
+        public MessageId write(TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes)
+                throws PulsarClientException {
+            return producer.newMessage().key(transactionBufferSnapshotIndexes.getTopicName())
+                    .value(transactionBufferSnapshotIndexes).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(TransactionBufferSnapshotIndexes
+                                                                       transactionBufferSnapshotIndexes) {
+            return producer.newMessage()
+                    .key(transactionBufferSnapshotIndexes.getTopicName())
+                    .value(transactionBufferSnapshotIndexes).sendAsync();
+        }
+
+        @Override
+        public MessageId delete(TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(transactionBufferSnapshotIndexes.getTopicName())
+                    .value(null)
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(TransactionBufferSnapshotIndexes
+                                                                        transactionBufferSnapshotIndexes) {
+            return producer.newMessage()

Review Comment:
   delete also can use super.deleteAsync() right?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java:
##########
@@ -18,38 +18,29 @@
  */
 package org.apache.pulsar.broker.service;
 
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {
-
-    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
+public class SystemTopicBaseTxnBufferSnapshotService
+        extends SystemTopicBaseTxnBufferSnapshotBaseService<TransactionBufferSnapshot> {
 
     private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
 
     public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+        super(new ConcurrentHashMap<>());

Review Comment:
   ```suggestion
           super();
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotSegmentService.java:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+public class SystemTopicBaseTxnBufferSnapshotSegmentService extends
+        SystemTopicBaseTxnBufferSnapshotBaseService<TransactionBufferSnapshotIndexes.TransactionBufferSnapshot> {
+    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+
+    public SystemTopicBaseTxnBufferSnapshotSegmentService(PulsarClient client) {
+        super(new ConcurrentHashMap<>());

Review Comment:
   ```suggestion
           super();
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotSegmentSystemTopicClient.java:
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class TransactionBufferSnapshotSegmentSystemTopicClient extends
+        TransactionBufferSnapshotBaseSystemTopicClient<TransactionBufferSnapshotIndexes.TransactionBufferSnapshot> {
+
+    public <T> TransactionBufferSnapshotSegmentSystemTopicClient(PulsarClient client, TopicName topicName,
+              SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes.TransactionBufferSnapshot>
+                                                                     transactionBufferSnapshotSegmentService) {
+        super(client, topicName, transactionBufferSnapshotSegmentService);
+    }
+
+    @Override
+    protected CompletableFuture<Writer<TransactionBufferSnapshotIndexes.TransactionBufferSnapshot>>
+    newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(TransactionBufferSnapshotIndexes.TransactionBufferSnapshot.class))
+                .topic(topicName.toString())
+                .createAsync().thenCompose(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new transactionBufferSnapshot segment writer is created", topicName);
+                    }
+                    return CompletableFuture.completedFuture(
+                            new TransactionBufferSnapshotSegmentWriter(producer, this));
+                });
+    }
+
+    @Override
+    protected CompletableFuture<Reader<TransactionBufferSnapshotIndexes.TransactionBufferSnapshot>>
+    newReaderAsyncInternal() {
+        return FutureUtil.failedFuture(new BrokerServiceException
+                .NotAllowedException("Do not allow to get reader for segment topic reader"));
+    }
+
+    protected static String buildKey(TransactionBufferSnapshotIndexes.TransactionBufferSnapshot snapshot) {
+        return "multiple-" + snapshot.getSequenceId() + "-" + snapshot.getTopicName();

Review Comment:
   constant final



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotSegmentServiceImpl.java:
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class TransactionBufferSnapshotSegmentServiceImpl implements TransactionBufferSnapshotService {

Review Comment:
   it's better to use only one implement



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r998196225


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -117,6 +118,15 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
     void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx);
 
+    /**
+     * Asynchronous open a Read-only managedLedger.
+     * @param managedLedgerName the unique name that identifies the managed ledger
+     * @param config the managed ledegr configuratiion.
+     * @param ctx opaque context
+     */
+    CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   But if we use `asyncOpenReadOnlyManagedLedger`, we can abstract this part of the code and make it asynchronous called by other methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r999245551


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java:
##########
@@ -23,64 +23,63 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {
+public class SystemTopicTxnBufferSnapshotService<T> {
 
-    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
+    protected final Map<TopicName, SystemTopicClient<T>> clients;
+    protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
 
-    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+    protected final Class<T> schemaType;
+    protected final EventType systemTopicType;
 
-    public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+    public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
+                                               Class<T> schemaType) {
         this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
+        this.systemTopicType = systemTopicType;
+        this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
     }
 
-    @Override
-    public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
     }
 
-    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
-            TopicName topicName) {
-        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
-                .getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
-        if (systemTopicName == null) {
-            return FutureUtil.failedFuture(
-                    new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, "
-                            + "because the topicName is null!"));
-        }
-        return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
-                (v) -> namespaceEventsSystemTopicFactory
-                        .createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
-    }
-
-    @Override
-    public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
     }
 
-    @Override
-    public void removeClient(TopicName topicName,
-                                          TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
+    public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
         if (transactionBufferSystemTopicClient.getReaders().size() == 0
                 && transactionBufferSystemTopicClient.getWriters().size() == 0) {
             clients.remove(topicName);

Review Comment:
   This class is very similar to the factory class used to register the `SystemTopicClient` for system topics. There are many System topics under each namespace. I think these resources should be released( `gc(SystemTopicClient)` ) on the namespace deleted event. Now such an implementation will have a race condition: 
   
   - close the last `writer`
   - registry a new `writer`
   
   | close the last `writer` | registry a new `writer` | 
   |---|---|
   | check `transactionBufferSystemTopicClient.getWriters().size() == 0` : `true` |  |
   |  | registry new one |
   | remove(topic) |  |
   
   @liangyepianzhou and I have discussed and would like to consult your opinion @codelipenghui 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r1000117434


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java:
##########
@@ -93,22 +93,22 @@
          * @return message id
          * @throws PulsarClientException exception while write event cause
          */
-        MessageId write(T t) throws PulsarClientException;
+        MessageId write(T t, String key) throws PulsarClientException;

Review Comment:
   Because `MessageId write(T t) throws PulsarClientException;` can not be implement in the `TransactionBufferSnapshotBaseSystemTopicClient`.
   We can not get the key from the value of type T.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui closed pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
codelipenghui closed pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic
URL: https://github.com/apache/pulsar/pull/16931


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r998152459


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -117,6 +118,15 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
     void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx);
 
+    /**
+     * Asynchronous open a Read-only managedLedger.
+     * @param managedLedgerName the unique name that identifies the managed ledger
+     * @param config the managed ledegr configuratiion.
+     * @param ctx opaque context
+     */
+    CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   It is used in `asyncOpenReadOnlyCursor`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r998161007


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java:
##########
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.events.EventType;
+
+public class TransactionBufferSnapshotServiceFactory {
+
+    private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> txnBufferSnapshotService;
+
+    private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes.TransactionBufferSnapshot>
+            txnBufferSnapshotSegmentService;
+
+    private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> txnBufferSnapshotIndexService;
+
+    public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient,
+                                                boolean transactionBufferSegmentedSnapshotEnabled) {

Review Comment:
   In the original design, this configuration may not be at the broker level, but at the namespace or topic level.
   So here are all of them implemented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codecov-commenter commented on pull request #16931: [feat][broker] Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#issuecomment-1288258459

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/16931?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#16931](https://codecov.io/gh/apache/pulsar/pull/16931?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6ec7dc6) into [master](https://codecov.io/gh/apache/pulsar/commit/6c65ca0d8a80bfaaa4d5869e0cea485f5c94369b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6c65ca0) will **increase** coverage by `35.87%`.
   > The diff coverage is `50.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/16931/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/16931?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #16931       +/-   ##
   =============================================
   + Coverage     34.91%   70.79%   +35.87%     
   + Complexity     5707      438     -5269     
   =============================================
     Files           607       26      -581     
     Lines         53396     2246    -51150     
     Branches       5712      245     -5467     
   =============================================
   - Hits          18644     1590    -17054     
   + Misses        32119      483    -31636     
   + Partials       2633      173     -2460     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `70.79% <50.00%> (+35.87%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/16931?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...apache/pulsar/proxy/server/DirectProxyHandler.java](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLXByb3h5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9wdWxzYXIvcHJveHkvc2VydmVyL0RpcmVjdFByb3h5SGFuZGxlci5qYXZh) | `63.63% <50.00%> (ø)` | |
   | [...pache/pulsar/proxy/server/ProxyServiceStarter.java](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLXByb3h5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9wdWxzYXIvcHJveHkvc2VydmVyL1Byb3h5U2VydmljZVN0YXJ0ZXIuamF2YQ==) | `60.66% <0.00%> (-1.34%)` | :arrow_down: |
   | [...n/java/org/apache/pulsar/broker/PulsarService.java](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9QdWxzYXJTZXJ2aWNlLmphdmE=) | | |
   | [...pulsar/broker/admin/impl/PersistentTopicsBase.java](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi9pbXBsL1BlcnNpc3RlbnRUb3BpY3NCYXNlLmphdmE=) | | |
   | [...broker/delayed/InMemoryDelayedDeliveryTracker.java](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9kZWxheWVkL0luTWVtb3J5RGVsYXllZERlbGl2ZXJ5VHJhY2tlci5qYXZh) | | |
   | [...rg/apache/pulsar/broker/service/BrokerService.java](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Jyb2tlclNlcnZpY2UuamF2YQ==) | | |
   | [.../service/SystemTopicBasedTopicPoliciesService.java](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1N5c3RlbVRvcGljQmFzZWRUb3BpY1BvbGljaWVzU2VydmljZS5qYXZh) | | |
   | [...r/service/SystemTopicTxnBufferSnapshotService.java](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1N5c3RlbVRvcGljVHhuQnVmZmVyU25hcHNob3RTZXJ2aWNlLmphdmE=) | | |
   | [.../pulsar/broker/stats/BrokerOperabilityMetrics.java](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zdGF0cy9Ccm9rZXJPcGVyYWJpbGl0eU1ldHJpY3MuamF2YQ==) | | |
   | [...er/systopic/NamespaceEventsSystemTopicFactory.java](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zeXN0b3BpYy9OYW1lc3BhY2VFdmVudHNTeXN0ZW1Ub3BpY0ZhY3RvcnkuamF2YQ==) | | |
   | ... and [573 more](https://codecov.io/gh/apache/pulsar/pull/16931/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r997736460


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,28 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
-
+    @Override
+    public CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+                              ManagedLedgerConfig config, Object ctx) {
+        CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();
+        if (closed) {
+            return FutureUtil.failedFuture(new ManagedLedgerException.ManagedLedgerFactoryClosedException());
+        }
+        ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
+                bookkeeperFactory
+                        .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                                config.getBookKeeperEnsemblePlacementPolicyProperties())),
+                store, config, scheduledExecutor, managedLedgerName);
+        roManagedLedger.initialize().thenRun(() -> {

Review Comment:
   `ReadOnlyManagedLedgerImpl.initialize()` has already return `CompletableFuture<ReadOnlyManagedLedgerImpl>` , can we write it like this: 
   
   ```java
   return new ReadOnlyManagedLedgerImpl(this,
                   bookkeeperFactory
                           .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
                                   config.getBookKeeperEnsemblePlacementPolicyProperties())),
                   store, config, scheduledExecutor, managedLedgerName) initialize();
   ```
   
   Or the method `ReadOnlyManagedLedgerImpl.initialize()` can modify the return declaration to `CompletableFuture<Void>` ?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java:
##########
@@ -85,42 +85,48 @@ private TopicPolicyWriter(Producer<PulsarEvent> producer, SystemTopicClient<Puls
         }
 
         @Override
-        public MessageId write(PulsarEvent event) throws PulsarClientException {
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(event);
+        public MessageId write(PulsarEvent event, String key) throws PulsarClientException {
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(event);
             setReplicateCluster(event, builder);
             return builder.send();
         }
 
         @Override
-        public CompletableFuture<MessageId> writeAsync(PulsarEvent event) {
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(event);
+        public CompletableFuture<MessageId> writeAsync(PulsarEvent event, String key) {
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(event);
             setReplicateCluster(event, builder);
             return builder.sendAsync();
         }
 
         @Override
-        public MessageId delete(PulsarEvent event) throws PulsarClientException {
-            validateActionType(event);
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(null);
+        public MessageId delete(PulsarEvent event, String key) throws PulsarClientException {
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(null);
             setReplicateCluster(event, builder);
             return builder.send();
         }
 
         @Override
-        public CompletableFuture<MessageId> deleteAsync(PulsarEvent event) {
+        public CompletableFuture<MessageId> deleteAsync(PulsarEvent event, String key) {
             validateActionType(event);
             TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(null);

Review Comment:
   should we use `@param key` instead of `getEventKey(event)` here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java:
##########
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.events.EventType;
+
+public class TransactionBufferSnapshotServiceFactory {
+
+    private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> txnBufferSnapshotService;
+
+    private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes.TransactionBufferSnapshot>
+            txnBufferSnapshotSegmentService;
+
+    private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> txnBufferSnapshotIndexService;
+
+    public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient,
+                                                boolean transactionBufferSegmentedSnapshotEnabled) {

Review Comment:
   Now `@param transactionBufferSegmentedSnapshotEnabled ` is not unused, how do we turn the feature on or off?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java:
##########
@@ -23,64 +23,63 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {
+public class SystemTopicTxnBufferSnapshotService<T> {
 
-    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
+    protected final Map<TopicName, SystemTopicClient<T>> clients;
+    protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
 
-    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+    protected final Class<T> schemaType;
+    protected final EventType systemTopicType;
 
-    public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+    public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
+                                               Class<T> schemaType) {
         this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
+        this.systemTopicType = systemTopicType;
+        this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
     }
 
-    @Override
-    public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
     }
 
-    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
-            TopicName topicName) {
-        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
-                .getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
-        if (systemTopicName == null) {
-            return FutureUtil.failedFuture(
-                    new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, "
-                            + "because the topicName is null!"));
-        }
-        return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
-                (v) -> namespaceEventsSystemTopicFactory
-                        .createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
-    }
-
-    @Override
-    public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
     }
 
-    @Override
-    public void removeClient(TopicName topicName,
-                                          TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
+    public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
         if (transactionBufferSystemTopicClient.getReaders().size() == 0
                 && transactionBufferSystemTopicClient.getWriters().size() == 0) {
             clients.remove(topicName);

Review Comment:
   How do we prevent concurrency between `createReader | createWriter` and `removeClient`? Now that we have a fixed number of system topics, maybe we should not recycle `SystemTopicClient` objects



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -117,6 +118,15 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
     void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx);
 
+    /**
+     * Asynchronous open a Read-only managedLedger.
+     * @param managedLedgerName the unique name that identifies the managed ledger
+     * @param config the managed ledegr configuratiion.
+     * @param ctx opaque context
+     */
+    CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   Why do we need this method when we can't seem to find anywhere else to use it but test?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,28 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
-
+    @Override
+    public CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+                              ManagedLedgerConfig config, Object ctx) {
+        CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();
+        if (closed) {
+            return FutureUtil.failedFuture(new ManagedLedgerException.ManagedLedgerFactoryClosedException());
+        }
+        ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
+                bookkeeperFactory
+                        .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                                config.getBookKeeperEnsemblePlacementPolicyProperties())),
+                store, config, scheduledExecutor, managedLedgerName);
+        roManagedLedger.initialize().thenRun(() -> {
+            log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName);
+            future.complete(roManagedLedger);
+        }).exceptionally(e -> {
+            log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
+            future.completeExceptionally(new ManagedLedgerException.ManagedLedgerFactoryClosedException());

Review Comment:
   Why wrap it as a `ManagedLedgerFactoryClosedException `, and would it be better for the caller to handle it?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class  TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTopicClientBase<T> {
+
+    protected final SystemTopicTxnBufferSnapshotService<T> systemTopicTxnBufferSnapshotService;
+    protected final Class<T> schemaType;
+
+    public TransactionBufferSnapshotBaseSystemTopicClient(PulsarClient client,
+                                                          TopicName topicName,
+                                                          SystemTopicTxnBufferSnapshotService<T>
+                                                                  systemTopicTxnBufferSnapshotService,
+                                                          Class<T> schemaType) {
+        super(client, topicName);
+        this.systemTopicTxnBufferSnapshotService = systemTopicTxnBufferSnapshotService;
+        this.schemaType = schemaType;
+    }
+
+    protected void removeWriter(Writer<T> writer) {
+        writers.remove(writer);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected void removeReader(Reader<T> reader) {
+        readers.remove(reader);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected static class TransactionBufferSnapshotWriter<T> implements Writer<T> {
+
+        protected final Producer<T> producer;
+        protected final TransactionBufferSnapshotBaseSystemTopicClient<T>
+                transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotWriter(Producer<T> producer,
+                                                  TransactionBufferSnapshotBaseSystemTopicClient<T>
+                                                    transactionBufferSnapshotBaseSystemTopicClient) {
+            this.producer = producer;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public MessageId write(T t, String key)
+                throws PulsarClientException {
+            return producer.newMessage().key(key)
+                    .value(t).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(T t, String key) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(t).sendAsync();
+        }
+
+        @Override
+        public MessageId delete(T t, String key)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(T t, String key) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .sendAsync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            this.producer.close();
+            transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);

Review Comment:
   If `this.producer.close()` fails, should we delete `reader`? Maybe we can directly use `closeAsync().join`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class  TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTopicClientBase<T> {
+
+    protected final SystemTopicTxnBufferSnapshotService<T> systemTopicTxnBufferSnapshotService;
+    protected final Class<T> schemaType;
+
+    public TransactionBufferSnapshotBaseSystemTopicClient(PulsarClient client,
+                                                          TopicName topicName,
+                                                          SystemTopicTxnBufferSnapshotService<T>
+                                                                  systemTopicTxnBufferSnapshotService,
+                                                          Class<T> schemaType) {
+        super(client, topicName);
+        this.systemTopicTxnBufferSnapshotService = systemTopicTxnBufferSnapshotService;
+        this.schemaType = schemaType;
+    }
+
+    protected void removeWriter(Writer<T> writer) {
+        writers.remove(writer);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected void removeReader(Reader<T> reader) {
+        readers.remove(reader);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected static class TransactionBufferSnapshotWriter<T> implements Writer<T> {
+
+        protected final Producer<T> producer;
+        protected final TransactionBufferSnapshotBaseSystemTopicClient<T>
+                transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotWriter(Producer<T> producer,
+                                                  TransactionBufferSnapshotBaseSystemTopicClient<T>
+                                                    transactionBufferSnapshotBaseSystemTopicClient) {
+            this.producer = producer;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public MessageId write(T t, String key)
+                throws PulsarClientException {
+            return producer.newMessage().key(key)
+                    .value(t).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(T t, String key) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(t).sendAsync();
+        }
+
+        @Override
+        public MessageId delete(T t, String key)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(T t, String key) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .sendAsync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            this.producer.close();
+            transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            producer.closeAsync().whenComplete((v, e) -> {
+                // if close fail, also need remove the producer
+                transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
+            });
+            return completableFuture;
+        }
+
+        @Override
+        public SystemTopicClient<T> getSystemTopicClient() {
+            return transactionBufferSnapshotBaseSystemTopicClient;
+        }
+    }
+
+    protected static class TransactionBufferSnapshotReader<T> implements Reader<T> {
+
+        private final org.apache.pulsar.client.api.Reader<T> reader;
+        private final TransactionBufferSnapshotBaseSystemTopicClient<T> transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotReader(
+                org.apache.pulsar.client.api.Reader<T> reader,
+                TransactionBufferSnapshotBaseSystemTopicClient<T> transactionBufferSnapshotBaseSystemTopicClient) {
+            this.reader = reader;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public Message<T> readNext() throws PulsarClientException {
+            return reader.readNext();
+        }
+
+        @Override
+        public CompletableFuture<Message<T>> readNextAsync() {
+            return reader.readNextAsync();
+        }
+
+        @Override
+        public boolean hasMoreEvents() throws PulsarClientException {
+            return reader.hasMessageAvailable();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasMoreEventsAsync() {
+            return reader.hasMessageAvailableAsync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            this.reader.close();
+            transactionBufferSnapshotBaseSystemTopicClient.removeReader(this);
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            reader.closeAsync().whenComplete((v, e) -> {
+                // if close fail, also need remove the reader
+                transactionBufferSnapshotBaseSystemTopicClient.removeReader(this);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
+            });
+            return completableFuture;
+        }
+
+        @Override
+        public SystemTopicClient<T> getSystemTopic() {
+            return transactionBufferSnapshotBaseSystemTopicClient;
+        }
+    }
+
+    @Override
+    protected CompletableFuture<Writer<T>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(schemaType))
+                .topic(topicName.toString())
+                .createAsync().thenCompose(producer -> {

Review Comment:
   Should we use `thenApply`, which would save a `CompletableFuture` object? 
   
   Same for `newReaderAsyncInternal`.



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java:
##########
@@ -39,14 +43,14 @@ public class TxnID implements Serializable {
      *
      * @serial
      */
-    private final long mostSigBits;
+    private long mostSigBits;

Review Comment:
   Why remove the final modifier?
   
   Same for `leastSigBits`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r999520932


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java:
##########
@@ -45,8 +45,8 @@ public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bo
         super(factory, bookKeeper, store, config, scheduledExecutor, name);
     }
 
-    CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
-        CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
+    CompletableFuture<ReadOnlyManagedLedgerImpl> initialize() {
+        CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r998157798


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -117,6 +118,15 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
     void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx);
 
+    /**
+     * Asynchronous open a Read-only managedLedger.
+     * @param managedLedgerName the unique name that identifies the managed ledger
+     * @param config the managed ledegr configuratiion.
+     * @param ctx opaque context
+     */
+    CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   Yes, if we don't change `asyncOpenReadOnlyCursor`, it doesn't depend on `asyncOpenReadOnlyManagedLedger`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r1000117434


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java:
##########
@@ -93,22 +93,22 @@
          * @return message id
          * @throws PulsarClientException exception while write event cause
          */
-        MessageId write(T t) throws PulsarClientException;
+        MessageId write(T t, String key) throws PulsarClientException;

Review Comment:
   Because `MessageId write(T t) throws PulsarClientException;` can not be implement in the `TransactionBufferSnapshotBaseSystemTopicClient`.
   We can not get the key by the type T.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r998151894


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,28 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
-
+    @Override
+    public CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+                              ManagedLedgerConfig config, Object ctx) {
+        CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();
+        if (closed) {
+            return FutureUtil.failedFuture(new ManagedLedgerException.ManagedLedgerFactoryClosedException());
+        }
+        ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
+                bookkeeperFactory
+                        .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                                config.getBookKeeperEnsemblePlacementPolicyProperties())),
+                store, config, scheduledExecutor, managedLedgerName);
+        roManagedLedger.initialize().thenRun(() -> {
+            log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName);
+            future.complete(roManagedLedger);
+        }).exceptionally(e -> {
+            log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
+            future.completeExceptionally(new ManagedLedgerException.ManagedLedgerFactoryClosedException());

Review Comment:
   It is used in `asyncOpenReadOnlyCursor`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r1002802962


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,29 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   @liangyepianzhou @poorbarcode I think AsyncCallbacks.OpenReadOnlyManagedLedgerCallback is ok. CompletableFuture is not able to pass the context unless you change the returned type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r974988530


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/v2/TxnID.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.matadata.v2;
+
+import java.util.Objects;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+@Setter
+public class TxnID {

Review Comment:
   use original TxnID



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java:
##########
@@ -47,6 +47,12 @@ public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bo
 
     CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
         CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
+        initialize().thenRun(() -> future.complete(createReadOnlyCursor(startPosition)));
+        return future;
+    }
+
+    CompletableFuture<Void> initialize() {

Review Comment:
   ```suggestion
       CompletableFuture<ReadOnlyManagedLedgerImpl> initialize() {
   ```



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -116,6 +116,16 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
     void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx);
 
+    /**
+     * Asynchronous open a Read-only managedLedger.
+     * @param managedLedgerName the unique name that identifies the managed ledger
+     * @param config the managed ledegr configuratiion.
+     * @param callback callback object
+     * @param ctx opaque context
+     */
+    void asyncOpenReadOnlyManagedLedger(String managedLedgerName, ManagedLedgerConfig config,

Review Comment:
   better not use callback
   ```suggestion
      CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName, ManagedLedgerConfig config,
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotIndexSystemTopicClient.java:
##########
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.systopic;
+
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class TransactionBufferSnapshotIndexSystemTopicClient extends
+        TransactionBufferSnapshotBaseSystemTopicClient<TransactionBufferSnapshotIndexes> {
+
+    private final SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
+            transactionBufferSnapshotIndexService;
+
+    public TransactionBufferSnapshotIndexSystemTopicClient(PulsarClient client, TopicName topicName,
+           SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
+                   transactionBufferSnapshotIndexService) {
+        super(client, topicName, transactionBufferSnapshotIndexService);
+        this.transactionBufferSnapshotIndexService = transactionBufferSnapshotIndexService;
+    }
+
+    @Override
+    protected CompletableFuture<Writer<TransactionBufferSnapshotIndexes>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(TransactionBufferSnapshotIndexes.class))
+                .topic(topicName.toString())

Review Comment:
   can move to TransactionBufferSnapshotBaseSystemTopicClient



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceImpl.java:
##########
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class TransactionBufferSnapshotServiceImpl implements TransactionBufferSnapshotService {
+
+    private SystemTopicTxnBufferSnapshotServiceImpl txnBufferSnapshotService;
+
+    private SystemTopicTxnBufferSnapshotSegmentServiceImpl txnBufferSnapshotSegmentService;
+
+    private SystemTopicTxnBufferSnapshotIndexServiceImpl txnBufferSnapshotIndexService;
+
+    public TransactionBufferSnapshotServiceImpl(PulsarClient pulsarClient,
+                                                boolean transactionBufferSegmentedSnapshotEnabled) {
+        if (transactionBufferSegmentedSnapshotEnabled) {
+            this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotSegmentServiceImpl(pulsarClient);
+            this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotIndexServiceImpl(pulsarClient);
+        } else {
+            this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotServiceImpl(pulsarClient);

Review Comment:
   transactionBufferSegmentedSnapshotEnabled don't control the TransactionBufferSnapshotServiceImpl



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotIndexSystemTopicClient.java:
##########
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.systopic;
+
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class TransactionBufferSnapshotIndexSystemTopicClient extends
+        TransactionBufferSnapshotBaseSystemTopicClient<TransactionBufferSnapshotIndexes> {
+
+    private final SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
+            transactionBufferSnapshotIndexService;
+
+    public TransactionBufferSnapshotIndexSystemTopicClient(PulsarClient client, TopicName topicName,
+           SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes>
+                   transactionBufferSnapshotIndexService) {
+        super(client, topicName, transactionBufferSnapshotIndexService);
+        this.transactionBufferSnapshotIndexService = transactionBufferSnapshotIndexService;
+    }
+
+    @Override
+    protected CompletableFuture<Writer<TransactionBufferSnapshotIndexes>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(TransactionBufferSnapshotIndexes.class))
+                .topic(topicName.toString())
+                .createAsync().thenCompose(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new transactionBufferSnapshot writer is created", topicName);
+                    }
+                    return CompletableFuture.completedFuture(
+                            new TransactionBufferSnapshotIndexWriter(producer, this));
+                });
+    }
+
+    @Override
+    protected CompletableFuture<Reader<TransactionBufferSnapshotIndexes>> newReaderAsyncInternal() {
+        return client.newReader(Schema.AVRO(TransactionBufferSnapshotIndexes.class))
+                .topic(topicName.toString())
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .createAsync()
+                .thenCompose(reader -> {

Review Comment:
   can move to TransactionBufferSnapshotBaseSystemTopicClient



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java:
##########
@@ -93,22 +93,22 @@
          * @return message id
          * @throws PulsarClientException exception while write event cause
          */
-        MessageId write(T t) throws PulsarClientException;
+        MessageId write(T t, String key) throws PulsarClientException;

Review Comment:
   You can retain write(T t), just add write(T t, String key)



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
+import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
+import org.apache.pulsar.broker.systopic.TransactionBufferSnapshotBaseSystemTopicClient;
+import org.apache.pulsar.common.naming.TopicName;
+
+public interface  SystemTopicTxnBufferSnapshotService<T> {

Review Comment:
   ```suggestion
   public interface SystemTopicTxnBufferSnapshotService<T> {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotBaseService.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
+import org.apache.pulsar.common.naming.TopicName;
+
+public abstract class SystemTopicTxnBufferSnapshotBaseService<T> implements SystemTopicTxnBufferSnapshotService<T> {
+
+    protected final Map<TopicName, SystemTopicClient<T>> clients;
+
+    public SystemTopicTxnBufferSnapshotBaseService() {
+        this.clients = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
+        return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
+

Review Comment:
   ```suggestion
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotSegmentServiceImpl.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.matadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+public class SystemTopicTxnBufferSnapshotSegmentServiceImpl extends
+        SystemTopicTxnBufferSnapshotBaseService<TransactionBufferSnapshotIndexes.TransactionBufferSnapshot> {
+    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

Review Comment:
   can move to`SystemTopicTxnBufferSnapshotBaseService `



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java:
##########
@@ -47,6 +47,12 @@ public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bo
 
     CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
         CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
+        initialize().thenRun(() -> future.complete(createReadOnlyCursor(startPosition)));

Review Comment:
   ```suggestion
           return asyncOpenReadOnlyManagedLedger().thenCompose(() -> createReadOnlyCursor(startPosition));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r1000516836


##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java:
##########
@@ -37,6 +37,15 @@ public class SystemTopicNames {
      */
     public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";
 
+    /**
+     * Local topic name for the transaction buffer snapshot segment.
+     */
+    public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENT = "__transaction_buffer_snapshot_segments";

Review Comment:
   ```suggestion
       public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS = "__transaction_buffer_snapshot_segments";
   ```



##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java:
##########
@@ -37,6 +37,15 @@ public class SystemTopicNames {
      */
     public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";
 
+    /**
+     * Local topic name for the transaction buffer snapshot segment.

Review Comment:
   ```suggestion
        * Local topic name for the transaction buffer snapshot segments.
   ```



##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java:
##########
@@ -37,6 +37,15 @@ public class SystemTopicNames {
      */
     public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";
 
+    /**
+     * Local topic name for the transaction buffer snapshot segment.

Review Comment:
   ```suggestion
        * Local topic name for the transaction buffer snapshot segments.
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class  TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTopicClientBase<T> {
+
+    protected final SystemTopicTxnBufferSnapshotService<T> systemTopicTxnBufferSnapshotService;
+    protected final Class<T> schemaType;
+
+    public TransactionBufferSnapshotBaseSystemTopicClient(PulsarClient client,
+                                                          TopicName topicName,
+                                                          SystemTopicTxnBufferSnapshotService<T>
+                                                                  systemTopicTxnBufferSnapshotService,
+                                                          Class<T> schemaType) {
+        super(client, topicName);
+        this.systemTopicTxnBufferSnapshotService = systemTopicTxnBufferSnapshotService;
+        this.schemaType = schemaType;
+    }
+
+    protected void removeWriter(Writer<T> writer) {
+        writers.remove(writer);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected void removeReader(Reader<T> reader) {
+        readers.remove(reader);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected static class TransactionBufferSnapshotWriter<T> implements Writer<T> {
+
+        protected final Producer<T> producer;
+        protected final TransactionBufferSnapshotBaseSystemTopicClient<T>
+                transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotWriter(Producer<T> producer,
+                                                  TransactionBufferSnapshotBaseSystemTopicClient<T>
+                                                    transactionBufferSnapshotBaseSystemTopicClient) {
+            this.producer = producer;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public MessageId write(T t, String key)
+                throws PulsarClientException {
+            return producer.newMessage().key(key)
+                    .value(t).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(T t, String key) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(t).sendAsync();
+        }
+
+        @Override
+        public MessageId delete(T t, String key)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(T t, String key) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .sendAsync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            this.producer.close();
+            transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);

Review Comment:
   +1



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java:
##########
@@ -74,7 +74,7 @@ protected CompletableFuture<Reader<PulsarEvent>> newReaderAsyncInternal() {
                 });
     }
 
-    private static class TopicPolicyWriter implements Writer<PulsarEvent> {
+    public static class TopicPolicyWriter implements Writer<PulsarEvent> {

Review Comment:
   why change this to public?



##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java:
##########
@@ -37,6 +37,15 @@ public class SystemTopicNames {
      */
     public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";
 
+    /**
+     * Local topic name for the transaction buffer snapshot segment.
+     */
+    public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENT = "__transaction_buffer_snapshot_segments";

Review Comment:
   ```suggestion
       public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENT = "__transaction_buffer_snapshot_segments";
   ```
   ```suggestion
       public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS = "__transaction_buffer_snapshot_segments";
   ```



##########
pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java:
##########
@@ -31,5 +31,15 @@ public enum EventType {
     /**
      * Transaction buffer snapshot events.
      */
-    TRANSACTION_BUFFER_SNAPSHOT
+    TRANSACTION_BUFFER_SNAPSHOT,
+
+    /**
+     * Transaction buffer snapshot segment events.
+     */
+    TRANSACTION_BUFFER_SNAPSHOT_SEGMENT,

Review Comment:
   ```suggestion
       TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
   ```



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java:
##########
@@ -39,14 +43,14 @@ public class TxnID implements Serializable {
      *
      * @serial
      */
-    private final long mostSigBits;
+    private long mostSigBits;

Review Comment:
   add final



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java:
##########
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class  TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTopicClientBase<T> {
+
+    protected final SystemTopicTxnBufferSnapshotService<T> systemTopicTxnBufferSnapshotService;
+    protected final Class<T> schemaType;
+
+    public TransactionBufferSnapshotBaseSystemTopicClient(PulsarClient client,
+                                                          TopicName topicName,
+                                                          SystemTopicTxnBufferSnapshotService<T>
+                                                                  systemTopicTxnBufferSnapshotService,
+                                                          Class<T> schemaType) {
+        super(client, topicName);
+        this.systemTopicTxnBufferSnapshotService = systemTopicTxnBufferSnapshotService;
+        this.schemaType = schemaType;
+    }
+
+    protected void removeWriter(Writer<T> writer) {
+        writers.remove(writer);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected void removeReader(Reader<T> reader) {
+        readers.remove(reader);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected static class TransactionBufferSnapshotWriter<T> implements Writer<T> {
+
+        protected final Producer<T> producer;
+        protected final TransactionBufferSnapshotBaseSystemTopicClient<T>
+                transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotWriter(Producer<T> producer,
+                                                  TransactionBufferSnapshotBaseSystemTopicClient<T>
+                                                    transactionBufferSnapshotBaseSystemTopicClient) {
+            this.producer = producer;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public MessageId write(String key, T t)
+                throws PulsarClientException {
+            return producer.newMessage().key(key)
+                    .value(t).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(String key, T t) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(t).sendAsync();
+        }
+
+        @Override
+        public MessageId delete(String key, T t)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(String key, T t) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .sendAsync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            this.producer.closeAsync().join();
+            transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            producer.closeAsync().whenComplete((v, e) -> {
+                // if close fail, also need remove the producer
+                transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
+            });
+            return completableFuture;
+        }
+
+        @Override
+        public SystemTopicClient<T> getSystemTopicClient() {
+            return transactionBufferSnapshotBaseSystemTopicClient;
+        }
+    }
+
+    protected static class TransactionBufferSnapshotReader<T> implements Reader<T> {
+
+        private final org.apache.pulsar.client.api.Reader<T> reader;
+        private final TransactionBufferSnapshotBaseSystemTopicClient<T> transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotReader(
+                org.apache.pulsar.client.api.Reader<T> reader,
+                TransactionBufferSnapshotBaseSystemTopicClient<T> transactionBufferSnapshotBaseSystemTopicClient) {
+            this.reader = reader;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public Message<T> readNext() throws PulsarClientException {
+            return reader.readNext();
+        }
+
+        @Override
+        public CompletableFuture<Message<T>> readNextAsync() {
+            return reader.readNextAsync();
+        }
+
+        @Override
+        public boolean hasMoreEvents() throws PulsarClientException {
+            return reader.hasMessageAvailable();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasMoreEventsAsync() {
+            return reader.hasMessageAvailableAsync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            this.reader.closeAsync().join();
+            transactionBufferSnapshotBaseSystemTopicClient.removeReader(this);
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            reader.closeAsync().whenComplete((v, e) -> {
+                // if close fail, also need remove the reader
+                transactionBufferSnapshotBaseSystemTopicClient.removeReader(this);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
+            });
+            return completableFuture;
+        }
+
+        @Override
+        public SystemTopicClient<T> getSystemTopic() {
+            return transactionBufferSnapshotBaseSystemTopicClient;
+        }
+    }
+
+    @Override
+    protected CompletableFuture<Writer<T>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(schemaType))
+                .topic(topicName.toString())
+                .createAsync().thenApply(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new {} writer is created", topicName, schemaType.getName());
+                    }
+                    return  new TransactionBufferSnapshotWriter<>(producer, this);
+                });
+    }
+
+    @Override
+    protected CompletableFuture<Reader<T>> newReaderAsyncInternal() {
+        return client.newReader(Schema.AVRO(schemaType))
+                .topic(topicName.toString())
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .createAsync()
+                .thenApply(reader -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new {} reader is created", topicName, schemaType);

Review Comment:
   ```suggestion
                           log.debug("[{}] A new {} reader is created", topicName, schemaType.getName());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#issuecomment-1288248896

   @poorbarcode @congbobo184 Please help review the PR again, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r1000814409


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java:
##########
@@ -39,14 +43,14 @@ public class TxnID implements Serializable {
      *
      * @serial
      */
-    private final long mostSigBits;
+    private long mostSigBits;

Review Comment:
   Hi @liangyepianzhou 
   
   I see you have added the new class `TxnIDData`, so should we remove these two changes?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,29 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   Can we write this method like this( Saves the new class `AsyncCallbacks .OpenReadOnlyManagedLedgerCallback` )?
   
   ```
   @Override
   public CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger(String managedLedgerName,
                             ManagedLedgerConfig config, Object ctx) {
       if (closed) {
           return FutureUtil.failedFuture(new ManagedLedgerException.ManagedLedgerFactoryClosedException());
       }
       ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
               bookkeeperFactory
                       .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
                               config.getBookKeeperEnsemblePlacementPolicyProperties())),
               store, config, scheduledExecutor, managedLedgerName);
       return roManagedLedger.initialize().thenApply(ignore -> roManagedLedger);
   }
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java:
##########
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class  TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTopicClientBase<T> {
+
+    protected final SystemTopicTxnBufferSnapshotService<T> systemTopicTxnBufferSnapshotService;
+    protected final Class<T> schemaType;
+
+    public TransactionBufferSnapshotBaseSystemTopicClient(PulsarClient client,
+                                                          TopicName topicName,
+                                                          SystemTopicTxnBufferSnapshotService<T>
+                                                                  systemTopicTxnBufferSnapshotService,
+                                                          Class<T> schemaType) {
+        super(client, topicName);
+        this.systemTopicTxnBufferSnapshotService = systemTopicTxnBufferSnapshotService;
+        this.schemaType = schemaType;
+    }
+
+    protected void removeWriter(Writer<T> writer) {
+        writers.remove(writer);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected void removeReader(Reader<T> reader) {
+        readers.remove(reader);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    protected static class TransactionBufferSnapshotWriter<T> implements Writer<T> {
+
+        protected final Producer<T> producer;
+        protected final TransactionBufferSnapshotBaseSystemTopicClient<T>
+                transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotWriter(Producer<T> producer,
+                                                  TransactionBufferSnapshotBaseSystemTopicClient<T>
+                                                    transactionBufferSnapshotBaseSystemTopicClient) {
+            this.producer = producer;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public MessageId write(T t, String key)
+                throws PulsarClientException {
+            return producer.newMessage().key(key)
+                    .value(t).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(T t, String key) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(t).sendAsync();
+        }
+
+        @Override
+        public MessageId delete(T t, String key)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(T t, String key) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .sendAsync();
+        }
+
+        @Override
+        public void close() throws IOException {
+            this.producer.close();
+            transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);

Review Comment:
   Can we write this method like this?
   
   ```
   @Override
   public void close() throws IOException {
       this.closeAsync().join();
   }
   ```
   
   Same for `TransactionBufferSnapshotReader`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r998167663


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java:
##########
@@ -23,64 +23,63 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {
+public class SystemTopicTxnBufferSnapshotService<T> {
 
-    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
+    protected final Map<TopicName, SystemTopicClient<T>> clients;
+    protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
 
-    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+    protected final Class<T> schemaType;
+    protected final EventType systemTopicType;
 
-    public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+    public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
+                                               Class<T> schemaType) {
         this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
+        this.systemTopicType = systemTopicType;
+        this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
     }
 
-    @Override
-    public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
     }
 
-    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
-            TopicName topicName) {
-        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
-                .getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
-        if (systemTopicName == null) {
-            return FutureUtil.failedFuture(
-                    new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, "
-                            + "because the topicName is null!"));
-        }
-        return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
-                (v) -> namespaceEventsSystemTopicFactory
-                        .createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
-    }
-
-    @Override
-    public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
     }
 
-    @Override
-    public void removeClient(TopicName topicName,
-                                          TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
+    public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
         if (transactionBufferSystemTopicClient.getReaders().size() == 0
                 && transactionBufferSystemTopicClient.getWriters().size() == 0) {
             clients.remove(topicName);

Review Comment:
   > How do we prevent concurrency between createReader | createWriter and removeClient?
   
   This is a `ConcurrentHashMap`.
   >Now that we have a fixed number of system topics, maybe we should not recycle SystemTopicClient objects 
   
   Sorry, I do not understand this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r998910789


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java:
##########
@@ -39,14 +43,14 @@ public class TxnID implements Serializable {
      *
      * @serial
      */
-    private final long mostSigBits;
+    private long mostSigBits;

Review Comment:
   OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r999512272


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -117,6 +117,17 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
     void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx);
 
+    /**
+     * Asynchronous open a Read-only managedLedger.
+     * @param managedLedgerName the unique name that identifies the managed ledger
+     * @param callback
+     * @param config the managed ledger configuration.
+     * @param ctx opaque context
+     */
+    void asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   As discussed here https://github.com/apache/pulsar/pull/16931#discussion_r997743540,@liangyepianzhou wants to use this method in the next PR( Maybe we can change this in the next PR )



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou merged pull request #16931: [feat][broker] Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou merged PR #16931:
URL: https://github.com/apache/pulsar/pull/16931


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r996567311


##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java:
##########
@@ -37,6 +37,15 @@ public class SystemTopicNames {
      */
     public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";
 
+    /**
+     * Local topic name for the transaction buffer snapshot segment.
+     */
+    public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENT = "__transaction_buffer_snapshot_segment";

Review Comment:
   ```suggestion
       public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENT = "__transaction_buffer_snapshot_segments";
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/v2/TransactionBufferSnapshotIndexes.java:
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.matadata.v2;

Review Comment:
   ```suggestion
   package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
   ```
   
   It should be a mistake before.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java:
##########
@@ -45,8 +45,8 @@ public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bo
         super(factory, bookKeeper, store, config, scheduledExecutor, name);
     }
 
-    CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
-        CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
+    CompletableFuture<ReadOnlyManagedLedgerImpl> initialize() {
+        CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();

Review Comment:
   If we can call ReadOnlyManagedLedgerImpl.initialize(), we already have the ReadOnlyManagedLedgerImpl instance, can we change to CompletableFuture<Void>  here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/v2/TransactionBufferSnapshotIndexes.java:
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.matadata.v2;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+@Setter
+@Builder
+public class TransactionBufferSnapshotIndexes {
+    private String topicName;
+
+    private List<TransactionBufferSnapshotIndex> indexList;
+
+    private TransactionBufferSnapshot snapshot;
+
+    @Builder
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class TransactionBufferSnapshotIndex {
+        public long sequenceID;
+        public long maxReadPositionLedgerID;
+        public long maxReadPositionEntryID;
+        public long persistentPositionLedgerID;
+        public long persistentPositionEntryID;
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class TransactionBufferSnapshot {
+        private String topicName;
+        private long sequenceId;
+        private long maxReadPositionLedgerId;
+        private long maxReadPositionEntryId;
+        private List<TxnID> aborts;
+    }

Review Comment:
   It's better to define it as a separate class, not an internal class.
   Otherwise, the caller will use `TransactionBufferSnapshotIndexes.TransactionBufferSnapshot`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java:
##########
@@ -23,64 +23,63 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {
+public class SystemTopicTxnBufferSnapshotService<T> {
 
-    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
+    protected final Map<TopicName, SystemTopicClient<T>> clients;
+    protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
 
-    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+    protected final Class<T> schemaType;
+    protected final EventType systemTopicType;
 
-    public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+    public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
+                                               Class<T> schemaType) {
         this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
+        this.systemTopicType = systemTopicType;
+        this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
     }
 
-    @Override
-    public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
     }
 
-    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
-            TopicName topicName) {
-        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
-                .getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
-        if (systemTopicName == null) {
-            return FutureUtil.failedFuture(
-                    new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, "
-                            + "because the topicName is null!"));
-        }
-        return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
-                (v) -> namespaceEventsSystemTopicFactory
-                        .createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
-    }
-
-    @Override
-    public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
     }
 
-    @Override
-    public void removeClient(TopicName topicName,
-                                          TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
+    public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
         if (transactionBufferSystemTopicClient.getReaders().size() == 0
                 && transactionBufferSystemTopicClient.getWriters().size() == 0) {
             clients.remove(topicName);

Review Comment:
   I think you are talking about the system topic client leak issue right?
   Can we resolve it with another bug-fix PR? It will help users to find the PR that fixed the BUG.
   Don't hide it in a feature PR.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -117,6 +117,17 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
     void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx);
 
+    /**
+     * Asynchronous open a Read-only managedLedger.
+     * @param managedLedgerName the unique name that identifies the managed ledger
+     * @param callback
+     * @param config the managed ledger configuration.
+     * @param ctx opaque context
+     */
+    void asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   We have 
   
   ```
   @InterfaceAudience.LimitedPrivate
   @InterfaceStability.Stable
   ```
   for this class
   Does the new method is required?
   I only see the test will call this method, if it's not required, can we avoid introducing the new method to the public API?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java:
##########
@@ -93,22 +93,22 @@
          * @return message id
          * @throws PulsarClientException exception while write event cause
          */
-        MessageId write(T t) throws PulsarClientException;
+        MessageId write(T t, String key) throws PulsarClientException;

Review Comment:
   +1, why mark this one as resolved, I don't see any conclusion for this one.
   And it's better to use `MessageId write(String key, T value)`.



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java:
##########
@@ -39,14 +43,14 @@ public class TxnID implements Serializable {
      *
      * @serial
      */
-    private final long mostSigBits;
+    private long mostSigBits;

Review Comment:
   I think the issue is we are using by `TransactionBufferSnapshotIndexes` right? It's better to add a new one in the broker side like `TxnIdData`.
   
   Here we will change a public API for users. It's better to keep it simple and understandable. I don't think the user will need the new constructor of TxnID.
   
   And allow user to update the `mostSigBits` and `leastSigBits` is also not good. We should avoid it.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,30 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+                              AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
+                              ManagedLedgerConfig config, Object ctx) {
+        CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();

Review Comment:
   Looks like we will not use this one.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java:
##########
@@ -85,42 +85,48 @@ private TopicPolicyWriter(Producer<PulsarEvent> producer, SystemTopicClient<Puls
         }
 
         @Override
-        public MessageId write(PulsarEvent event) throws PulsarClientException {
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(event);
+        public MessageId write(PulsarEvent event, String key) throws PulsarClientException {
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(event);
             setReplicateCluster(event, builder);
             return builder.send();
         }
 
         @Override
-        public CompletableFuture<MessageId> writeAsync(PulsarEvent event) {
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(event);
+        public CompletableFuture<MessageId> writeAsync(PulsarEvent event, String key) {
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(event);
             setReplicateCluster(event, builder);
             return builder.sendAsync();
         }
 
         @Override
-        public MessageId delete(PulsarEvent event) throws PulsarClientException {
-            validateActionType(event);
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(null);
+        public MessageId delete(PulsarEvent event, String key) throws PulsarClientException {
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(null);
             setReplicateCluster(event, builder);
             return builder.send();
         }
 
         @Override
-        public CompletableFuture<MessageId> deleteAsync(PulsarEvent event) {
+        public CompletableFuture<MessageId> deleteAsync(PulsarEvent event, String key) {
             validateActionType(event);
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(null);
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(null);
             setReplicateCluster(event, builder);
             return builder.sendAsync();
         }
 
-        private String getEventKey(PulsarEvent event) {
+        public static String getEventKey(PulsarEvent event) {
             return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
                 event.getTopicPoliciesEvent().getTenant(),
                 event.getTopicPoliciesEvent().getNamespace(),
                 event.getTopicPoliciesEvent().getTopic()).toString();
         }
 
+        public static String getEventKey(TopicName topicName) {
+            return TopicName.get(topicName.getDomain().toString(),
+                    topicName.getTenant(),
+                    topicName.getNamespace(),
+                    TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
+        }

Review Comment:
   I think the caller should provide the `getEventKey` implementation, not the `TopicPoliciesSystemTopicClient`? Please keep the responsibilities clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r1001306990


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,29 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   @codelipenghui Use public CompletableFuture<ReadOnlyManagedLedgerImpl> asyncOpenReadOnlyManagedLedger can avoid defining a new inner class.
   But other asynchronous methods of ManagedLedgerFactoryImpl.java all use callback, whether it is necessary for us to keep it consistent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r1002802232


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,29 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   It's not required @liangyepianzhou 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r976031447


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java:
##########
@@ -47,6 +47,12 @@ public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bo
 
     CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
         CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
+        initialize().thenRun(() -> future.complete(createReadOnlyCursor(startPosition)));

Review Comment:
   delete this method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16931: [Improve][Txn][PIP-196]Segmented transaction buffer snapshot segment and index system topic

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r998170994


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java:
##########
@@ -39,14 +43,14 @@ public class TxnID implements Serializable {
      *
      * @serial
      */
-    private final long mostSigBits;
+    private long mostSigBits;

Review Comment:
   We need NoArgsConstructor to make it can be deserialized as a schema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org