Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/19 14:41:41 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #16685: [improve] [txn] [PIP-160] Transaction log store enables the batch feature

poorbarcode opened a new pull request, #16685:
URL: https://github.com/apache/pulsar/pull/16685

   Master Issue: #15370
   
   ### Motivation
   
   see #15370
   
   ### Modifications
   
   I will complete proposal #15370 through the following pull requests (*the current pull request is step 3*):
   
   1. Write the batch transaction log handler: `TxnLogBufferedWriter`
   2. Configuration changes and protocol changes.
   3. Transaction log store enables the batch feature.
   4. Pending ack log store enables the batch feature.
   5. Supports dynamic configuration.
   6. Add an admin API for the transaction batch log, plus docs (admin and configuration docs).
   7. Add metrics support for the transaction batch log.
   
   ### Documentation
   
   
   - [ ] `doc-required` 
     
   - [x] `doc-not-needed` 
     
   - [ ] `doc` 
   
   - [ ] `doc-complete`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#issuecomment-1192044492

   /pulsarbot run-failure-checks




[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925483775


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -283,4 +365,43 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         }
 
     }
+
+    private static final FastThreadLocal<BatchedTransactionMetadataEntry> localBatchedTransactionLogCache =
+            new FastThreadLocal<>() {
+                @Override
+                protected BatchedTransactionMetadataEntry initialValue() throws Exception {
+                    return new BatchedTransactionMetadataEntry();
+                }
+            };
+
+    private static class TransactionLogDataSerializer
+            implements TxnLogBufferedWriter.DataSerializer<TransactionMetadataEntry>{
+
+        private static final TransactionLogDataSerializer INSTANCE = new TransactionLogDataSerializer();
+
+        @Override
+        public int getSerializedSize(TransactionMetadataEntry data) {
+            return data.getSerializedSize();
+        }
+
+        @Override
+        public ByteBuf serialize(TransactionMetadataEntry data) {
+            int transactionMetadataEntrySize = data.getSerializedSize();
+            ByteBuf buf =
+                    PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
+            data.writeTo(buf);
+            return buf;
+        }
+
+        @Override
+        public ByteBuf serialize(ArrayList<TransactionMetadataEntry> transactionLogArray) {
+            // Since all writes are in the same thread, so we can use threadLocal variables here.
+            BatchedTransactionMetadataEntry data = localBatchedTransactionLogCache.get();
+            data.addAllTransactionLogs(transactionLogArray);

Review Comment:
   clear





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926393477


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -147,14 +175,10 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
 
     @Override
     public CompletableFuture<Position> append(TransactionMetadataEntry transactionMetadataEntry) {
-        int transactionMetadataEntrySize = transactionMetadataEntry.getSerializedSize();
-        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
-        transactionMetadataEntry.writeTo(buf);
-        managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() {
+        bufferedWriter.asyncAddData(transactionMetadataEntry, new TxnLogBufferedWriter.AddDataCallback() {

Review Comment:
   No. The byte buf will be released by `Entry.release()`.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925622209


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -209,15 +238,46 @@ class TransactionLogReplayer {
         }
 
         public void start() {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry();
-
             while (fillEntryQueueCallback.fillQueue() || entryQueue.size() > 0) {
                 Entry entry = entryQueue.poll();
                 if (entry != null) {
                     try {
-                        ByteBuf buffer = entry.getDataBuffer();
-                        transactionMetadataEntry.parseFrom(buffer, buffer.readableBytes());
-                        transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), transactionMetadataEntry);
+                        List<TransactionMetadataEntry> logs = deserializeEntry(entry);

Review Comment:
   This method is currently only used in two places: 
   
   - `MLTransactionSequenceIdGenerator.onManagedLedgerLastLedgerInitialize`: executes only once, when the managed ledger is initialized.
   - `MLTransactionLogImpl.replayAsync`: executes only once, when the transaction log component is initialized.
   
   Neither call site runs frequently, so favoring maintainability is a good choice. And `SingletonList` is a very lightweight object.
   
   





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926457652


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip version
+            buffer.skipBytes(4);

Review Comment:
   Good idea





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925272578


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java:
##########
@@ -94,17 +97,23 @@ public void testManagedLedgerMetrics() throws Exception {
 
     @Test
     public void testTransactionTopic() throws Exception {
+        TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        txnLogBufferedWriterConfig.setBatchEnabled(true);
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         createTransactionCoordinatorAssign();
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
-        new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
-                pulsar.getManagedLedgerFactory(), managedLedgerConfig)
-                .initialize().join();
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
+                pulsar.getManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig,
+                scheduledExecutorService);
+        mlTransactionLog.initialize().join();
         ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
         metrics.generate();
+        // cleanup.
+        mlTransactionLog.closeAsync();

Review Comment:
   Already fixed.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925579796


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -283,4 +365,43 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         }
 
     }
+
+    private static final FastThreadLocal<BatchedTransactionMetadataEntry> localBatchedTransactionLogCache =
+            new FastThreadLocal<>() {
+                @Override
+                protected BatchedTransactionMetadataEntry initialValue() throws Exception {
+                    return new BatchedTransactionMetadataEntry();
+                }
+            };

Review Comment:
   It works. We use a `ThreadLocal` here, so the object should not be recycled.
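
The thread-local reuse discussed in this thread can be illustrated with a minimal sketch. It uses the JDK's `java.lang.ThreadLocal` rather than Netty's `FastThreadLocal`, and the `Batch` class and both method names are hypothetical stand-ins, not code from the PR. It shows why a cached per-thread object needs clearing before each use: without it, records from the previous call stay in the container.

```java
import java.util.ArrayList;
import java.util.List;

public class ThreadLocalReuseSketch {
    // Hypothetical stand-in for a reusable batch container.
    static final class Batch {
        final List<String> records = new ArrayList<>();

        void clear() {
            records.clear();
        }
    }

    // One cached instance per thread; it is never handed to another thread.
    private static final ThreadLocal<Batch> CACHE = ThreadLocal.withInitial(Batch::new);

    // Reuses the cached object without resetting it, so old records accumulate.
    static int fillWithoutClear(List<String> input) {
        Batch batch = CACHE.get();
        batch.records.addAll(input);
        return batch.records.size();
    }

    // Resets the cached object first, so only the new records are present.
    static int fillWithClear(List<String> input) {
        Batch batch = CACHE.get();
        batch.clear();
        batch.records.addAll(input);
        return batch.records.size();
    }

    public static void main(String[] args) {
        System.out.println(fillWithClear(List.of("a", "b")));
        System.out.println(fillWithClear(List.of("c", "d")));
        System.out.println(fillWithoutClear(List.of("e")));
    }
}
```

The sketch only holds if every call on a given cached instance happens on the same thread, which is the assumption stated in the code comment of the hunk under review.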





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926885537


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {
+
+    @DataProvider(name = "variedBufferedWriteConfigProvider")
+    private Object[][] variedBufferedWriteConfigProvider(){
+        Object[][] args = new Object[4][];
+        args[0] = new Object[]{true, true};
+        args[1] = new Object[]{false, false};
+        args[2] = new Object[]{true, false};
+        args[3] = new Object[]{false, true};
+        return args;
+    }
+
+    @Test(dataProvider = "variedBufferedWriteConfigProvider")
+    public void test1(boolean writeWithBatch, boolean readWithBatch) throws Exception {

Review Comment:
   > Please provide a meaningful test name.
   
   Already fixed.
   
   > And looks like you have created a data provider, but both of the parameters are not used in the test.
   
   Already fixed.



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;

Review Comment:
   Already fixed.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925593203


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -174,6 +197,12 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
 
     public CompletableFuture<Void> deletePosition(List<Position> positions) {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        // Change the flag in ackSet to deleted.
+        for (Position position : positions) {
+            if (position instanceof TxnBatchedPositionImpl batchedPosition){
+                batchedPosition.deleteFromAckSet();

Review Comment:
   > If add complete return 11011, index 2, is this add op batch index? We don't need this step, right?
   
   If add-complete returns 11011, we do not need this step. I have removed the `ackSet` attribute from the return value.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925576170


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterConfig.java:
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import lombok.Data;
+
+@Data
+public class TxnLogBufferedWriterConfig {
+
+    private int batchedWriteMaxRecords = 512;
+    private int batchedWriteMaxSize = 1024 * 1024 * 4;
+    private int batchedWriteMaxDelayInMillis = 1;
+    private boolean batchEnabled = false;

Review Comment:
   > default enable.
   
   In the proposal, the design is disabled by default.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926396262


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip version
+            buffer.skipBytes(4);
+            BatchedTransactionMetadataEntry batchedLog = new BatchedTransactionMetadataEntry();
+            batchedLog.parseFrom(buffer, buffer.readableBytes());
+            return batchedLog.getTransactionLogsList();
+        } else {
+            TransactionMetadataEntry log = new TransactionMetadataEntry();
+            log.parseFrom(buffer, buffer.readableBytes());
+            return Collections.singletonList(log);
+        }
+    }
+
+    public static List<TransactionMetadataEntry> deserializeEntry(Entry entry){
+        return deserializeEntry(entry.getDataBuffer());

Review Comment:
   No. It will be released in `TransactionLogReplayer`
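
The batched-versus-single detection in the hunk above can be sketched without Netty or the generated protobuf classes. This is a simplified illustration under stated assumptions: the magic number value, the version field, and the length-prefixed record layout are all hypothetical, and `java.nio.ByteBuffer` stands in for `ByteBuf`. The idea it shows is the same: peek at a two-byte prefix without consuming it, then either skip the four-byte header and read a batch, or wrap a single record in a singleton list.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BatchedEntrySketch {
    // Hypothetical values, chosen only for this sketch.
    static final short MAGIC = 0x0e01;
    static final short VERSION = 1;

    // Assumed layout: magic(2) | version(2) | count(4) | { length(4) | payload }*
    static ByteBuffer encodeBatch(List<String> records) {
        int size = 2 + 2 + 4;
        List<byte[]> payloads = new ArrayList<>();
        for (String record : records) {
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            payloads.add(payload);
            size += 4 + payload.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(MAGIC).putShort(VERSION).putInt(payloads.size());
        for (byte[] payload : payloads) {
            buf.putInt(payload.length).put(payload);
        }
        buf.flip();
        return buf;
    }

    // A non-batched entry is just the raw record bytes, with no prefix.
    static ByteBuffer encodeSingle(String record) {
        return ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8));
    }

    static List<String> decode(ByteBuffer buf) {
        // Absolute read: peeks at the prefix without moving the read position.
        boolean batched = buf.remaining() >= 2 && buf.getShort(buf.position()) == MAGIC;
        if (!batched) {
            byte[] payload = new byte[buf.remaining()];
            buf.get(payload);
            return Collections.singletonList(new String(payload, StandardCharsets.UTF_8));
        }
        buf.position(buf.position() + 4); // skip magic and version
        int count = buf.getInt();
        List<String> records = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] payload = new byte[buf.getInt()];
            buf.get(payload);
            records.add(new String(payload, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(decode(encodeBatch(List.of("open", "commit"))));
        System.out.println(decode(encodeSingle("open")));
    }
}
```

One limitation of any prefix check of this kind: a single record whose first two bytes happen to equal the magic number would be misread as a batch, so the magic value has to be one that the single-record encoding cannot produce.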





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925354353


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -209,15 +238,46 @@ class TransactionLogReplayer {
         }
 
         public void start() {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry();
-
             while (fillEntryQueueCallback.fillQueue() || entryQueue.size() > 0) {
                 Entry entry = entryQueue.poll();
                 if (entry != null) {
                     try {
-                        ByteBuf buffer = entry.getDataBuffer();
-                        transactionMetadataEntry.parseFrom(buffer, buffer.readableBytes());
-                        transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), transactionMetadataEntry);
+                        List<TransactionMetadataEntry> logs = deserializeEntry(entry);

Review Comment:
   If we check whether the entry is a batch entry first, we don't need to return a list when it is not a batch entry.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -174,6 +197,12 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
 
     public CompletableFuture<Void> deletePosition(List<Position> positions) {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        // Change the flag in ackSet to deleted.
+        for (Position position : positions) {
+            if (position instanceof TxnBatchedPositionImpl batchedPosition){
+                batchedPosition.deleteFromAckSet();

Review Comment:
   If add complete return 11011, index 2, is this add op batch index? We don't need this step, right?



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterConfig.java:
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import lombok.Data;
+
+@Data
+public class TxnLogBufferedWriterConfig {
+
+    private int batchedWriteMaxRecords = 512;
+    private int batchedWriteMaxSize = 1024 * 1024 * 4;
+    private int batchedWriteMaxDelayInMillis = 1;
+    private boolean batchEnabled = false;

Review Comment:
   default enable



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -283,4 +365,43 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         }
 
     }
+
+    private static final FastThreadLocal<BatchedTransactionMetadataEntry> localBatchedTransactionLogCache =
+            new FastThreadLocal<>() {
+                @Override
+                protected BatchedTransactionMetadataEntry initialValue() throws Exception {
+                    return new BatchedTransactionMetadataEntry();
+                }
+            };

Review Comment:
   This doesn't seem to work because there is no recycling.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -209,15 +238,46 @@ class TransactionLogReplayer {
         }
 
         public void start() {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry();
-
             while (fillEntryQueueCallback.fillQueue() || entryQueue.size() > 0) {
                 Entry entry = entryQueue.poll();
                 if (entry != null) {
                     try {
-                        ByteBuf buffer = entry.getDataBuffer();
-                        transactionMetadataEntry.parseFrom(buffer, buffer.readableBytes());
-                        transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), transactionMetadataEntry);
+                        List<TransactionMetadataEntry> logs = deserializeEntry(entry);
+                        if (logs.isEmpty()){
+                            continue;
+                        } else if (logs.size() == 1){
+                            TransactionMetadataEntry log = logs.get(0);
+                            transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), log);
+                        } else {
+                            /**
+                             * 1. Query batch index of current entry from cursor.
+                             * 2. Filter the data which has already ack.
+                             * 3. Build batched position and handle valid data.
+                             */
+                            long[] ackSetAlreadyAck = cursor.getDeletedBatchIndexesAsLongArray(
+                                    PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
+                            BitSetRecyclable bitSetAlreadyAck = null;
+                            if (ackSetAlreadyAck != null){
+                                bitSetAlreadyAck = BitSetRecyclable.valueOf(ackSetAlreadyAck);
+                            }
+                            int batchSize = logs.size();
+                            for (int batchIndex = 0; batchIndex < batchSize; batchIndex++){
+                                if (bitSetAlreadyAck != null && !bitSetAlreadyAck.get(batchIndex)){
+                                   continue;
+                                }
+                                TransactionMetadataEntry log = logs.get(batchIndex);
+                                BitSetRecyclable bitSetOfCurrentRecord = BitSetRecyclable.create();
+                                bitSetOfCurrentRecord.set(batchIndex);
+                                long[] ackSetOfCurrentRecord = bitSetOfCurrentRecord.toLongArray();
+                                bitSetOfCurrentRecord.recycle();
+                                PositionImpl batchedPosition = PositionImpl.get(entry.getLedgerId(),
+                                        entry.getEntryId(), ackSetOfCurrentRecord);

Review Comment:
   Because the position is only used to delete the log data (that is, only for the cursor to ack the position), I think set(0, batchSize - 1) and clear(batch index) is better. Otherwise, you have to flip the position's bits when you ack it.
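
The ack-set shape suggested in this comment can be sketched with the JDK's `java.util.BitSet` in place of `BitSetRecyclable`. The class and method names below are hypothetical, and the convention that a set bit means "still pending" while a cleared bit means "acknowledged" is an assumption of the sketch. Note that `BitSet.set(fromIndex, toIndex)` treats `toIndex` as exclusive, so the sketch passes `batchSize` to cover every index in the batch.

```java
import java.util.BitSet;

public class AckSetSketch {
    // Assumed convention: bit set = record still pending, bit clear = record acknowledged.
    static long[] ackSetForRecord(int batchSize, int batchIndex) {
        BitSet bits = new BitSet(batchSize);
        bits.set(0, batchSize);   // toIndex is exclusive: sets bits 0..batchSize-1
        bits.clear(batchIndex);   // mark only this record as acknowledged
        return bits.toLongArray();
    }

    // Renders the ack set with batch index 0 as the leftmost character.
    static String render(long[] ackSet, int batchSize) {
        BitSet bits = BitSet.valueOf(ackSet);
        StringBuilder sb = new StringBuilder(batchSize);
        for (int i = 0; i < batchSize; i++) {
            sb.append(bits.get(i) ? '1' : '0');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        long[] ackSet = ackSetForRecord(5, 2);
        System.out.println(render(ackSet, 5)); // prints 11011
    }
}
```

With a batch of five records and batch index 2, the rendered ack set is `11011`, the same pattern used as the example in the `deletePosition` discussion.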





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926578204


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {
+
+    @DataProvider(name = "variedBufferedWriteConfigProvider")
+    private Object[][] variedBufferedWriteConfigProvider(){
+        Object[][] args = new Object[4][];
+        args[0] = new Object[]{true, true};
+        args[1] = new Object[]{false, false};
+        args[2] = new Object[]{true, false};
+        args[3] = new Object[]{false, true};
+        return args;
+    }
+
+    @Test(dataProvider = "variedBufferedWriteConfigProvider")
+    public void test1(boolean writeWithBatch, boolean readWithBatch) throws Exception {

Review Comment:
   Please provide a meaningful test name.



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {
+
+    @DataProvider(name = "variedBufferedWriteConfigProvider")
+    private Object[][] variedBufferedWriteConfigProvider(){
+        Object[][] args = new Object[4][];
+        args[0] = new Object[]{true, true};
+        args[1] = new Object[]{false, false};
+        args[2] = new Object[]{true, false};
+        args[3] = new Object[]{false, true};
+        return args;
+    }
+
+    @Test(dataProvider = "variedBufferedWriteConfigProvider")
+    public void test1(boolean writeWithBatch, boolean readWithBatch) throws Exception {
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
+        TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        TransactionCoordinatorID transactionCoordinatorID = TransactionCoordinatorID.get(0);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), factory,
+                new ManagedLedgerConfig(), bufferedWriterConfig, scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+
+        // Add logs: start transaction.
+        for (int i = 1; i <= 30; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidMostBits(i);
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setStartTime(i);
+            transactionLog.setTimeoutMs(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW);
+            mlTransactionLog.append(transactionLog);
+        }
+        // Add logs: add partition.
+        for (int i = 1; i <= 30; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.ADD_PARTITION);
+            transactionLog.addAllPartitions(Arrays.asList(String.valueOf(i)));
+            mlTransactionLog.append(transactionLog);
+        }
+        // Add logs: add subscription.
+        for (int i = 1; i <= 30; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.ADD_SUBSCRIPTION);
+            Subscription subscription = new Subscription();
+            subscription.setSubscription(String.valueOf(i));
+            subscription.setTopic(String.valueOf(i));
+            transactionLog.addAllSubscriptions(Arrays.asList(subscription));
+            mlTransactionLog.append(transactionLog);
+        }
+        // Add logs: commit.
+        for (int i = 1; i <= 30; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.UPDATE);
+            mlTransactionLog.append(transactionLog);
+        }
+        // Verify the mapping of position.
+
+
+        // Verify recover correct.
+        TransactionTimeoutTracker timeoutTracker = mock(TransactionTimeoutTracker.class);
+        MLTransactionSequenceIdGenerator sequenceIdGenerator = mock(MLTransactionSequenceIdGenerator.class);
+        TransactionRecoverTracker recoverTracker = mock(TransactionRecoverTracker.class);
+        // TODO record the requests that were sent out

Review Comment:
   Do we need this one?



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {

Review Comment:
   For this test, we should check the following situations:
   
   1. The batched log can be replayed even if the batch feature is disabled.
   2. The non-batched log can be replayed after the batch feature is enabled.



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {
+
+    @DataProvider(name = "variedBufferedWriteConfigProvider")
+    private Object[][] variedBufferedWriteConfigProvider(){
+        Object[][] args = new Object[4][];
+        args[0] = new Object[]{true, true};
+        args[1] = new Object[]{false, false};
+        args[2] = new Object[]{true, false};
+        args[3] = new Object[]{false, true};
+        return args;
+    }
+
+    @Test(dataProvider = "variedBufferedWriteConfigProvider")
+    public void test1(boolean writeWithBatch, boolean readWithBatch) throws Exception {

Review Comment:
   It also looks like you have created a data provider, but neither of its parameters is used in the test.



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;

Review Comment:
   Avoid the star import.



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {
+
+    @DataProvider(name = "variedBufferedWriteConfigProvider")
+    private Object[][] variedBufferedWriteConfigProvider(){
+        Object[][] args = new Object[4][];
+        args[0] = new Object[]{true, true};
+        args[1] = new Object[]{false, false};
+        args[2] = new Object[]{true, false};
+        args[3] = new Object[]{false, true};
+        return args;

Review Comment:
   ```suggestion
           {true, true},
           {false, false},
           {true, false},
           {false, true}
   ```
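
For reference, a minimal sketch of what the full provider method might look like with the array literal suggested above. The class name `DataProviderSketch` is hypothetical, and TestNG's `@DataProvider` annotation is left out so the snippet compiles on its own; in the real test it would stay on the method.

```java
// Hypothetical holder class, used only so the snippet is self-contained.
final class DataProviderSketch {

    // In the actual test this method would keep its @DataProvider annotation.
    // Each row is {writeWithBatch, readWithBatch}.
    static Object[][] variedBufferedWriteConfigProvider() {
        return new Object[][]{
                {true, true},
                {false, false},
                {true, false},
                {false, true}
        };
    }
}
```

The literal form keeps the row count and the row contents in one place, so adding a case cannot leave a null slot in the array.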



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImplTest.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class TxnBatchedPositionImplTest {
+
+    @DataProvider(name = "batchSizeAndBatchIndexArgsArray")
+    private Object[][] batchSizeAndBatchIndexArgsArray(){
+        Object[][] args = new Object[5][];
+        args[0] = new Object[]{10, 5};
+        args[1] = new Object[]{64, 0};
+        args[2] = new Object[]{64, 63};
+        args[3] = new Object[]{230, 120};
+        args[4] = new Object[]{256, 255};
+        return args;

Review Comment:
   ```suggestion
           {10, 5},
           {64, 0},
           {64, 63},
           {230, 120},
           {256, 255}
   ```





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926419200


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -343,6 +346,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
+        this.transactionLogBufferedWriteAsyncFlushTrigger = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));

Review Comment:
   Good idea.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926344917


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -66,13 +74,24 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     private final TopicName topicName;
 
+    private TxnLogBufferedWriter<TransactionMetadataEntry> bufferedWriter;
+
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    private final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig;
+
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
-                                ManagedLedgerConfig managedLedgerConfig) {
+                                ManagedLedgerConfig managedLedgerConfig,
+                                TxnLogBufferedWriterConfig txnLogBufferedWriterConfig,
+                                ScheduledExecutorService scheduledExecutorService) {
         this.topicName = getMLTransactionLogName(tcID);
         this.tcId = tcID.getId();
         this.managedLedgerFactory = managedLedgerFactory;
         this.managedLedgerConfig = managedLedgerConfig;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.txnLogBufferedWriterConfig = txnLogBufferedWriterConfig;
+        this.managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(true);

Review Comment:
   We only need to enable batch-index-level ack when the transaction log batch feature is enabled.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){

Review Comment:
   Do we need to release the buffer after deserializing?



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java:
##########
@@ -56,4 +61,19 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(super.hashCode(), batchSize, batchIndex);
     }
+
+    /**
+     * Build the attribute ackSet to that {@link #batchIndex} is false and others is true.
+     */
+    public void deleteFromAckSet(){

Review Comment:
   ```suggestion
       void setAckSetByIndex()
   ```



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip version
+            buffer.skipBytes(4);
+            BatchedTransactionMetadataEntry batchedLog = new BatchedTransactionMetadataEntry();
+            batchedLog.parseFrom(buffer, buffer.readableBytes());
+            return batchedLog.getTransactionLogsList();
+        } else {
+            TransactionMetadataEntry log = new TransactionMetadataEntry();
+            log.parseFrom(buffer, buffer.readableBytes());
+            return Collections.singletonList(log);
+        }
+    }
+
+    public static List<TransactionMetadataEntry> deserializeEntry(Entry entry){
+        return deserializeEntry(entry.getDataBuffer());

Review Comment:
   We also need to consider releasing the entry.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java:
##########
@@ -56,4 +61,19 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(super.hashCode(), batchSize, batchIndex);
     }
+
+    /**
+     * Build the attribute ackSet to that {@link #batchIndex} is false and others is true.
+     */
+    public void deleteFromAckSet(){

Review Comment:
   And please add a test to cover the new method.
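
A rough sketch of the property such a test could check, based on the Javadoc above (the bit at `batchIndex` is false and all others are true). It uses `java.util.BitSet` as a stand-in for Pulsar's `BitSetRecyclable`, and the class and method names are hypothetical, not the PR's implementation.

```java
import java.util.BitSet;

// Hypothetical stand-in for the ack-set logic; not the PR's code.
final class AckSetSketch {

    // Builds a bit set of batchSize bits in which only batchIndex is cleared.
    static BitSet ackSetWithIndexDeleted(int batchSize, int batchIndex) {
        if (batchIndex < 0 || batchIndex >= batchSize) {
            throw new IllegalArgumentException("batchIndex out of range: " + batchIndex);
        }
        BitSet ackSet = new BitSet(batchSize);
        ackSet.set(0, batchSize);   // mark every record in the batch as present
        ackSet.clear(batchIndex);   // mark the single record as deleted
        return ackSet;
    }
}
```

A test could run this over the same `(batchSize, batchIndex)` pairs used in `TxnBatchedPositionImplTest` and assert that exactly one bit is cleared.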



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -343,6 +346,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
+        this.transactionLogBufferedWriteAsyncFlushTrigger = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));

Review Comment:
   Can we reuse the `brokerClientSharedTimer` in the PulsarService? It's a little expensive to have a new scheduled executor with 1 millis tick time.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -147,14 +175,10 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
 
     @Override
     public CompletableFuture<Position> append(TransactionMetadataEntry transactionMetadataEntry) {
-        int transactionMetadataEntrySize = transactionMetadataEntry.getSerializedSize();
-        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
-        transactionMetadataEntry.writeTo(buf);
-        managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() {
+        bufferedWriter.asyncAddData(transactionMetadataEntry, new TxnLogBufferedWriter.AddDataCallback() {

Review Comment:
   Please check whether the buffer is released after receiving the callback in TxnLogBufferedWriter.addComplete.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip version
+            buffer.skipBytes(4);

Review Comment:
   This skips not only the version but also the magic number, right? We should use constants here, e.g. buffer.skipBytes(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LENGH + BATCHED_ENTRY_DATA_PREFIX_VERSION_LENGH)
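
To illustrate the point about named constants, here is a minimal sketch using `java.nio.ByteBuffer` instead of Netty's `ByteBuf`. The class name, constant names and the magic value `0x0e01` are placeholders chosen for the example, not values taken from the PR; the two-byte lengths follow from the diff, which reads the magic number as a `short` and skips 4 bytes in total.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; names and the magic value are assumptions.
final class BatchedPrefixSketch {
    static final short MAGIC_NUMBER = 0x0e01;          // placeholder value
    static final int MAGIC_NUMBER_LENGTH = Short.BYTES;
    static final int VERSION_LENGTH = Short.BYTES;

    // Returns true and advances past the prefix if the buffer starts with the
    // magic number; otherwise leaves the position untouched and returns false.
    static boolean skipBatchedPrefix(ByteBuffer buffer) {
        if (buffer.remaining() < MAGIC_NUMBER_LENGTH + VERSION_LENGTH) {
            return false;
        }
        // Absolute read: does not move the buffer position.
        short magic = buffer.getShort(buffer.position());
        if (magic != MAGIC_NUMBER) {
            return false;
        }
        buffer.position(buffer.position() + MAGIC_NUMBER_LENGTH + VERSION_LENGTH);
        return true;
    }
}
```

Spelling out both lengths makes it clear that the skip covers the magic number and the version, which a bare `4` with a "skip version" comment does not.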





[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925127257


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java:
##########
@@ -94,17 +97,23 @@ public void testManagedLedgerMetrics() throws Exception {
 
     @Test
     public void testTransactionTopic() throws Exception {
+        TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        txnLogBufferedWriterConfig.setBatchEnabled(true);
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         createTransactionCoordinatorAssign();
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
-        new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
-                pulsar.getManagedLedgerFactory(), managedLedgerConfig)
-                .initialize().join();
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
+                pulsar.getManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig,
+                scheduledExecutorService);
+        mlTransactionLog.initialize().join();
         ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
         metrics.generate();
+        // cleanup.
+        mlTransactionLog.closeAsync();

Review Comment:
   ```suggestion
           mlTransactionLog.closeAsync();
           scheduledExecutorService.shutdown();
   ```



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();
+                System.out.println(id + " start");
                 transactionLog.append(transactionMetadataEntry)
                         .whenComplete((position, throwable) -> {
+                            System.out.println(id + " end");

Review Comment:
   ```suggestion
                               if (log.isDebugEnabled()) {
                                   log.debug("Transaction coordinator [{}] complete to open transaction [{}]",
                                           txnID.getMostSigBits(), txnID.getLeastSigBits());
                               }
   ```



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();
+                System.out.println(id + " start");

Review Comment:
   ```suggestion
                   if (log.isDebugEnabled()) {
                       log.debug("Transaction coordinator [{}] start to open transaction [{}]",
                               txnID.getMostSigBits(), txnID.getLeastSigBits());
                   }
   ```





[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925144474


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -174,6 +197,12 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
 
     public CompletableFuture<Void> deletePosition(List<Position> positions) {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        // Change the flag in ackSet to deleted.
+        for (Position position : positions) {
+            if (position instanceof TxnBatchedPositionImpl batchedPosition){
+                batchedPosition.deleteFromAckSet();

Review Comment:
   A small question: does the batched log support deletion at the batch index level?





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925584342


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -209,15 +238,46 @@ class TransactionLogReplayer {
         }
 
         public void start() {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry();
-
             while (fillEntryQueueCallback.fillQueue() || entryQueue.size() > 0) {
                 Entry entry = entryQueue.poll();
                 if (entry != null) {
                     try {
-                        ByteBuf buffer = entry.getDataBuffer();
-                        transactionMetadataEntry.parseFrom(buffer, buffer.readableBytes());
-                        transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), transactionMetadataEntry);
+                        List<TransactionMetadataEntry> logs = deserializeEntry(entry);
+                        if (logs.isEmpty()){
+                            continue;
+                        } else if (logs.size() == 1){
+                            TransactionMetadataEntry log = logs.get(0);
+                            transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), log);
+                        } else {
+                            /**
+                             * 1. Query batch index of current entry from cursor.
+                             * 2. Filter the data which has already ack.
+                             * 3. Build batched position and handle valid data.
+                             */
+                            long[] ackSetAlreadyAck = cursor.getDeletedBatchIndexesAsLongArray(
+                                    PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
+                            BitSetRecyclable bitSetAlreadyAck = null;
+                            if (ackSetAlreadyAck != null){
+                                bitSetAlreadyAck = BitSetRecyclable.valueOf(ackSetAlreadyAck);
+                            }
+                            int batchSize = logs.size();
+                            for (int batchIndex = 0; batchIndex < batchSize; batchIndex++){
+                                if (bitSetAlreadyAck != null && !bitSetAlreadyAck.get(batchIndex)){
+                                   continue;
+                                }
+                                TransactionMetadataEntry log = logs.get(batchIndex);
+                                BitSetRecyclable bitSetOfCurrentRecord = BitSetRecyclable.create();
+                                bitSetOfCurrentRecord.set(batchIndex);
+                                long[] ackSetOfCurrentRecord = bitSetOfCurrentRecord.toLongArray();
+                                bitSetOfCurrentRecord.recycle();
+                                PositionImpl batchedPosition = PositionImpl.get(entry.getLedgerId(),
+                                        entry.getEntryId(), ackSetOfCurrentRecord);

Review Comment:
   Already deleted.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925271722


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();
+                System.out.println(id + " start");
                 transactionLog.append(transactionMetadataEntry)
                         .whenComplete((position, throwable) -> {
+                            System.out.println(id + " end");

Review Comment:
   This was a temporary log line added while debugging a problem; it has been deleted.
   
   Thanks



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();
+                System.out.println(id + " start");

Review Comment:
   This was a temporary log line added while debugging a problem; it has been deleted.
   
   Thanks



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();

Review Comment:
   This was a debugging log statement; it has been removed.
   
   Thanks





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926218619


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java:
##########
@@ -94,17 +97,24 @@ public void testManagedLedgerMetrics() throws Exception {
 
     @Test
     public void testTransactionTopic() throws Exception {
+        TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        txnLogBufferedWriterConfig.setBatchEnabled(false);
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         createTransactionCoordinatorAssign();
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
-        new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
-                pulsar.getManagedLedgerFactory(), managedLedgerConfig)
-                .initialize().join();
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
+                pulsar.getManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig,
+                scheduledExecutorService);
+        mlTransactionLog.initialize().join();

Review Comment:
   Good idea. Already fixed





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926393477


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -147,14 +175,10 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
 
     @Override
     public CompletableFuture<Position> append(TransactionMetadataEntry transactionMetadataEntry) {
-        int transactionMetadataEntrySize = transactionMetadataEntry.getSerializedSize();
-        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
-        transactionMetadataEntry.writeTo(buf);
-        managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() {
+        bufferedWriter.asyncAddData(transactionMetadataEntry, new TxnLogBufferedWriter.AddDataCallback() {

Review Comment:
   No. The ByteBuf will be released by `TxnLogBufferedWriter`.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926456114


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){

Review Comment:
   No. The ByteBuf will be released by `Entry.release()`.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926554988


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -343,6 +346,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
+        this.transactionLogBufferedWriteAsyncFlushTrigger = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));

Review Comment:
   I will fix it in the next PR



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -66,13 +74,24 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     private final TopicName topicName;
 
+    private TxnLogBufferedWriter<TransactionMetadataEntry> bufferedWriter;
+
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    private final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig;
+
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
-                                ManagedLedgerConfig managedLedgerConfig) {
+                                ManagedLedgerConfig managedLedgerConfig,
+                                TxnLogBufferedWriterConfig txnLogBufferedWriterConfig,
+                                ScheduledExecutorService scheduledExecutorService) {
         this.topicName = getMLTransactionLogName(tcID);
         this.tcId = tcID.getId();
         this.managedLedgerFactory = managedLedgerFactory;
         this.managedLedgerConfig = managedLedgerConfig;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.txnLogBufferedWriterConfig = txnLogBufferedWriterConfig;
+        this.managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(true);

Review Comment:
   Already fixed





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926555359


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java:
##########
@@ -56,4 +61,19 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(super.hashCode(), batchSize, batchIndex);
     }
+
+    /**
+     * Build the attribute ackSet to that {@link #batchIndex} is false and others is true.
+     */
+    public void deleteFromAckSet(){

Review Comment:
   Already fixed





[GitHub] [pulsar] congbobo184 merged pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
congbobo184 merged PR #16685:
URL: https://github.com/apache/pulsar/pull/16685




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926884649


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java:
##########
@@ -55,12 +60,19 @@
 
 public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
+    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

Review Comment:
   Already fixed; see `MLTransactionLogImplTest`.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926504867


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java:
##########
@@ -56,4 +61,19 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(super.hashCode(), batchSize, batchIndex);
     }
+
+    /**
+     * Build the attribute ackSet to that {@link #batchIndex} is false and others is true.
+     */
+    public void deleteFromAckSet(){

Review Comment:
   Good idea. Adding this unit test uncovered a bug.
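
The Javadoc on `deleteFromAckSet` describes a bit set in which only the bit at `batchIndex` is false. The following is a minimal sketch of that construction using `java.util.BitSet` rather than the PR's own types; `AckSetSketch` and `ackSetExcluding` are hypothetical names, and the sample sizes are borrowed from the data provider in `TxnBatchedPositionImplTest`.

```java
import java.util.BitSet;

final class AckSetSketch {

    /** Returns batchSize bits, all set except the one at batchIndex. */
    static BitSet ackSetExcluding(int batchSize, int batchIndex) {
        if (batchIndex < 0 || batchIndex >= batchSize) {
            throw new IllegalArgumentException("batchIndex out of range: " + batchIndex);
        }
        BitSet bits = new BitSet(batchSize);
        bits.set(0, batchSize);   // set bits [0, batchSize)
        bits.clear(batchIndex);   // clear only the bit for this position
        return bits;
    }

    public static void main(String[] args) {
        BitSet bits = ackSetExcluding(10, 5);
        System.out.println(bits);
    }
}
```

Sizes on both sides of the 64-bit word boundary (63, 64, 255, 256) are worth testing, since a `BitSet` is backed by an array of `long` words.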





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926555248


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip version
+            buffer.skipBytes(4);

Review Comment:
   Already fixed
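
The hunk above peeks at the first two bytes to decide whether an entry is batched, then restores the reader index. A rough equivalent with `java.nio.ByteBuffer` (not Netty's `ByteBuf`) is sketched below. The class name and the `MAGIC` value are assumptions made for illustration; the real constant is `BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER`, whose value is not shown in this thread.

```java
import java.nio.ByteBuffer;

final class BatchMagic {

    // Hypothetical magic number, for illustration only.
    static final short MAGIC = 0x0e01;

    /** Reports whether the buffer starts with MAGIC, without consuming any bytes. */
    static boolean isBatched(ByteBuffer buffer) {
        if (buffer.remaining() < Short.BYTES) {
            return false; // too short to carry the prefix
        }
        // Absolute read: the buffer position is left untouched.
        return buffer.getShort(buffer.position()) == MAGIC;
    }

    public static void main(String[] args) {
        ByteBuffer entry = ByteBuffer.allocate(4);
        entry.putShort(MAGIC).putShort((short) 1);
        entry.flip();
        System.out.println(isBatched(entry));
    }
}
```

An absolute read avoids the mark/reset pair entirely, so the caller's position cannot be left in the wrong place if a later step throws.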





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926207942


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java:
##########
@@ -94,17 +97,24 @@ public void testManagedLedgerMetrics() throws Exception {
 
     @Test
     public void testTransactionTopic() throws Exception {
+        TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        txnLogBufferedWriterConfig.setBatchEnabled(false);
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         createTransactionCoordinatorAssign();
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
-        new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
-                pulsar.getManagedLedgerFactory(), managedLedgerConfig)
-                .initialize().join();
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
+                pulsar.getManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig,
+                scheduledExecutorService);
+        mlTransactionLog.initialize().join();

Review Comment:
   It's better to add a timeout on the method or here.
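
One way to apply this suggestion, sketched with plain `java.util.concurrent` and no Pulsar classes: replace the unbounded `join()` with a bounded `get(timeout, unit)`, so a hung `initialize()` fails the test instead of blocking it forever. `BoundedWait` and `awaitOrFail` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWait {

    /** Waits for the future at most the given time; join() would wait forever. */
    static <T> T awaitOrFail(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new IllegalStateException("not completed within " + timeout + " " + unit, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("future failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        String value = awaitOrFail(CompletableFuture.completedFuture("initialized"), 2, TimeUnit.SECONDS);
        System.out.println(value);
    }
}
```

In a TestNG test the same effect can also be achieved at the method level with the `timeOut` attribute of `@Test`.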





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925483775


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -283,4 +365,43 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         }
 
     }
+
+    private static final FastThreadLocal<BatchedTransactionMetadataEntry> localBatchedTransactionLogCache =
+            new FastThreadLocal<>() {
+                @Override
+                protected BatchedTransactionMetadataEntry initialValue() throws Exception {
+                    return new BatchedTransactionMetadataEntry();
+                }
+            };
+
+    private static class TransactionLogDataSerializer
+            implements TxnLogBufferedWriter.DataSerializer<TransactionMetadataEntry>{
+
+        private static final TransactionLogDataSerializer INSTANCE = new TransactionLogDataSerializer();
+
+        @Override
+        public int getSerializedSize(TransactionMetadataEntry data) {
+            return data.getSerializedSize();
+        }
+
+        @Override
+        public ByteBuf serialize(TransactionMetadataEntry data) {
+            int transactionMetadataEntrySize = data.getSerializedSize();
+            ByteBuf buf =
+                    PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
+            data.writeTo(buf);
+            return buf;
+        }
+
+        @Override
+        public ByteBuf serialize(ArrayList<TransactionMetadataEntry> transactionLogArray) {
+            // Since all writes are in the same thread, so we can use threadLocal variables here.
+            BatchedTransactionMetadataEntry data = localBatchedTransactionLogCache.get();
+            data.addAllTransactionLogs(transactionLogArray);

Review Comment:
   The cached entry needs a `clear()` before it is reused.
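
The concern here is that an object cached in a thread-local keeps whatever the previous call put into it. A minimal sketch of the pattern, using a plain `ThreadLocal<List<String>>` as a stand-in for the cached `BatchedTransactionMetadataEntry` (`ReusableBatch` and `buildBatch` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ReusableBatch {

    // One reusable holder per thread, standing in for the cached batch entry.
    private static final ThreadLocal<List<String>> CACHE = ThreadLocal.withInitial(ArrayList::new);

    /** Builds a batch from the given logs, clearing the reused holder first. */
    static List<String> buildBatch(List<String> logs) {
        List<String> holder = CACHE.get();
        holder.clear();                  // without this, earlier batches leak into this one
        holder.addAll(logs);
        return new ArrayList<>(holder);  // copy, because the holder is reused by the next call
    }

    public static void main(String[] args) {
        System.out.println(buildBatch(Arrays.asList("txn-1", "txn-2")));
        System.out.println(buildBatch(Arrays.asList("txn-3")));
    }
}
```

If the `clear()` call is dropped, the second batch also contains the entries of the first, which is the kind of duplication the review comment guards against.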



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java:
##########
@@ -55,12 +60,19 @@
 
 public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
+    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

Review Comment:
   1. It's better to add a dedicated test that appends batched transaction logs, then opens a new cursor on the transaction log managedLedger and reads the batched logs back, verifying that the TransactionCoordinator can recover successfully.
   2. Add a test for log deletion.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926886084


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {

Review Comment:
   > 1. The batched log can be replayed even if the batch feature is disabled.
   > 2. The non-batched log can be replayed after the batch feature is enabled.
   
   Already fixed.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926884869


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImplTest.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class TxnBatchedPositionImplTest {
+
+    @DataProvider(name = "batchSizeAndBatchIndexArgsArray")
+    private Object[][] batchSizeAndBatchIndexArgsArray(){
+        Object[][] args = new Object[5][];
+        args[0] = new Object[]{10, 5};
+        args[1] = new Object[]{64, 0};
+        args[2] = new Object[]{64, 63};
+        args[3] = new Object[]{230, 120};
+        args[4] = new Object[]{256, 255};
+        return args;

Review Comment:
   Already fixed.



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {
+
+    @DataProvider(name = "variedBufferedWriteConfigProvider")
+    private Object[][] variedBufferedWriteConfigProvider(){
+        Object[][] args = new Object[4][];
+        args[0] = new Object[]{true, true};
+        args[1] = new Object[]{false, false};
+        args[2] = new Object[]{true, false};
+        args[3] = new Object[]{false, true};
+        return args;

Review Comment:
   Already fixed.



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {
+
+    @DataProvider(name = "variedBufferedWriteConfigProvider")
+    private Object[][] variedBufferedWriteConfigProvider(){
+        Object[][] args = new Object[4][];
+        args[0] = new Object[]{true, true};
+        args[1] = new Object[]{false, false};
+        args[2] = new Object[]{true, false};
+        args[3] = new Object[]{false, true};
+        return args;
+    }
+
+    @Test(dataProvider = "variedBufferedWriteConfigProvider")
+    public void test1(boolean writeWithBatch, boolean readWithBatch) throws Exception {
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
+        TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        TransactionCoordinatorID transactionCoordinatorID = TransactionCoordinatorID.get(0);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), factory,
+                new ManagedLedgerConfig(), bufferedWriterConfig, scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+
+        // Add logs: start transaction.
+        for (int i = 1; i <= 30; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidMostBits(i);
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setStartTime(i);
+            transactionLog.setTimeoutMs(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW);
+            mlTransactionLog.append(transactionLog);
+        }
+        // Add logs: add partition.
+        for (int i = 1; i <= 30; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.ADD_PARTITION);
+            transactionLog.addAllPartitions(Arrays.asList(String.valueOf(i)));
+            mlTransactionLog.append(transactionLog);
+        }
+        // Add logs: add subscription.
+        for (int i = 1; i <= 30; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.ADD_SUBSCRIPTION);
+            Subscription subscription = new Subscription();
+            subscription.setSubscription(String.valueOf(i));
+            subscription.setTopic(String.valueOf(i));
+            transactionLog.addAllSubscriptions(Arrays.asList(subscription));
+            mlTransactionLog.append(transactionLog);
+        }
+        // Add logs: commit.
+        for (int i = 1; i <= 30; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.UPDATE);
+            mlTransactionLog.append(transactionLog);
+        }
+        // Verify the mapping of position.
+
+
+        // Verify recover correct.
+        TransactionTimeoutTracker timeoutTracker = mock(TransactionTimeoutTracker.class);
+        MLTransactionSequenceIdGenerator sequenceIdGenerator = mock(MLTransactionSequenceIdGenerator.class);
+        TransactionRecoverTracker recoverTracker = mock(TransactionRecoverTracker.class);
+        // TODO record the requests that were sent out

Review Comment:
   Already fixed.





[GitHub] [pulsar] poorbarcode commented on pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#issuecomment-1192057008

   /pulsarbot run-failure-checks

