Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/21 08:05:15 UTC

[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

codelipenghui commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926344917


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -66,13 +74,24 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     private final TopicName topicName;
 
+    private TxnLogBufferedWriter<TransactionMetadataEntry> bufferedWriter;
+
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    private final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig;
+
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
-                                ManagedLedgerConfig managedLedgerConfig) {
+                                ManagedLedgerConfig managedLedgerConfig,
+                                TxnLogBufferedWriterConfig txnLogBufferedWriterConfig,
+                                ScheduledExecutorService scheduledExecutorService) {
         this.topicName = getMLTransactionLogName(tcID);
         this.tcId = tcID.getId();
         this.managedLedgerFactory = managedLedgerFactory;
         this.managedLedgerConfig = managedLedgerConfig;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.txnLogBufferedWriterConfig = txnLogBufferedWriterConfig;
+        this.managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(true);

Review Comment:
   We only need to enable batch-index-level ack when the transaction log batch feature is enabled.
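
A minimal sketch of the suggestion above. The three classes are hypothetical stand-ins, not the real `ManagedLedgerConfig`, `TxnLogBufferedWriterConfig`, or `MLTransactionLogImpl`, and the `isBatchEnabled()` getter is an assumed name. The only point is that the setter is called behind the batch flag rather than unconditionally.

```java
// Hypothetical stand-ins for illustration only; not the real Pulsar classes.
class LedgerConfigSketch {
    private boolean deletionAtBatchIndexLevelEnabled;

    void setDeletionAtBatchIndexLevelEnabled(boolean enabled) {
        this.deletionAtBatchIndexLevelEnabled = enabled;
    }

    boolean isDeletionAtBatchIndexLevelEnabled() {
        return deletionAtBatchIndexLevelEnabled;
    }
}

class WriterConfigSketch {
    private final boolean batchEnabled;

    WriterConfigSketch(boolean batchEnabled) {
        this.batchEnabled = batchEnabled;
    }

    // Assumed getter name; the real config class may expose this differently.
    boolean isBatchEnabled() {
        return batchEnabled;
    }
}

class TransactionLogSketch {
    final LedgerConfigSketch ledgerConfig;

    TransactionLogSketch(LedgerConfigSketch ledgerConfig, WriterConfigSketch writerConfig) {
        this.ledgerConfig = ledgerConfig;
        // Turn on batch-index-level deletion only when log batching is enabled.
        if (writerConfig.isBatchEnabled()) {
            ledgerConfig.setDeletionAtBatchIndexLevelEnabled(true);
        }
    }
}
```

With batching disabled, the ledger configuration is left exactly as the caller supplied it.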



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){

Review Comment:
   Do we need to release the buffer after deserializing?



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java:
##########
@@ -56,4 +61,19 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(super.hashCode(), batchSize, batchIndex);
     }
+
+    /**
+     * Build the attribute ackSet to that {@link #batchIndex} is false and others is true.
+     */
+    public void deleteFromAckSet(){

Review Comment:
   ```suggestion
       void setAckSetByIndex()
   ```



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip version
+            buffer.skipBytes(4);
+            BatchedTransactionMetadataEntry batchedLog = new BatchedTransactionMetadataEntry();
+            batchedLog.parseFrom(buffer, buffer.readableBytes());
+            return batchedLog.getTransactionLogsList();
+        } else {
+            TransactionMetadataEntry log = new TransactionMetadataEntry();
+            log.parseFrom(buffer, buffer.readableBytes());
+            return Collections.singletonList(log);
+        }
+    }
+
+    public static List<TransactionMetadataEntry> deserializeEntry(Entry entry){
+        return deserializeEntry(entry.getDataBuffer());

Review Comment:
   We also need to consider releasing the entry.
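
One way to address the release concern is to let the caller that owns the entry release it in a `finally` block once deserialization is done. The sketch below uses a hypothetical reference-counted stand-in (`RefCountedEntrySketch`) instead of the real `Entry` and `ByteBuf` types, and a placeholder `deserialize` method instead of the real protobuf parsing.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a reference-counted entry; not the real Entry/ByteBuf.
class RefCountedEntrySketch {
    private final byte[] data;
    private int refCnt = 1;

    RefCountedEntrySketch(byte[] data) {
        this.data = data;
    }

    byte[] getData() {
        if (refCnt <= 0) {
            throw new IllegalStateException("entry already released");
        }
        return data;
    }

    void release() {
        refCnt--;
    }

    int refCnt() {
        return refCnt;
    }
}

class EntryReleaseSketch {
    // Placeholder for the real parsing logic: decodes the payload as one UTF-8 string.
    static List<String> deserialize(byte[] data) {
        return Collections.singletonList(new String(data, StandardCharsets.UTF_8));
    }

    // The owner releases the entry in finally, so it is freed even if parsing throws.
    static List<String> readAndRelease(RefCountedEntrySketch entry) {
        try {
            return deserialize(entry.getData());
        } finally {
            entry.release();
        }
    }
}
```

Whether the release belongs inside `deserializeEntry` or in its callers depends on who owns the entry; the sketch assumes the caller does.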



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java:
##########
@@ -56,4 +61,19 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(super.hashCode(), batchSize, batchIndex);
     }
+
+    /**
+     * Build the attribute ackSet to that {@link #batchIndex} is false and others is true.
+     */
+    public void deleteFromAckSet(){

Review Comment:
   And please add a test to cover the new method.
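
As a rough illustration of what such a test could check, the sketch below builds an ack set in which every bit is set except the one at `batchIndex`, as the Javadoc in the diff describes. It assumes the ack set can be modelled with `java.util.BitSet`; the class and method names are hypothetical and the real `TxnBatchedPositionImpl` may store the ack set differently.

```java
import java.util.BitSet;

class AckSetSketch {
    // Builds a bit set of batchSize bits where every bit is set except batchIndex.
    static BitSet ackSetExcluding(int batchSize, int batchIndex) {
        if (batchIndex < 0 || batchIndex >= batchSize) {
            throw new IllegalArgumentException("batchIndex out of range: " + batchIndex);
        }
        BitSet ackSet = new BitSet(batchSize);
        ackSet.set(0, batchSize);   // mark every index in the batch
        ackSet.clear(batchIndex);   // unmark only the index being deleted
        return ackSet;
    }
}
```

A test for the new method would then assert that only the bit at `batchIndex` is cleared and that out-of-range indexes are rejected.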



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -343,6 +346,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
+        this.transactionLogBufferedWriteAsyncFlushTrigger = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));

Review Comment:
   Can we reuse the `brokerClientSharedTimer` in PulsarService? A new scheduled executor with a 1 ms tick time is a little expensive.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -147,14 +175,10 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
 
     @Override
     public CompletableFuture<Position> append(TransactionMetadataEntry transactionMetadataEntry) {
-        int transactionMetadataEntrySize = transactionMetadataEntry.getSerializedSize();
-        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
-        transactionMetadataEntry.writeTo(buf);
-        managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() {
+        bufferedWriter.asyncAddData(transactionMetadataEntry, new TxnLogBufferedWriter.AddDataCallback() {

Review Comment:
   Please check whether the buffer is released after receiving the callback in `TxnLogBufferedWriter.addComplete`.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip version
+            buffer.skipBytes(4);

Review Comment:
   This skips not only the version but also the magic number, right? We should use constants here, e.g. `buffer.skipBytes(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LENGH + BATCHED_ENTRY_DATA_PREFIX_VERSION_LENGH)`.
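
A small sketch of the named-constant approach, written against `java.nio.ByteBuffer` so it runs without Netty. The constant names and the magic value are assumptions for illustration. The 2-byte lengths follow from the diff, which reads the magic number with `readShort()` and then skips 4 bytes in total.

```java
import java.nio.ByteBuffer;

class BatchedEntryPrefixSketch {
    // Assumed magic value for illustration; the real constant may differ.
    static final short MAGIC_NUMBER = 0x0e01;
    // Lengths in bytes: a 2-byte magic number followed by a 2-byte version.
    static final int MAGIC_NUMBER_LEN = 2;
    static final int VERSION_LEN = 2;

    // Returns true and advances past the prefix if the buffer starts with the
    // magic number; otherwise leaves the position untouched and returns false.
    static boolean skipPrefixIfBatched(ByteBuffer buffer) {
        if (buffer.remaining() < MAGIC_NUMBER_LEN + VERSION_LEN) {
            return false;
        }
        // Absolute read: peeks at the magic number without moving the position.
        short magic = buffer.getShort(buffer.position());
        if (magic != MAGIC_NUMBER) {
            return false;
        }
        buffer.position(buffer.position() + MAGIC_NUMBER_LEN + VERSION_LEN);
        return true;
    }
}
```

Naming both lengths makes it explicit that the skip covers the magic number and the version together, which the bare `4` hides.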



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.