Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/07/22 09:43:46 UTC

[pulsar] branch master updated: [improve] [txn] [PIP-160] Transaction log store enable the batch feature (#16685)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d16ad5c415 [improve] [txn] [PIP-160] Transaction log store enable the batch feature (#16685)
4d16ad5c415 is described below

commit 4d16ad5c415028afc8c52ff222481763e89a3dab
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Jul 22 17:43:37 2022 +0800

    [improve] [txn] [PIP-160] Transaction log store enable the batch feature (#16685)
    
    Master Issue: #15370
    
    ### Motivation
    
    see #15370
    
    ### Modifications
    
    I will complete proposal #15370 with the following pull requests (*the current pull request is step 3*; a brief wiring sketch follows this list):
    
    1. Write the batch transaction log handler: `TxnLogBufferedWriter`
    2. Configuration changes and protocol changes.
    3. Transaction log store enables the batch feature.
    4. Pending ack log store enables the batch feature.
    5. Supports dynamic configuration.
    6. Add an admin API for the transaction batch log and docs (admin and configuration docs).
    7. Add metrics support for the transaction batch log.
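    
    A minimal wiring sketch (for review only; not part of this patch, and the helper/variable names are
    illustrative) of how the new buffered-writer settings flow from `ServiceConfiguration` into
    `MLTransactionLogImpl`. It mirrors the `TransactionMetadataStoreService` and
    `MLTransactionMetadataStoreProvider` changes in the diff below:
    
    ```java
    // Illustrative helper only: shows how this PR wires ServiceConfiguration into the transaction log.
    static MLTransactionLogImpl createTxnLog(ServiceConfiguration conf,
                                             ManagedLedgerFactory managedLedgerFactory,
                                             ManagedLedgerConfig managedLedgerConfig,
                                             ScheduledExecutorService transactionTimer) {
        // Build the buffered-writer config from the broker configuration (getters added by this PR).
        TxnLogBufferedWriterConfig writerConfig = new TxnLogBufferedWriterConfig();
        writerConfig.setBatchEnabled(conf.isTransactionLogBatchedWriteEnabled());
        writerConfig.setBatchedWriteMaxRecords(conf.getTransactionLogBatchedWriteMaxRecords());
        writerConfig.setBatchedWriteMaxSize(conf.getTransactionLogBatchedWriteMaxSize());
        writerConfig.setBatchedWriteMaxDelayInMillis(conf.getTransactionLogBatchedWriteMaxDelayInMillis());
        // New constructor signature: the buffered-writer config plus the executor that triggers
        // asynchronous flushes of batched writes.
        MLTransactionLogImpl txnLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
                managedLedgerFactory, managedLedgerConfig, writerConfig, transactionTimer);
        txnLog.initialize().join();
        return txnLog;
    }
    ```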
---
 .../broker/TransactionMetadataStoreService.java    |  16 +-
 .../pulsar/broker/service/BrokerService.java       |   5 +
 .../broker/stats/ManagedLedgerMetricsTest.java     |  16 +-
 .../pulsar/broker/transaction/TransactionTest.java |   9 +-
 .../TransactionMetadataStoreProvider.java          |   5 +-
 .../InMemTransactionMetadataStoreProvider.java     |   5 +-
 .../coordinator/impl/MLTransactionLogImpl.java     | 155 +++++++-
 .../impl/MLTransactionMetadataStore.java           |   4 +-
 .../impl/MLTransactionMetadataStoreProvider.java   |   7 +-
 .../impl/MLTransactionSequenceIdGenerator.java     |  14 +-
 .../coordinator/impl/TxnBatchedPositionImpl.java   |  60 +++-
 .../coordinator/impl/TxnLogBufferedWriter.java     |   9 +-
 .../impl/TxnLogBufferedWriterConfig.java           |  30 ++
 .../MLTransactionMetadataStoreTest.java            | 158 ++++++---
 .../TransactionMetadataStoreProviderTest.java      |  13 +-
 .../coordinator/impl/MLTransactionLogImplTest.java | 391 +++++++++++++++++++++
 .../impl/TxnBatchedPositionImplTest.java           | 137 ++++++++
 17 files changed, 939 insertions(+), 95 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 2af680364f6..6f64164d46f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -67,6 +68,7 @@ import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -200,6 +202,17 @@ public class TransactionMetadataStoreService {
     }
 
     public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
+        final ScheduledExecutorService transactionLogBufferedWriteAsyncFlushTrigger =
+                pulsarService.getBrokerService().getTransactionTimer();
+        final ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration();
+        final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionLogBatchedWriteEnabled());
+        txnLogBufferedWriterConfig
+                .setBatchedWriteMaxRecords(serviceConfiguration.getTransactionLogBatchedWriteMaxRecords());
+        txnLogBufferedWriterConfig.setBatchedWriteMaxSize(serviceConfiguration.getTransactionLogBatchedWriteMaxSize());
+        txnLogBufferedWriterConfig
+                .setBatchedWriteMaxDelayInMillis(serviceConfiguration.getTransactionLogBatchedWriteMaxDelayInMillis());
+
         return pulsarService.getBrokerService()
                 .getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(v -> {
                             TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
@@ -209,7 +222,8 @@ public class TransactionMetadataStoreService {
                             return transactionMetadataStoreProvider
                                     .openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
                                             timeoutTracker, recoverTracker,
-                                            pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator());
+                                            pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(),
+                                            txnLogBufferedWriterConfig, transactionLogBufferedWriteAsyncFlushTrigger);
                 });
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8e701ebf5bf..63a3c3ba2ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -224,6 +224,9 @@ public class BrokerService implements Closeable {
     @Getter
     private final ScheduledExecutorService backlogQuotaChecker;
 
+    @Getter
+    private final ScheduledExecutorService transactionTimer;
+
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
     protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;
 
@@ -344,6 +347,8 @@ public class BrokerService implements Closeable {
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
+        this.transactionTimer = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-transaction-timer"));
         this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
         this.blockedDispatchers =
                 ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index 6da6026b34f..403256cef30 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.stats;
 
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Sets;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -34,6 +36,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -94,17 +97,24 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase {
 
     @Test
     public void testTransactionTopic() throws Exception {
+        TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        txnLogBufferedWriterConfig.setBatchEnabled(false);
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         createTransactionCoordinatorAssign();
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
-        new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
-                pulsar.getManagedLedgerFactory(), managedLedgerConfig)
-                .initialize().join();
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
+                pulsar.getManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig,
+                scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
         ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
         metrics.generate();
+        // cleanup.
+        mlTransactionLog.closeAsync().get();
+        scheduledExecutorService.shutdown();
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 30eded8eb25..e91a7f08cfb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -130,6 +130,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 import org.awaitility.Awaitility;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -677,6 +678,8 @@ public class TransactionTest extends TransactionTestBase {
 
     @Test
     public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
         String topic = NAMESPACE1 + "/testEndTCRecoveringWhenManagerLedgerDisReadable";
         admin.topics().createNonPartitionedTopic(topic);
 
@@ -699,7 +702,8 @@ public class TransactionTest extends TransactionTestBase {
         persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog =
                 new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
-                        persistentTopic.getManagedLedger().getConfig());
+                        persistentTopic.getManagedLedger().getConfig(), new TxnLogBufferedWriterConfig(),
+                        scheduledExecutorService);
         Class<MLTransactionLogImpl> mlTransactionLogClass = MLTransactionLogImpl.class;
         Field field = mlTransactionLogClass.getDeclaredField("cursor");
         field.setAccessible(true);
@@ -746,6 +750,9 @@ public class TransactionTest extends TransactionTestBase {
         metadataStore3.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));
+
+        // cleanup.
+        scheduledExecutorService.shutdown();
     }
 
     @Test
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
index edcc42ded84..9521764fff9 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -22,8 +22,10 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.annotations.Beta;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 
 /**
  * A provider that provides {@link TransactionMetadataStore}.
@@ -68,5 +70,6 @@ public interface TransactionMetadataStoreProvider {
     CompletableFuture<TransactionMetadataStore> openStore(
             TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
             ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker,
-            TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator);
+            TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator,
+            TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, ScheduledExecutorService scheduledExecutorService);
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
index 8247aef4a88..a86ac1f3661 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.transaction.coordinator.impl;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -38,7 +39,9 @@ public class InMemTransactionMetadataStoreProvider implements TransactionMetadat
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
                                                                  TransactionRecoverTracker recoverTracker,
-                                                                 long maxActiveTransactionsPerCoordinator) {
+                                                                 long maxActiveTransactionsPerCoordinator,
+                                                                 TxnLogBufferedWriterConfig txnLogBufferedWriterConfig,
+                                                                 ScheduledExecutorService scheduledExecutorService) {
         return CompletableFuture.completedFuture(
             new InMemTransactionMetadataStore(transactionCoordinatorId));
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index f9fd728f24b..67d42296424 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -18,9 +18,16 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import static org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter.BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER;
+import static org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter.BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN;
+import static org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter.BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN;
 import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -31,15 +38,18 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionLog;
 import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
+import org.apache.pulsar.transaction.coordinator.proto.BatchedTransactionMetadataEntry;
 import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
@@ -66,13 +76,26 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     private final TopicName topicName;
 
+    private TxnLogBufferedWriter<TransactionMetadataEntry> bufferedWriter;
+
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    private final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig;
+
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
-                                ManagedLedgerConfig managedLedgerConfig) {
+                                ManagedLedgerConfig managedLedgerConfig,
+                                TxnLogBufferedWriterConfig txnLogBufferedWriterConfig,
+                                ScheduledExecutorService scheduledExecutorService) {
         this.topicName = getMLTransactionLogName(tcID);
         this.tcId = tcID.getId();
         this.managedLedgerFactory = managedLedgerFactory;
         this.managedLedgerConfig = managedLedgerConfig;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.txnLogBufferedWriterConfig = txnLogBufferedWriterConfig;
+        if (txnLogBufferedWriterConfig.isBatchEnabled()) {
+            this.managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(true);
+        }
         this.entryQueue = new SpscArrayQueue<>(2000);
     }
 
@@ -90,6 +113,13 @@ public class MLTransactionLogImpl implements TransactionLog {
                     @Override
                     public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                         MLTransactionLogImpl.this.managedLedger = ledger;
+                        MLTransactionLogImpl.this.bufferedWriter = new TxnLogBufferedWriter<>(
+                                managedLedger, ((ManagedLedgerImpl) managedLedger).getExecutor(),
+                                scheduledExecutorService, TransactionLogDataSerializer.INSTANCE,
+                                txnLogBufferedWriterConfig.getBatchedWriteMaxRecords(),
+                                txnLogBufferedWriterConfig.getBatchedWriteMaxSize(),
+                                txnLogBufferedWriterConfig.getBatchedWriteMaxDelayInMillis(),
+                                txnLogBufferedWriterConfig.isBatchEnabled());
 
                         managedLedger.asyncOpenCursor(TRANSACTION_SUBSCRIPTION_NAME,
                                 CommandSubscribe.InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() {
@@ -133,10 +163,12 @@ public class MLTransactionLogImpl implements TransactionLog {
             public void closeComplete(Object ctx) {
                 log.info("Transaction log with tcId : {} close managedLedger successful!", tcId);
                 completableFuture.complete(null);
+                bufferedWriter.close();
             }
 
             @Override
             public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                // If closing the managed ledger fails, do not close the buffered writer.
                 log.error("Transaction log with tcId : {} close managedLedger fail!", tcId);
                 completableFuture.completeExceptionally(exception);
             }
@@ -147,14 +179,10 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     @Override
     public CompletableFuture<Position> append(TransactionMetadataEntry transactionMetadataEntry) {
-        int transactionMetadataEntrySize = transactionMetadataEntry.getSerializedSize();
-        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
         CompletableFuture<Position> completableFuture = new CompletableFuture<>();
-        transactionMetadataEntry.writeTo(buf);
-        managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() {
+        bufferedWriter.asyncAddData(transactionMetadataEntry, new TxnLogBufferedWriter.AddDataCallback() {
             @Override
-            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                buf.release();
+            public void addComplete(Position position, Object context) {
                 completableFuture.complete(position);
             }
 
@@ -164,7 +192,6 @@ public class MLTransactionLogImpl implements TransactionLog {
                 if (exception instanceof ManagedLedgerAlreadyClosedException) {
                     managedLedger.readyToCreateNewLedger();
                 }
-                buf.release();
                 completableFuture.completeExceptionally(exception);
             }
         }, null);
@@ -174,6 +201,12 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     public CompletableFuture<Void> deletePosition(List<Position> positions) {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        // Mark each batched position's batch index as deleted in its ackSet.
+        for (Position position : positions) {
+            if (position instanceof TxnBatchedPositionImpl batchedPosition){
+                batchedPosition.setAckSetByIndex();
+            }
+        }
         this.cursor.asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
             @Override
             public void deleteComplete(Object position) {
@@ -209,15 +242,42 @@ public class MLTransactionLogImpl implements TransactionLog {
         }
 
         public void start() {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry();
-
             while (fillEntryQueueCallback.fillQueue() || entryQueue.size() > 0) {
                 Entry entry = entryQueue.poll();
                 if (entry != null) {
                     try {
-                        ByteBuf buffer = entry.getDataBuffer();
-                        transactionMetadataEntry.parseFrom(buffer, buffer.readableBytes());
-                        transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), transactionMetadataEntry);
+                        List<TransactionMetadataEntry> logs = deserializeEntry(entry);
+                        if (logs.isEmpty()){
+                            continue;
+                        } else if (logs.size() == 1){
+                            TransactionMetadataEntry log = logs.get(0);
+                            transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), log);
+                        } else {
+                            /**
+                             * 1. Query the deleted batch indexes of the current entry from the cursor.
+                             * 2. Filter out the data that has already been acked.
+                             * 3. Build a batched position and handle the valid data.
+                             */
+                            long[] ackSetAlreadyAck = cursor.getDeletedBatchIndexesAsLongArray(
+                                    PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
+                            BitSetRecyclable bitSetAlreadyAck = null;
+                            if (ackSetAlreadyAck != null){
+                                bitSetAlreadyAck = BitSetRecyclable.valueOf(ackSetAlreadyAck);
+                            }
+                            int batchSize = logs.size();
+                            for (int batchIndex = 0; batchIndex < batchSize; batchIndex++){
+                                if (bitSetAlreadyAck != null && !bitSetAlreadyAck.get(batchIndex)){
+                                   continue;
+                                }
+                                TransactionMetadataEntry log = logs.get(batchIndex);
+                                TxnBatchedPositionImpl batchedPosition = new TxnBatchedPositionImpl(entry.getLedgerId(),
+                                        entry.getEntryId(), batchSize, batchIndex);
+                                transactionLogReplayCallback.handleMetadataEntry(batchedPosition, log);
+                            }
+                            if (ackSetAlreadyAck != null){
+                                bitSetAlreadyAck.recycle();
+                            }
+                        }
                     } finally {
                         entry.release();
                     }
@@ -233,6 +293,28 @@ public class MLTransactionLogImpl implements TransactionLog {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf buffer){
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip magic num and version mark.
+            buffer.skipBytes(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN + BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN);
+            BatchedTransactionMetadataEntry batchedLog = new BatchedTransactionMetadataEntry();
+            batchedLog.parseFrom(buffer, buffer.readableBytes());
+            return batchedLog.getTransactionLogsList();
+        } else {
+            TransactionMetadataEntry log = new TransactionMetadataEntry();
+            log.parseFrom(buffer, buffer.readableBytes());
+            return Collections.singletonList(log);
+        }
+    }
+
+    public static List<TransactionMetadataEntry> deserializeEntry(Entry entry){
+        return deserializeEntry(entry.getDataBuffer());
+    }
+
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
@@ -283,4 +365,51 @@ public class MLTransactionLogImpl implements TransactionLog {
         }
 
     }
+
+    /**
+     * Used only by the buffered writer. Since all writes in the buffered writer happen on the same thread, we can
+     * use a thread-local variable here. Why must they be on the same thread?
+     * Because {@link BatchedTransactionMetadataEntry#clear()} modifies the elements of
+     * {@link BatchedTransactionMetadataEntry#getTransactionLogsList()}, which would cause problems with
+     * multi-threaded writes.
+     */
+    private static final FastThreadLocal<BatchedTransactionMetadataEntry> localBatchedTransactionLogCache =
+            new FastThreadLocal<>() {
+                @Override
+                protected BatchedTransactionMetadataEntry initialValue() throws Exception {
+                    return new BatchedTransactionMetadataEntry();
+                }
+            };
+
+    private static class TransactionLogDataSerializer
+            implements TxnLogBufferedWriter.DataSerializer<TransactionMetadataEntry>{
+
+        private static final TransactionLogDataSerializer INSTANCE = new TransactionLogDataSerializer();
+
+        @Override
+        public int getSerializedSize(TransactionMetadataEntry data) {
+            return data.getSerializedSize();
+        }
+
+        @Override
+        public ByteBuf serialize(TransactionMetadataEntry data) {
+            int transactionMetadataEntrySize = data.getSerializedSize();
+            ByteBuf buf =
+                    PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
+            data.writeTo(buf);
+            return buf;
+        }
+
+        @Override
+        public ByteBuf serialize(ArrayList<TransactionMetadataEntry> transactionLogArray) {
+            // Since all writes happen on the same thread, we can use a thread-local variable here.
+            BatchedTransactionMetadataEntry data = localBatchedTransactionLogCache.get();
+            data.clear();
+            data.addAllTransactionLogs(transactionLogArray);
+            int bytesSize = data.getSerializedSize();
+            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(bytesSize, bytesSize);
+            data.writeTo(buf);
+            return buf;
+        }
+    }
 }
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index a35ba975851..65e9a654a78 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.time.Duration;
@@ -67,7 +68,8 @@ public class MLTransactionMetadataStore
 
     private final TransactionCoordinatorID tcID;
     private final MLTransactionLogImpl transactionLog;
-    private final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>();
+    @VisibleForTesting
+    final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>();
     private final TransactionTimeoutTracker timeoutTracker;
     private final TransactionMetadataStoreStats transactionMetadataStoreStats;
     private final LongAdder createdTransactionCount;
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index fe887aacf9e..919fb934cc9 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.transaction.coordinator.impl;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -42,11 +43,13 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
                                                                  TransactionRecoverTracker recoverTracker,
-                                                                 long maxActiveTransactionsPerCoordinator) {
+                                                                 long maxActiveTransactionsPerCoordinator,
+                                                                 TxnLogBufferedWriterConfig txnLogBufferedWriterConfig,
+                                                                 ScheduledExecutorService scheduledExecutorService) {
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
-                managedLedgerFactory, managedLedgerConfig);
+                managedLedgerFactory, managedLedgerConfig, txnLogBufferedWriterConfig, scheduledExecutorService);
 
         // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
         return txnLog.initialize().thenCompose(__ ->
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
index 4717bdf389e..505e1f05d12 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
-import io.netty.buffer.ByteBuf;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,10 +72,13 @@ public class MLTransactionSequenceIdGenerator implements ManagedLedgerIntercepto
                         try {
                             LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
                             if (ledgerEntry != null) {
-                                TransactionMetadataEntry lastConfirmEntry = new TransactionMetadataEntry();
-                                ByteBuf buffer = ledgerEntry.getEntryBuffer();
-                                lastConfirmEntry.parseFrom(buffer, buffer.readableBytes());
-                                this.sequenceId.set(lastConfirmEntry.getMaxLocalTxnId());
+                                List<TransactionMetadataEntry> transactionLogs =
+                                        MLTransactionLogImpl.deserializeEntry(ledgerEntry.getEntryBuffer());
+                                if (!CollectionUtils.isEmpty(transactionLogs)){
+                                    TransactionMetadataEntry lastConfirmEntry =
+                                            transactionLogs.get(transactionLogs.size() - 1);
+                                    this.sequenceId.set(lastConfirmEntry.getMaxLocalTxnId());
+                                }
                             }
                             entries.close();
                             promise.complete(null);
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java
index e5e9b60cfe8..5a813941076 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
-import java.util.Objects;
 import lombok.Getter;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 
 /***
  * The difference with {@link PositionImpl} is that there are two more parameters:
@@ -37,23 +37,67 @@ public class TxnBatchedPositionImpl extends PositionImpl {
     @Getter
     private final int batchIndex;
 
-    public TxnBatchedPositionImpl(Position position, int batchSize, int batchIndex, long[] ackSet){
-        super(position.getLedgerId(), position.getEntryId(), ackSet);
+    public TxnBatchedPositionImpl(long ledgerId, long entryId, int batchSize, int batchIndex){
+        super(ledgerId, entryId);
         this.batchIndex = batchIndex;
         this.batchSize = batchSize;
     }
 
+    public TxnBatchedPositionImpl(Position position, int batchSize, int batchIndex){
+        this(position.getLedgerId(), position.getEntryId(), batchSize, batchIndex);
+    }
+
+    /**
+     * It behaves exactly the same as {@link PositionImpl}, so that a {@link TxnBatchedPositionImpl} used as a map
+     * key behaves the same as a {@link PositionImpl}. {@link #batchSize} and {@link #batchIndex} must not take part
+     * in the calculation, just like {@link PositionImpl#ackSet} does not take part in it.
+     * Note: {@link java.util.concurrent.ConcurrentSkipListMap} uses {@link Comparable#compareTo(Object)} to
+     *   determine whether two keys are the same, while {@link java.util.HashMap} uses
+     *   {@link Object#hashCode()} & {@link Object#equals(Object)} to determine whether two keys are the same.
+     */
     @Override
     public boolean equals(Object o) {
-        if (o instanceof TxnBatchedPositionImpl other) {
-            return super.equals(o) && batchSize == other.batchSize && batchIndex == other.batchIndex;
-        }
-        return false;
+        return super.equals(o);
 
     }
 
+    /**
+     * It behaves exactly the same as {@link PositionImpl}, so that a {@link TxnBatchedPositionImpl} used as a map
+     * key behaves the same as a {@link PositionImpl}. {@link #batchSize} and {@link #batchIndex} must not take part
+     * in the calculation, just like {@link PositionImpl#ackSet} does not take part in it.
+     * Note: {@link java.util.concurrent.ConcurrentSkipListMap} uses {@link Comparable#compareTo(Object)} to
+     *   determine whether two keys are the same, while {@link java.util.HashMap} uses
+     *   {@link Object#hashCode()} & {@link Object#equals(Object)} to determine whether two keys are the same.
+     */
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), batchSize, batchIndex);
+        return super.hashCode();
+    }
+
+    /**
+     * It behaves exactly the same as {@link PositionImpl}, so that when it is compared to the "markDeletePosition"
+     * it looks like a {@link PositionImpl}. {@link #batchSize} and {@link #batchIndex} must not take part in the
+     * calculation, just like {@link PositionImpl#ackSet} does not take part in it.
+     * Note: {@link java.util.concurrent.ConcurrentSkipListMap} uses {@link Comparable#compareTo(Object)} to
+     *    determine whether two keys are the same, while {@link java.util.HashMap} uses
+     *    {@link Object#hashCode()} & {@link Object#equals(Object)} to determine whether two keys are the same.
+     */
+    public int compareTo(PositionImpl that) {
+        return super.compareTo(that);
+    }
+
+    /**
+     * Build the ackSet attribute so that the bit at {@link #batchIndex} is false and all other bits are true.
+     */
+    public void setAckSetByIndex(){
+        if (batchSize == 1){
+            return;
+        }
+        BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
+        bitSetRecyclable.set(0, batchSize, true);
+        bitSetRecyclable.clear(batchIndex);
+        long[] ackSet = bitSetRecyclable.toLongArray();
+        bitSetRecyclable.recycle();
+        setAckSet(ackSet);
     }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
index b3e4a7a8641..ac1b28324cf 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
@@ -39,7 +39,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 
 /***
  * See PIP-160: https://github.com/apache/pulsar/issues/15516.
@@ -58,8 +57,10 @@ import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 public class TxnLogBufferedWriter<T> implements AsyncCallbacks.AddEntryCallback, Closeable {
 
     public static final short BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER = 0x0e01;
+    public static final int BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LEN = 2;
 
     public static final short BATCHED_ENTRY_DATA_PREFIX_VERSION = 1;
+    public static final short BATCHED_ENTRY_DATA_PREFIX_VERSION_LEN = 2;
 
     private static final ManagedLedgerException BUFFERED_WRITER_CLOSED_EXCEPTION =
             new ManagedLedgerException.ManagedLedgerFencedException(
@@ -321,12 +322,8 @@ public class TxnLogBufferedWriter<T> implements AsyncCallbacks.AddEntryCallback,
             final int batchSize = flushContext.asyncAddArgsList.size();
             for (int batchIndex = 0; batchIndex < batchSize; batchIndex++) {
                 final AsyncAddArgs asyncAddArgs = flushContext.asyncAddArgsList.get(batchIndex);
-                BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
-                bitSetRecyclable.set(batchIndex);
-                long[] ackSet = bitSetRecyclable.toLongArray();
-                bitSetRecyclable.recycle();
                 final TxnBatchedPositionImpl txnBatchedPosition = new TxnBatchedPositionImpl(position, batchSize,
-                        batchIndex, ackSet);
+                        batchIndex);
                 // Because this task already running at ordered task, so just "run".
                 try {
                     asyncAddArgs.callback.addComplete(txnBatchedPosition, asyncAddArgs.ctx);
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterConfig.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterConfig.java
new file mode 100644
index 00000000000..a056fd1bb9c
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterConfig.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import lombok.Data;
+
+@Data
+public class TxnLogBufferedWriterConfig {
+
+    private int batchedWriteMaxRecords = 512;
+    private int batchedWriteMaxSize = 1024 * 1024 * 4;
+    private int batchedWriteMaxDelayInMillis = 1;
+    private boolean batchEnabled = false;
+}
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 70365b28b37..bcfecfeb780 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.transaction.coordinator;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -28,15 +31,18 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -55,12 +61,19 @@ import static org.testng.Assert.fail;
 
 public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
+    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
+
     public MLTransactionMetadataStoreTest() {
         super(3);
     }
 
-    @Test
-    public void testTransactionOperation() throws Exception {
+    @AfterClass
+    public void cleanup(){
+        scheduledExecutorService.shutdown();
+    }
+
+    @Test(dataProvider = "bufferedWriterConfigDataProvider")
+    public void testTransactionOperation(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) throws Exception {
         ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
         factoryConf.setMaxCacheSize(0);
 
@@ -71,8 +84,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                managedLedgerConfig);
-        mlTransactionLog.initialize().join();
+                managedLedgerConfig, txnLogBufferedWriterConfig, scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(),
@@ -131,13 +144,25 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
     @DataProvider(name = "isUseManagedLedgerProperties")
     public Object[][] versions() {
-        return new Object[][] { { true }, { false } };
+        return new Object[][] { { true }, { false }};
+    }
+
+    @DataProvider(name = "bufferedWriterConfigDataProvider")
+    public Object[][] bufferedWriterConfigDataProvider() {
+        TxnLogBufferedWriterConfig disabled = new TxnLogBufferedWriterConfig();
+        disabled.setBatchEnabled(false);
+        TxnLogBufferedWriterConfig enabled = new TxnLogBufferedWriterConfig();
+        enabled.setBatchEnabled(true);
+        enabled.setBatchedWriteMaxRecords(3);
+        enabled.setBatchedWriteMaxDelayInMillis(1);
+        return new Object[][] { { enabled }, { disabled } };
     }
 
     @Test(dataProvider = "isUseManagedLedgerProperties")
     public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws Exception {
         ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
         factoryConf.setMaxCacheSize(0);
+        TxnLogBufferedWriterConfig disabledBufferedWriter = new TxnLogBufferedWriterConfig();
 
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
@@ -147,8 +172,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         managedLedgerConfig.setMaxEntriesPerLedger(3);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                managedLedgerConfig);
-        mlTransactionLog.initialize().join();
+                managedLedgerConfig, disabledBufferedWriter, scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -175,10 +200,10 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 Assert.assertEquals(managedLedger.getState(), ClosedLedger);
             });
         }
-        mlTransactionLog.closeAsync().get();
+        mlTransactionLog.closeAsync().get(2, TimeUnit.SECONDS);
         mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                managedLedgerConfig);
-        mlTransactionLog.initialize().join();
+                managedLedgerConfig, disabledBufferedWriter, scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -189,8 +214,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         assertEquals(txnID.getLeastSigBits(), 1);
     }
 
-    @Test
-    public void testInitTransactionReader() throws Exception {
+    /***
+     * Verify that the transaction metadata store writes and reads correctly.
+     * TODO: After the batch feature can be switched dynamically, append tests that contain both batch and non-batch data.
+     */
+    @Test(dataProvider = "bufferedWriterConfigDataProvider")
+    public void testInitTransactionReader(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) throws Exception {
         ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
         factoryConf.setMaxCacheSize(0);
 
@@ -202,8 +231,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                managedLedgerConfig);
-        mlTransactionLog.initialize().join();
+                managedLedgerConfig, txnLogBufferedWriterConfig, scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
 
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
@@ -216,8 +245,10 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 break;
             }
             if (transactionMetadataStore.checkIfReady()) {
-                TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
-                TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+                CompletableFuture<TxnID> txIDFuture1 = transactionMetadataStore.newTransaction(1000);
+                CompletableFuture<TxnID> txIDFuture2 = transactionMetadataStore.newTransaction(1000);
+                TxnID txnID1 = txIDFuture1.get();
+                TxnID txnID2 = txIDFuture2.get();
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN);
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN);
 
@@ -231,23 +262,29 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 subscriptions.add(new TransactionSubscription("topic1", "sub1"));
                 subscriptions.add(new TransactionSubscription("topic2", "sub2"));
 
-                transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions).get();
-                transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions).get();
+                List<CompletableFuture<?>> futureList = new ArrayList<>();
+                futureList.add(transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions));
+                futureList.add(transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions));
+                FutureUtil.waitForAll(futureList).get();
+
                 List<TransactionSubscription> subscriptions1 = new ArrayList<>();
                 subscriptions1.add(new TransactionSubscription("topic1", "sub1"));
                 subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
                 subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
-                transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions1).get();
-                transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions1).get();
-
-                transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTING, TxnStatus.OPEN, false).get();
-                transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.COMMITTING, TxnStatus.OPEN, false).get();
-
+                futureList.add(transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions1));
+                futureList.add(transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions1));
+                FutureUtil.waitForAll(futureList).get();
+
+                futureList.add(transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTING, TxnStatus.OPEN,
+                        false));
+                futureList.add(transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.COMMITTING, TxnStatus.OPEN,
+                        false));
+                FutureUtil.waitForAll(futureList).get();
                 transactionMetadataStore.closeAsync();
 
                 MLTransactionLogImpl txnLog2 = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                        managedLedgerConfig);
-                txnLog2.initialize().join();
+                        managedLedgerConfig, txnLogBufferedWriterConfig, scheduledExecutorService);
+                txnLog2.initialize().get(2, TimeUnit.SECONDS);
 
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
@@ -304,8 +341,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         }
     }
 
-    @Test
-    public void testDeleteLog() throws Exception {
+    /**
+     * Verify that the transaction metadata store deletes logs correctly after commit/abort.
+     * TODO: After the batch feature can be switched dynamically, add tests that contain both batch and non-batch data.
+     */
+    @Test(dataProvider = "bufferedWriterConfigDataProvider")
+    public void testDeleteLog(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) throws Exception {
         ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
         factoryConf.setMaxCacheSize(0);
 
@@ -316,8 +357,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                managedLedgerConfig);
-        mlTransactionLog.initialize().join();
+                managedLedgerConfig, txnLogBufferedWriterConfig, scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -329,35 +370,44 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 break;
             }
             if (transactionMetadataStore.checkIfReady()) {
-                TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
-                TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+                CompletableFuture<TxnID> txIDFuture1 = transactionMetadataStore.newTransaction(1000);
+                CompletableFuture<TxnID> txIDFuture2 = transactionMetadataStore.newTransaction(1000);
+                TxnID txnID1 = txIDFuture1.get();
+                TxnID txnID2 = txIDFuture2.get();
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN);
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN);
 
+                List<CompletableFuture<?>> futureList = new ArrayList<>();
                 List<String> partitions = new ArrayList<>();
                 partitions.add("pt-1");
                 partitions.add("pt-2");
-                transactionMetadataStore.addProducedPartitionToTxn(txnID1, partitions).get();
-                transactionMetadataStore.addProducedPartitionToTxn(txnID2, partitions).get();
+                futureList.add(transactionMetadataStore.addProducedPartitionToTxn(txnID1, partitions));
+                futureList.add(transactionMetadataStore.addProducedPartitionToTxn(txnID2, partitions));
+                FutureUtil.waitForAll(futureList).get();
 
                 List<TransactionSubscription> subscriptions = new ArrayList<>();
                 subscriptions.add(new TransactionSubscription("topic1", "sub1"));
                 subscriptions.add(new TransactionSubscription("topic2", "sub2"));
 
-                transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions).get();
-                transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions).get();
+                futureList.add(transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions));
+                futureList.add(transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions));
+                FutureUtil.waitForAll(futureList).get();
+
                 List<TransactionSubscription> subscriptions1 = new ArrayList<>();
                 subscriptions1.add(new TransactionSubscription("topic1", "sub1"));
                 subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
                 subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
-                transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions1).get();
-                transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions1).get();
+                futureList.add(transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions1));
+                futureList.add(transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions1));
+                FutureUtil.waitForAll(futureList).get();
+
+                futureList.add(transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTING, TxnStatus.OPEN, false));
+                futureList.add(transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTING, TxnStatus.OPEN, false));
 
-                transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTING, TxnStatus.OPEN, false).get();
-                transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTING, TxnStatus.OPEN, false).get();
+                futureList.add(transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false));
+                futureList.add(transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING, false));
+                FutureUtil.waitForAll(futureList).get();
 
-                transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false).get();
-                transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING, false).get();
                 Field field = mlTransactionLog.getClass().getDeclaredField("cursor");
                 field.setAccessible(true);
                 ManagedCursor cursor = (ManagedCursor) field.get(mlTransactionLog);
@@ -371,8 +421,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         }
     }
 
-    @Test
-    public void testRecoverWhenDeleteFromCursor() throws Exception {
+    /**
+     * Verify that the transaction metadata store recovers correctly.
+     * TODO: After the batch feature can be switched dynamically, add tests that contain both batch and non-batch data.
+     */
+    @Test(dataProvider = "bufferedWriterConfigDataProvider")
+    public void testRecoverWhenDeleteFromCursor(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) throws Exception {
         ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
         factoryConf.setMaxCacheSize(0);
 
@@ -383,8 +437,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                managedLedgerConfig);
-        mlTransactionLog.initialize().join();
+                managedLedgerConfig, txnLogBufferedWriterConfig, scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -401,8 +455,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING, false).get();
 
         mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                managedLedgerConfig);
-        mlTransactionLog.initialize().join();
+                managedLedgerConfig, txnLogBufferedWriterConfig, scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
@@ -411,8 +465,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
     }
 
-    @Test
-    public void testManageLedgerWriteFailState() throws Exception {
+    @Test(dataProvider = "bufferedWriterConfigDataProvider")
+    public void testManageLedgerWriteFailState(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) throws Exception {
         ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
         factoryConf.setMaxCacheSize(0);
 
@@ -423,8 +477,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                managedLedgerConfig);
-        mlTransactionLog.initialize().join();
+                managedLedgerConfig, txnLogBufferedWriterConfig, scheduledExecutorService);
+        mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index 3f80d850b1a..bb3cfc869f6 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -26,11 +26,15 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
 import org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
@@ -52,6 +56,7 @@ public class TransactionMetadataStoreProviderTest {
     private TransactionMetadataStoreProvider provider;
     private TransactionCoordinatorID tcId;
     private TransactionMetadataStore store;
+    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
 
     @Factory(dataProvider = "providers")
     public TransactionMetadataStoreProviderTest(String providerClassName) throws Exception {
@@ -63,7 +68,13 @@ public class TransactionMetadataStoreProviderTest {
     public void setup() throws Exception {
         this.tcId = new TransactionCoordinatorID(1L);
         this.store = this.provider.openStore(tcId, null, null,
-                null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl(), 0L).get();
+                null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl(), 0L,
+                new TxnLogBufferedWriterConfig(), scheduledExecutorService).get();
+    }
+
+    @AfterClass
+    public void cleanup(){
+        scheduledExecutorService.shutdown();
     }
 
     @Test
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java
new file mode 100644
index 00000000000..4561b7521c7
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {
+
+    @DataProvider(name = "variedBufferedWriteConfigProvider")
+    private Object[][] variedBufferedWriteConfigProvider(){
+        return new Object[][]{
+                {true, true},
+                {false, false},
+                {true, false},
+                {false, true}
+        };
+    }
+    /**
+     * 1. Add some transaction logs.
+     * 2. Create a new transaction metadata store and execute recovery, then assert that the txn-mapping built by
+     *    recovery is as expected. This validates that writes and reads are correct.
+     * 3. Commit some of the transactions and add more transaction logs.
+     * 4. Create another transaction metadata store and execute recovery, then assert that the cursor's
+     *    mark-deleted position and batch indexes are as expected. This validates that deletion is correct.
+     */
+    @Test(dataProvider = "variedBufferedWriteConfigProvider")
+    public void testMainProcess(boolean writeWithBatch, boolean readWithBatch) throws Exception {
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
+        TxnLogBufferedWriterConfig bufferedWriterConfigForWrite = new TxnLogBufferedWriterConfig();
+        bufferedWriterConfigForWrite.setBatchedWriteMaxDelayInMillis(1000 * 3600);
+        bufferedWriterConfigForWrite.setBatchedWriteMaxRecords(3);
+        bufferedWriterConfigForWrite.setBatchEnabled(writeWithBatch);
+        TransactionCoordinatorID transactionCoordinatorID = TransactionCoordinatorID.get(0);
+        MLTransactionLogImpl mlTransactionLogForWrite = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), factory,
+                new ManagedLedgerConfig(), bufferedWriterConfigForWrite, scheduledExecutorService);
+        mlTransactionLogForWrite.initialize().get(3, TimeUnit.SECONDS);
+        Map<Integer, List<CompletableFuture<Position>>> expectedMapping = new HashMap<>();
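+        // Mapping: txn id -> positions of every log entry appended for that txn; used below to verify recovery.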
+        /**
+         * 1. Start 20 transactions; these will eventually be committed.
+         *    1-1. New transactions.
+         *    1-2. Add partition.
+         *    1-3. Add subscription.
+         * 2. Start 30 transactions that will never commit, to validate the delete logic.
+         */
+        // Add logs: start transaction.
+        for (int i = 1; i <= 20; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidMostBits(i);
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setStartTime(i);
+            transactionLog.setTimeoutMs(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW);
+            CompletableFuture<Position> future = mlTransactionLogForWrite.append(transactionLog);
+            expectedMapping.computeIfAbsent(i, k -> new ArrayList<>());
+            expectedMapping.get(i).add(future);
+        }
+        // Add logs: add partition.
+        for (int i = 1; i <= 20; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.ADD_PARTITION);
+            transactionLog.addAllPartitions(Arrays.asList(String.valueOf(i)));
+            CompletableFuture<Position> future = mlTransactionLogForWrite.append(transactionLog);
+            expectedMapping.computeIfAbsent(i, k -> new ArrayList<>());
+            expectedMapping.get(i).add(future);
+        }
+        // Add logs: add subscription.
+        for (int i = 1; i <= 20; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.ADD_SUBSCRIPTION);
+            Subscription subscription = new Subscription();
+            subscription.setSubscription(String.valueOf(i));
+            subscription.setTopic(String.valueOf(i));
+            transactionLog.addAllSubscriptions(Arrays.asList(subscription));
+            CompletableFuture<Position> future = mlTransactionLogForWrite.append(transactionLog);
+            expectedMapping.computeIfAbsent(i, k -> new ArrayList<>());
+            expectedMapping.get(i).add(future);
+        }
+        // Add logs: new transactions. These transactions will never commit, to validate the delete logic of the transaction log.
+        for (int i = 21; i <= 50; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidMostBits(i);
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setStartTime(i);
+            transactionLog.setTimeoutMs(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW);
+            CompletableFuture<Position> future = mlTransactionLogForWrite.append(transactionLog);
+            expectedMapping.computeIfAbsent(i, k -> new ArrayList<>());
+            expectedMapping.get(i).add(future);
+        }
+        // Wait for all futures to complete.
+        FutureUtil.waitForAll(expectedMapping.values().stream()
+                .flatMap(l -> l.stream()).collect(Collectors.toList()))
+                .get(2, TimeUnit.SECONDS);
+        /**
+         * Create a new transaction metadata store and execute recovery to verify that the txn-mapping built by
+         * recovery is as expected. This validates that writes and reads are correct.
+         */
+        // Create another transaction log for recover.
+        TxnLogBufferedWriterConfig bufferedWriterConfigForRecover = new TxnLogBufferedWriterConfig();
+        bufferedWriterConfigForRecover.setBatchedWriteMaxDelayInMillis(1000 * 3600);
+        bufferedWriterConfigForRecover.setBatchedWriteMaxRecords(3);
+        bufferedWriterConfigForRecover.setBatchEnabled(readWithBatch);
+        MLTransactionLogImpl mlTransactionLogForRecover = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), factory,
+                new ManagedLedgerConfig(), bufferedWriterConfigForRecover, scheduledExecutorService);
+        mlTransactionLogForRecover.initialize().get(3, TimeUnit.SECONDS);
+        // Recover and verify the txnID and position mappings.
+        TransactionTimeoutTracker timeoutTracker = mock(TransactionTimeoutTracker.class);
+        MLTransactionSequenceIdGenerator sequenceIdGenerator = mock(MLTransactionSequenceIdGenerator.class);
+        TransactionRecoverTracker recoverTracker = mock(TransactionRecoverTracker.class);
+        MLTransactionMetadataStore transactionMetadataStoreForRecover = new MLTransactionMetadataStore(transactionCoordinatorID,
+                mlTransactionLogForRecover, timeoutTracker, sequenceIdGenerator, Integer.MAX_VALUE);
+        transactionMetadataStoreForRecover.init(recoverTracker).get(2000, TimeUnit.SECONDS);
+        Assert.assertEquals(transactionMetadataStoreForRecover.txnMetaMap.size(), expectedMapping.size());
+        Iterator<Integer> txnIdSet = expectedMapping.keySet().iterator();
+        while (txnIdSet.hasNext()){
+            int txnId = txnIdSet.next();
+            List<CompletableFuture<Position>> expectedPositions = expectedMapping.get(txnId);
+            List<Position> actualPositions = transactionMetadataStoreForRecover.txnMetaMap.get(Long.valueOf(txnId)).getRight();
+            Assert.assertEquals(actualPositions.size(), expectedPositions.size());
+            for (int i = 0; i< expectedPositions.size(); i++){
+                Position expectedPosition = expectedPositions.get(i).get(1, TimeUnit.SECONDS);
+                Position actualPosition = actualPositions.get(i);
+                Assert.assertEquals(actualPosition, expectedPosition);
+            }
+        }
+        /**
+         * 1. Commit the transactions created at step 1.
+         * 2. Start another 20 transactions that will never commit, to validate the delete logic.
+         */
+        // Add logs: committing.
+        for (int i = 1; i <= 20; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setExpectedStatus(TxnStatus.OPEN);
+            transactionLog.setNewStatus(TxnStatus.COMMITTING);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.UPDATE);
+            CompletableFuture<Position> future = mlTransactionLogForWrite.append(transactionLog);
+            expectedMapping.computeIfAbsent(i, k -> new ArrayList<>());
+            expectedMapping.get(i).add(future);
+        }
+        // Add logs: committed.
+        for (int i = 1; i <= 20; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setExpectedStatus(TxnStatus.COMMITTING);
+            transactionLog.setNewStatus(TxnStatus.COMMITTED);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.UPDATE);
+            CompletableFuture<Position> future = mlTransactionLogForWrite.append(transactionLog);
+            expectedMapping.computeIfAbsent(i, k -> new ArrayList<>());
+            expectedMapping.get(i).add(future);
+        }
+        // Add logs: new transaction.
+        for (int i = 51; i <= 70; i++){
+            TransactionMetadataEntry transactionLog = new TransactionMetadataEntry();
+            transactionLog.setTxnidMostBits(i);
+            transactionLog.setTxnidLeastBits(i);
+            transactionLog.setMaxLocalTxnId(i);
+            transactionLog.setStartTime(i);
+            transactionLog.setTimeoutMs(i);
+            transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW);
+            CompletableFuture<Position> future = mlTransactionLogForWrite.append(transactionLog);
+            expectedMapping.computeIfAbsent(i, k -> new ArrayList<>());
+            expectedMapping.get(i).add(future);
+        }
+        // Wait for all callbacks to complete.
+        FutureUtil.waitForAll(expectedMapping.values().stream()
+                        .flatMap(l -> l.stream()).collect(Collectors.toList()))
+                .get(2, TimeUnit.SECONDS);
+        // Rewind the cursor for the next step.
+        ManagedCursorImpl managedCursorForRecover =
+                (ManagedCursorImpl) transactionMetadataStoreForRecover.getManagedLedger().getCursors().iterator().next();
+        managedCursorForRecover.rewind();
+        /**
+         * Create another transaction metadata store and execute recovery, then assert that the cursor's
+         * mark-deleted position and batch indexes are as expected.
+         */
+        // Create another transaction log for recover.
+        MLTransactionLogImpl mlTransactionLogForDelete = new MLTransactionLogImpl(TransactionCoordinatorID.get(0), factory,
+                new ManagedLedgerConfig(), bufferedWriterConfigForRecover, scheduledExecutorService);
+        mlTransactionLogForDelete.initialize().get(3, TimeUnit.SECONDS);
+        MLTransactionMetadataStore transactionMetadataStoreForDelete = new MLTransactionMetadataStore(transactionCoordinatorID,
+                mlTransactionLogForDelete, timeoutTracker, sequenceIdGenerator, Integer.MAX_VALUE);
+        transactionMetadataStoreForDelete.init(recoverTracker).get(2000, TimeUnit.SECONDS);
+        ManagedCursorImpl managedCursor =
+                (ManagedCursorImpl)mlTransactionLogForDelete.getManagedLedger().getCursors().iterator().next();
+        // Calculate expected deleted positions.
+        List<Position> expectedDeletedPositions = new ArrayList<>();
+        for (int i = 1; i <= 20; i++){
+            expectedDeletedPositions.addAll(
+                    expectedMapping.remove(i).stream()
+                            .map(f -> f.join())
+                            .collect(Collectors.toList()));
+        }
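+        // Sort by (ledgerId, entryId, batchIndex) so the mark-delete position and ack-sets can be derived in order.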
+        expectedDeletedPositions = expectedDeletedPositions.stream().sorted((o1,o2) -> {
+            if (o1 instanceof TxnBatchedPositionImpl){
+                TxnBatchedPositionImpl t1 = (TxnBatchedPositionImpl) o1;
+                TxnBatchedPositionImpl t2 = (TxnBatchedPositionImpl) o2;
+                return ComparisonChain.start()
+                        .compare(o1.getLedgerId(), o2.getLedgerId())
+                        .compare(o1.getEntryId(), o2.getEntryId())
+                        .compare(t1.getBatchIndex(), t2.getBatchIndex())
+                        .result();
+            }else {
+                return ComparisonChain.start()
+                        .compare(o1.getLedgerId(), o2.getLedgerId())
+                        .compare(o1.getEntryId(), o2.getEntryId())
+                        .result();
+            }
+        }).collect(Collectors.toList());
+        PositionImpl markDeletedPosition = null;
+        LinkedHashMap<PositionImpl, BitSetRecyclable> batchIndexes = null;
+        if (expectedDeletedPositions.get(0) instanceof TxnBatchedPositionImpl){
+            Pair<PositionImpl, LinkedHashMap<PositionImpl, BitSetRecyclable>> pair =
+                    calculateBatchIndexes(
+                            expectedDeletedPositions.stream()
+                                    .map(p -> (TxnBatchedPositionImpl)p)
+                                    .collect(Collectors.toList())
+                    );
+            markDeletedPosition = pair.getLeft();
+            batchIndexes = pair.getRight();
+        } else {
+            markDeletedPosition = calculateMarkDeletedPosition(expectedDeletedPositions);
+        }
+        final PositionImpl markDeletedPosition_final = markDeletedPosition;
+        // Assert the mark-deleted position is correct.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+            Position actualMarkDeletedPosition = managedCursor.getMarkDeletedPosition();
+            return markDeletedPosition_final.getLedgerId() == actualMarkDeletedPosition.getLedgerId() &&
+                    markDeletedPosition_final.getEntryId() == actualMarkDeletedPosition.getEntryId();
+        });
+        // Assert the batch indexes are correct.
+        if (batchIndexes != null){
+            // calculate last deleted position.
+            Map.Entry<PositionImpl, BitSetRecyclable>
+                    lastOne = batchIndexes.entrySet().stream().reduce((a, b) -> b).get();
+            // Wait until the last one has been deleted from the cursor.
+            Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+                long[] ls = managedCursor.getBatchPositionAckSet(lastOne.getKey());
+                return Arrays.equals(lastOne.getValue().toLongArray(), ls);
+            });
+            // Verify batch indexes.
+            for (Map.Entry<PositionImpl, BitSetRecyclable> entry : batchIndexes.entrySet()){
+                PositionImpl p = entry.getKey();
+                long[] actualAckSet = managedCursor.getBatchPositionAckSet(p);
+                Assert.assertEquals(actualAckSet, entry.getValue().toLongArray());
+                entry.getValue().recycle();
+            }
+        }
+        // Cleanup.
+        mlTransactionLogForWrite.closeAsync().get(2, TimeUnit.SECONDS);
+        mlTransactionLogForRecover.closeAsync().get(2, TimeUnit.SECONDS);
+        mlTransactionLogForDelete.closeAsync().get(2, TimeUnit.SECONDS);
+        transactionMetadataStoreForRecover.closeAsync().get(2, TimeUnit.SECONDS);
+        transactionMetadataStoreForDelete.closeAsync().get(2, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Calculate the markDeletedPosition from the given sorted deleted positions.
+     */
+    private PositionImpl calculateMarkDeletedPosition(Collection<Position> sortedDeletedPositions){
+        Position markDeletedPosition = null;
+        for (Position position : sortedDeletedPositions){
+            if (markDeletedPosition == null){
+                markDeletedPosition = position;
+                continue;
+            }
+            // Ledgers are never closed, so all positions have the same ledger-id.
+            if (markDeletedPosition.getEntryId() == position.getEntryId() - 1){
+                markDeletedPosition = position;
+                continue;
+            } else {
+                break;
+            }
+        }
+        if (markDeletedPosition == null) {
+            return null;
+        }
+        return PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId());
+    }
+
+    /**
+     * Calculate the markDeletedPosition and batch indexes from the given sorted deleted positions.
+     */
+    private Pair<PositionImpl, LinkedHashMap<PositionImpl, BitSetRecyclable>> calculateBatchIndexes(
+            List<TxnBatchedPositionImpl> sortedDeletedPositions){
+        // build batchIndexes.
+        LinkedHashMap<PositionImpl, BitSetRecyclable> batchIndexes = new LinkedHashMap<>();
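+        // Ack-set convention: a set bit means the batch index is not yet deleted; clearing a bit marks it deleted.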
+        for (TxnBatchedPositionImpl batchedPosition : sortedDeletedPositions){
+            batchedPosition.setAckSetByIndex();
+            PositionImpl k = PositionImpl.get(batchedPosition.getLedgerId(), batchedPosition.getEntryId());
+            BitSetRecyclable bitSetRecyclable = batchIndexes.get(k);
+            if (bitSetRecyclable == null){
+                bitSetRecyclable = BitSetRecyclable.valueOf(batchedPosition.getAckSet());
+                batchIndexes.put(k, bitSetRecyclable);
+            }
+            bitSetRecyclable.clear(batchedPosition.getBatchIndex());
+        }
+        // calculate markDeletedPosition.
+        Position markDeletedPosition = null;
+        for (Map.Entry<PositionImpl, BitSetRecyclable> entry : batchIndexes.entrySet()){
+            PositionImpl position = entry.getKey();
+            BitSetRecyclable bitSetRecyclable = entry.getValue();
+            if (!bitSetRecyclable.isEmpty()){
+                break;
+            }
+            if (markDeletedPosition == null){
+                markDeletedPosition = position;
+                continue;
+            }
+            // Ledgers are never closed, so all positions have the same ledger-id.
+            if (markDeletedPosition.getEntryId() == position.getEntryId() - 1){
+                markDeletedPosition = position;
+                continue;
+            } else {
+                break;
+            }
+        }
+        // remove empty bitSet.
+        List<Position> shouldRemoveFromMap = new ArrayList<>();
+        for (Map.Entry<PositionImpl, BitSetRecyclable> entry : batchIndexes.entrySet()) {
+            BitSetRecyclable bitSetRecyclable = entry.getValue();
+            if (bitSetRecyclable.isEmpty()) {
+                shouldRemoveFromMap.add(entry.getKey());
+            }
+        }
+        for (Position position : shouldRemoveFromMap){
+            BitSetRecyclable bitSetRecyclable = batchIndexes.remove(position);
+            bitSetRecyclable.recycle();
+        }
+        return Pair.of(PositionImpl.get(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId()),
+                batchIndexes);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImplTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImplTest.java
new file mode 100644
index 00000000000..1d0bef2c97d
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImplTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class TxnBatchedPositionImplTest {
+
+    @DataProvider(name = "batchSizeAndBatchIndexArgsArray")
+    private Object[][] batchSizeAndBatchIndexArgsArray(){
+        return new Object[][]{
+                {10, 5},
+                {64, 0},
+                {64, 63},
+                {230, 120},
+                {256, 255}
+        };
+    }
+
+    @Test(dataProvider = "batchSizeAndBatchIndexArgsArray")
+    public void testSetAckSetByIndex(int batchSize, int batchIndex){
+        // Build a batched position and set its ack-set; only the bit at batchIndex should be cleared.
+        TxnBatchedPositionImpl txnBatchedPosition = new TxnBatchedPositionImpl(1,1, batchSize, batchIndex);
+        txnBatchedPosition.setAckSetByIndex();
+        long[] ls = txnBatchedPosition.getAckSet();
+        BitSetRecyclable bitSetRecyclable = BitSetRecyclable.valueOf(ls);
+        for (int i = 0; i < batchSize; i++){
+            if (i == batchIndex) {
+                Assert.assertFalse(bitSetRecyclable.get(i));
+            } else {
+                Assert.assertTrue(bitSetRecyclable.get(i));
+            }
+        }
+        bitSetRecyclable.recycle();
+    }
+
+    @DataProvider(name = "testHashcodeAndEqualsData")
+    public Object[][] testHashcodeAndEqualsData(){
+        Random random = new Random();
+        return new Object[][]{
+                {1,2, 10, 5},
+                {123,1523, 64, 0},
+                {random.nextInt(65535),random.nextInt(65535), 230, 120},
+                {random.nextInt(65535),random.nextInt(65535), 256, 255}
+        };
+    }
+
+    /**
+     * Why is this test needed?
+     * {@link org.apache.bookkeeper.mledger.impl.ManagedCursorImpl} maintains batch indexes using {@link PositionImpl}
+     * or {@link TxnBatchedPositionImpl} as the key. However, different map implementations may use
+     * "param-key.equals(key-in-map)", "key-in-map.equals(param-key)", "param-key.compareTo(key-in-map)" or
+     * "key-in-map.compareTo(param-key)" to implement {@link Map#containsKey(Object)}, and these approaches may
+     * return different results.
+     * Note: {@link java.util.concurrent.ConcurrentSkipListMap} uses {@link Comparable#compareTo(Object)} to
+     *    determine whether two keys are the same, while {@link java.util.HashMap} uses
+     *    {@link Object#hashCode()} and {@link Object#equals(Object)}.
+     */
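+    // A rough, hypothetical illustration of the point above (not executed by this test):
+    //   ConcurrentSkipListMap<PositionImpl, Integer> m = new ConcurrentSkipListMap<>();
+    //   m.put(new PositionImpl(1, 1), 1);
+    //   m.containsKey(new TxnBatchedPositionImpl(1, 1, 16, 0)); // resolved via compareTo
+    //   // A HashMap with the same keys resolves containsKey via hashCode/equals instead,
+    //   // so both mechanisms must treat the two position types as interchangeable.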
+    @Test(dataProvider = "testHashcodeAndEqualsData")
+    public void testKeyInMap(long ledgerId, long entryId, int batchSize, int batchIndex){
+        // build data.
+        Random random = new Random();
+        int v = random.nextInt();
+        PositionImpl position = new PositionImpl(ledgerId, entryId);
+        TxnBatchedPositionImpl txnBatchedPosition = new TxnBatchedPositionImpl(position, batchSize, batchIndex);
+        // ConcurrentSkipListMap.
+        ConcurrentSkipListMap<PositionImpl, Integer> map1 = new ConcurrentSkipListMap<>();
+        map1.put(position, v);
+        Assert.assertTrue(map1.containsKey(txnBatchedPosition));
+        ConcurrentSkipListMap<PositionImpl, Integer> map2 = new ConcurrentSkipListMap<>();
+        map2.put(txnBatchedPosition, v);
+        Assert.assertTrue(map2.containsKey(position));
+        // HashMap.
+        HashMap<PositionImpl, Integer> map3 = new HashMap<>();
+        map3.put(position, v);
+        Assert.assertTrue(map3.containsKey(txnBatchedPosition));
+        HashMap<PositionImpl, Integer> map4 = new HashMap<>();
+        map4.put(txnBatchedPosition, v);
+        Assert.assertTrue(map4.containsKey(position));
+        // ConcurrentHashMap.
+        ConcurrentHashMap<PositionImpl, Integer> map5 = new ConcurrentHashMap<>();
+        map5.put(position, v);
+        Assert.assertTrue(map5.containsKey(txnBatchedPosition));
+        ConcurrentHashMap<PositionImpl, Integer> map6 = new ConcurrentHashMap<>();
+        map6.put(txnBatchedPosition, v);
+        Assert.assertTrue(map6.containsKey(position));
+        // LinkedHashMap.
+        LinkedHashMap<PositionImpl, Integer> map7 = new LinkedHashMap<>();
+        map7.put(position, v);
+        Assert.assertTrue(map7.containsKey(txnBatchedPosition));
+        LinkedHashMap<PositionImpl, Integer> map8 = new LinkedHashMap<>();
+        map8.put(txnBatchedPosition, v);
+        Assert.assertTrue(map8.containsKey(position));
+    }
+
+    /**
+     * Why is this test needed?
+     * Make sure that when compared to the "markDeletePosition", a {@link TxnBatchedPositionImpl} behaves like a
+     * plain {@link PositionImpl}.
+     * Note: {@link java.util.concurrent.ConcurrentSkipListMap} uses {@link Comparable#compareTo(Object)} to
+     *    determine whether two keys are the same, while {@link java.util.HashMap} uses
+     *    {@link Object#hashCode()} and {@link Object#equals(Object)}.
+     */
+    @Test
+    public void testCompareTo(){
+        PositionImpl position = new PositionImpl(1, 1);
+        TxnBatchedPositionImpl txnBatchedPosition = new TxnBatchedPositionImpl(position, 2, 0);
+        Assert.assertEquals(position.compareTo(txnBatchedPosition), 0);
+        Assert.assertEquals(txnBatchedPosition.compareTo(position), 0);
+    }
+}
\ No newline at end of file