You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/22 05:08:14 UTC

[pulsar] branch master updated: [Transaction] Transaction log (#8658)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c25bab8  [Transaction] Transaction log (#8658)
c25bab8 is described below

commit c25bab8f28ef08c6a82acf371e71264022d574e8
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sun Nov 22 13:07:43 2020 +0800

    [Transaction] Transaction log (#8658)
    
    Master Issue: [PIP31](https://github.com/apache/pulsar/wiki/PIP-31%3A-Transaction-Support)
    ### Motivation
    Implemention of [PIP31](https://github.com/apache/pulsar/wiki/PIP-31%3A-Transaction-Support), transaction log
    
    ### Modifications
    Add TransactionMetadataStore implemention by managed ledger
    ### Verifying this change
    Add the tests for it
---
 pom.xml                                            |    2 +
 .../broker/TransactionMetadataStoreService.java    |   31 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   15 +-
 .../broker/transaction/buffer/TransactionMeta.java |    2 +-
 .../exceptions/TransactionStatusException.java     |    2 +-
 .../buffer/impl/InMemTransactionBuffer.java        |    8 +-
 .../TransactionMetadataStoreServiceTest.java       |   12 +-
 .../transaction/buffer/TransactionBufferTest.java  |    2 +-
 .../coordinator/generate_protobuf.sh               |   23 +
 .../coordinator/generate_protobuf_docker.sh        |   42 +
 pulsar-transaction/coordinator/pom.xml             |   32 +-
 .../transaction/coordinator/TransactionLog.java    |   51 +
 .../coordinator/TransactionLogReplayCallback.java  |   31 +-
 .../coordinator/TransactionMetadataStore.java      |   14 +-
 .../coordinator/TransactionMetadataStoreState.java |   71 ++
 .../pulsar/transaction/coordinator/TxnMeta.java    |    5 +-
 .../exceptions/CoordinatorException.java           |   28 +-
 .../impl/InMemTransactionMetadataStore.java        |   14 +-
 .../coordinator/impl/MLTransactionLogImpl.java     |  251 +++++
 .../impl/MLTransactionMetadataStore.java           |  375 +++++++
 .../impl/MLTransactionMetadataStoreProvider.java   |   52 +
 .../transaction/coordinator/impl/TxnMetaImpl.java  |   40 +-
 .../proto/PulsarTransactionMetadata.java           | 1123 ++++++++++++++++++++
 .../coordinator/util/TransactionUtil.java}         |   28 +-
 .../transaction/coordinator/util/package-info.java |   19 +-
 .../src/main/proto/PulsarTransactionMetadata.proto |   53 +
 .../MLTransactionMetadataStoreTest.java            |  262 +++++
 .../TransactionMetadataStoreProviderTest.java      |   14 +-
 .../transaction/coordinator}/TxnStatusTest.java    |   17 +-
 .../coordinator/test/MockedBookKeeperTestCase.java |  139 +++
 30 files changed, 2643 insertions(+), 115 deletions(-)

diff --git a/pom.xml b/pom.xml
index 02ba4b3..4d0e507 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1149,6 +1149,7 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>**/*.graffle</exclude>
             <exclude>**/*.hgrm</exclude>
             <exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
+            <exclude>src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/common/api/proto/*.java</exclude>
             <exclude>src/test/java/org/apache/pulsar/common/api/proto/*.java</exclude>
@@ -1227,6 +1228,7 @@ flexible messaging model and an intuitive client API.</description>
                  and are included in source tree for convenience -->
             <exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
+            <exclude>src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java</exclude>
             <exclude>src/test/java/org/apache/pulsar/common/api/proto/TestApi.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java</exclude>
             <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 9ec9184..2a65b14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -20,12 +20,17 @@ package org.apache.pulsar.broker;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
+
 import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.TxnAction;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.api.proto.PulsarApi;
+
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -36,7 +41,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvide
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -135,12 +140,12 @@ public class TransactionMetadataStoreService {
         }
     }
 
-    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId) {
+    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
         TransactionMetadataStore store = stores.get(tcId);
         if (store == null) {
             return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
         }
-        return store.newTransaction();
+        return store.newTransaction(timeoutInMills);
     }
 
     public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
@@ -179,14 +184,14 @@ public class TransactionMetadataStoreService {
         return store.updateTxnStatus(txnId, newStatus, expectedStatus);
     }
 
-    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, List<PulsarApi.MessageIdData> messageIdDataList) {
+    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, List<MessageIdData> messageIdDataList) {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
         TxnStatus newStatus;
         switch (txnAction) {
-            case PulsarApi.TxnAction.COMMIT_VALUE:
+            case TxnAction.COMMIT_VALUE:
                 newStatus = TxnStatus.COMMITTING;
                 break;
-            case PulsarApi.TxnAction.ABORT_VALUE:
+            case TxnAction.ABORT_VALUE:
                 newStatus = TxnStatus.ABORTING;
                 break;
             default:
@@ -210,7 +215,7 @@ public class TransactionMetadataStoreService {
     }
 
     private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction,
-                                                              List<PulsarApi.MessageIdData> messageIdDataList) {
+                                                              List<MessageIdData> messageIdDataList) {
         CompletableFuture<Void> resultFuture = new CompletableFuture<>();
         List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
         this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
@@ -221,10 +226,10 @@ public class TransactionMetadataStoreService {
 
             txnMeta.ackedPartitions().forEach(tbSub -> {
                 CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
-                if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
+                if (TxnAction.COMMIT_VALUE == txnAction) {
                     actionFuture = tbClient.commitTxnOnSubscription(
                             tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
-                } else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
+                } else if (TxnAction.ABORT_VALUE == txnAction) {
                     actionFuture = tbClient.abortTxnOnSubscription(
                             tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
                 } else {
@@ -234,7 +239,7 @@ public class TransactionMetadataStoreService {
             });
 
             List<MessageId> messageIdList = new ArrayList<>();
-            for (PulsarApi.MessageIdData messageIdData : messageIdDataList) {
+            for (MessageIdData messageIdData : messageIdDataList) {
                 messageIdList.add(new MessageIdImpl(
                         messageIdData.getLedgerId(), messageIdData.getEntryId(), messageIdData.getPartition()));
                 messageIdData.recycle();
@@ -242,12 +247,12 @@ public class TransactionMetadataStoreService {
 
             txnMeta.producedPartitions().forEach(partition -> {
                 CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
-                if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
+                if (TxnAction.COMMIT_VALUE == txnAction) {
                     actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(),
                             messageIdList.stream().filter(
                                     msg -> ((MessageIdImpl) msg).getPartitionIndex() ==
                                             TopicName.get(partition).getPartitionIndex()).collect(Collectors.toList()));
-                } else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
+                } else if (TxnAction.ABORT_VALUE == txnAction) {
                     actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(),
                             messageIdList.stream().filter(
                                     msg -> ((MessageIdImpl) msg).getPartitionIndex() ==
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c8eae21..c5ef5fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -37,7 +37,6 @@ import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Promise;
 
 import java.net.SocketAddress;
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -129,7 +128,7 @@ import org.apache.pulsar.common.util.SafeCollectionUtils;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1669,7 +1668,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             log.debug("Receive new txn request {} to transaction meta store {} from {}.", command.getRequestId(), command.getTcId(), remoteAddress);
         }
         TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
-        service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId)
+        service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId, command.getTxnTtlSeconds())
             .whenComplete(((txnID, ex) -> {
                 if (ex == null) {
                     if (log.isDebugEnabled()) {
@@ -1827,13 +1826,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             log.debug("Receive add published partition to txn request {} from {} with txnId {}",
                     command.getRequestId(), remoteAddress, txnID);
         }
-        List<TransactionSubscription> subscriptionList = command.getSubscriptionList().stream()
-                .map(subscription -> TransactionSubscription.builder()
-                        .topic(subscription.getTopic())
-                        .subscription(subscription.getSubscription())
-                        .build())
-                .collect(Collectors.toList());
-        service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID, subscriptionList)
+
+        service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID,
+                MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionList()))
                 .whenComplete(((v, ex) -> {
                     if (ex == null) {
                         if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
index a1d00f4..f769c98 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
@@ -24,7 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 
 /**
  * The metadata for the transaction in the transaction buffer.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
index c81b17a..7aebe2d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.transaction.buffer.exceptions;
 
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 
 /**
  * Exceptions are thrown when operations are applied to a transaction which is not in expected txn status.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 048bba4..7f707c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -31,7 +31,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
@@ -41,7 +41,7 @@ import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSeal
 import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException;
 import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 
 /**
  * The in-memory implementation of {@link TransactionBuffer}.
@@ -280,7 +280,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
     }
 
     @Override
-    public CompletableFuture<Void> commitTxn(TxnID txnID, List<PulsarApi.MessageIdData> messageIdDataList) {
+    public CompletableFuture<Void> commitTxn(TxnID txnID, List<MessageIdData> messageIdDataList) {
         CompletableFuture<Void> commitFuture = new CompletableFuture<>();
         try {
             TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
@@ -307,7 +307,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
     }
 
     @Override
-    public CompletableFuture<Void> abortTxn(TxnID txnID, List<PulsarApi.MessageIdData> sendMessageIdList) {
+    public CompletableFuture<Void> abortTxn(TxnID txnID, List<MessageIdData> sendMessageIdList) {
         CompletableFuture<Void> abortFuture = new CompletableFuture<>();
 
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 9a21f18..8b91aea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 import org.junit.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -71,9 +71,9 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(1));
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(2));
         Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 3);
-        TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
-        TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1)).get();
-        TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2)).get();
+        TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
+        TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 0).get();
+        TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 0).get();
         Assert.assertEquals(0, txnID0.getMostSigBits());
         Assert.assertEquals(1, txnID1.getMostSigBits());
         Assert.assertEquals(2, txnID2.getMostSigBits());
@@ -88,7 +88,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
         TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
         Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
-        TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
+        TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
         List<String> partitions = new ArrayList<>();
         partitions.add("ptn-0");
         partitions.add("ptn-1");
@@ -105,7 +105,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
         TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
         Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
-        TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
+        TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
         List<TransactionSubscription> partitions = new ArrayList<>();
         partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
         partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
index 3c8798e..5025479 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
@@ -35,7 +35,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException;
 import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
 import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
diff --git a/pulsar-transaction/coordinator/generate_protobuf.sh b/pulsar-transaction/coordinator/generate_protobuf.sh
new file mode 100755
index 0000000..959b824
--- /dev/null
+++ b/pulsar-transaction/coordinator/generate_protobuf.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+PROTOC=${PROTOC:-protoc}
+${PROTOC} --java_out=pulsar-transaction/coordinator/src/main/java pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
diff --git a/pulsar-transaction/coordinator/generate_protobuf_docker.sh b/pulsar-transaction/coordinator/generate_protobuf_docker.sh
new file mode 100755
index 0000000..10b9ad8e
--- /dev/null
+++ b/pulsar-transaction/coordinator/generate_protobuf_docker.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Fail script in case of errors
+set -e
+
+ROOT_DIR=$(git rev-parse --show-toplevel)
+COMMON_DIR=$ROOT_DIR/
+cd $COMMON_DIR
+
+BUILD_IMAGE_NAME="${BUILD_IMAGE_NAME:-apachepulsar/pulsar-build}"
+BUILD_IMAGE_VERSION="${BUILD_IMAGE_VERSION:-ubuntu-16.04}"
+
+IMAGE="$BUILD_IMAGE_NAME:$BUILD_IMAGE_VERSION"
+
+echo $IMAGE
+
+# Force to pull image in case it was updated
+docker pull $IMAGE
+
+WORKDIR=/workdir
+docker run -i \
+    -v ${COMMON_DIR}:${WORKDIR} $IMAGE \
+    bash -c "cd ${WORKDIR}; PROTOC=/pulsar/protobuf/src/protoc ./pulsar-transaction/coordinator/generate_protobuf.sh"
+
diff --git a/pulsar-transaction/coordinator/pom.xml b/pulsar-transaction/coordinator/pom.xml
index a6eb2e3f..33553cd 100644
--- a/pulsar-transaction/coordinator/pom.xml
+++ b/pulsar-transaction/coordinator/pom.xml
@@ -37,7 +37,7 @@
 
         <dependency>
             <groupId>${project.groupId}</groupId>
-            <artifactId>pulsar-transaction-common</artifactId>
+            <artifactId>pulsar-common</artifactId>
             <version>${project.version}</version>
         </dependency>
 
@@ -47,6 +47,36 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>testmocks</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>check-style</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>../../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+                            <suppressionsLocation>../../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+                            <encoding>UTF-8</encoding>
+                            <excludes>**/proto/*</excludes>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 
 </project>
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java
new file mode 100644
index 0000000..fb3b5ca
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLog.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry;
+
+/**
+ * A log interface for transaction to read and write transaction operation.
+ */
+public interface TransactionLog {
+
+
+    /**
+     * Replay transaction log to load the transaction map.
+     *
+     * @param transactionLogReplayCallback the call back for replaying the transaction log
+     */
+    void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback);
+
+    /**
+     * Close the transaction log.
+     */
+    CompletableFuture<Void> closeAsync();
+
+    /**
+     * Append the transaction operation to the transaction log.
+     *
+     * @param transactionMetadataEntry {@link TransactionMetadataEntry} transaction metadata entry
+     * @return a future represents the result of this operation
+     */
+    CompletableFuture<Position> append(TransactionMetadataEntry transactionMetadataEntry);
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLogReplayCallback.java
similarity index 52%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
copy to pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLogReplayCallback.java
index c81b17a..a61ae86 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionLogReplayCallback.java
@@ -16,22 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
+package org.apache.pulsar.transaction.coordinator;
 
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry;
 
 /**
- * Exceptions are thrown when operations are applied to a transaction which is not in expected txn status.
+ * The callback of transaction log replay the transaction operate.
  */
-public class TransactionStatusException extends TransactionBufferException {
+public interface TransactionLogReplayCallback {
 
-    private static final long serialVersionUID = 0L;
+    /**
+     * Transaction log replay complete callback for transaction metadata store.
+     */
+    void replayComplete();
 
-    public TransactionStatusException(TxnID txnId,
-                                      TxnStatus expectedStatus,
-                                      TxnStatus actualStatus) {
-        super("Transaction `" + txnId + "` is not in an expected status `" + expectedStatus
-            + "`, but is in status `" + actualStatus + "`");
-    }
-}
+    /**
+     * Handle metadata entry.
+     *
+     * @param position the transaction operation position
+     * @param transactionMetadataEntry the metadata entry of transaction
+     */
+    void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry);
+
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
index 8d4b69f..cc933e9 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 
 /**
  * A store for storing all the transaction metadata.
@@ -55,11 +55,12 @@ public interface TransactionMetadataStore {
     /**
      * Create a new transaction in the transaction metadata store.
      *
+     * @param timeoutInMills the timeout duration of the transaction in mills
      * @return a future represents the result of creating a new transaction.
      *         it returns {@link TxnID} as the identifier for identifying the
      *         transaction.
      */
-    CompletableFuture<TxnID> newTransaction();
+    CompletableFuture<TxnID> newTransaction(long timeoutInMills);
 
     /**
      * Add the produced partitions to transaction identified by <tt>txnid</tt>.
@@ -87,6 +88,7 @@ public interface TransactionMetadataStore {
      * <p>If the current transaction status is not <tt>expectedStatus</tt>, the
      * update will be failed.
      *
+     * @param txnid {@link TxnID} for update txn status
      * @param newStatus the new txn status that the transaction should be updated to
      * @param expectedStatus the expected status that the transaction should be
      * @return a future represents the result of the operation
@@ -95,7 +97,15 @@ public interface TransactionMetadataStore {
         TxnID txnid, TxnStatus newStatus, TxnStatus expectedStatus);
 
     /**
+     * Get the transaction coordinator id.
+     * @return transaction coordinator id
+     */
+    TransactionCoordinatorID getTransactionCoordinatorID();
+
+    /**
      * Close the transaction metadata store.
+     *
+     * @return a future represents the result of this operation
      */
     CompletableFuture<Void> closeAsync();
 
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
new file mode 100644
index 0000000..5dbd9ba
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreState.java
@@ -0,0 +1,71 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * The implement of transaction metadata store state.
+ */
+public abstract class TransactionMetadataStoreState {
+
+    /**
+     * The state of the transactionMetadataStore {@link TransactionMetadataStore}.
+     */
+    public enum State {
+        None,
+        Initializing,
+        Ready,
+        Close
+    }
+
+    private static final AtomicReferenceFieldUpdater<TransactionMetadataStoreState, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(TransactionMetadataStoreState.class, State.class, "state");
+
+    @SuppressWarnings("unused")
+    private volatile State state = null;
+
+    public TransactionMetadataStoreState(State state) {
+        STATE_UPDATER.set(this, state);
+
+    }
+
+    protected boolean changeToReadyState() {
+        return (STATE_UPDATER.compareAndSet(this, State.Initializing, State.Ready));
+    }
+
+    protected boolean changeToInitializingState() {
+        return STATE_UPDATER.compareAndSet(this, State.None, State.Initializing);
+    }
+
+    protected boolean changeToCloseState() {
+        return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close)
+                || STATE_UPDATER.compareAndSet(this, State.None, State.Close)
+                || STATE_UPDATER.compareAndSet(this, State.Initializing, State.Close));
+    }
+
+    protected boolean checkIfReady() {
+        return STATE_UPDATER.get(this) == State.Ready;
+    }
+
+    public State getState() {
+        return STATE_UPDATER.get(this);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
index cbc8dc9..509da8c 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
@@ -23,7 +23,7 @@ import java.util.List;
 
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 
 /**
  * An interface represents the metadata of a transaction in {@link TransactionMetadataStore}.
@@ -74,11 +74,12 @@ public interface TxnMeta {
     /**
      * Add the list of acked partitions to the transaction.
      *
+     * @param subscriptions the ackd subscriptions add to the transaction
      * @return transaction meta
      * @throws InvalidTxnStatusException if the transaction is not in
      *         {@link TxnStatus#OPEN}
      */
-    TxnMeta addAckedPartitions(List<TransactionSubscription> partitions)
+    TxnMeta addAckedPartitions(List<TransactionSubscription> subscriptions)
         throws InvalidTxnStatusException;
 
     /**
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
index bd66876..ed1b894 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
@@ -20,7 +20,8 @@ package org.apache.pulsar.transaction.coordinator.exceptions;
 
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 
 /**
  * The base exception for exceptions thrown from coordinator.
@@ -85,6 +86,10 @@ public abstract class CoordinatorException extends Exception {
             super(message);
         }
 
+        public TransactionNotFoundException(TxnID txnID) {
+            super("The transaction with this txdID `" + txnID + "`not found ");
+        }
+
         public TransactionNotFoundException(String message, Throwable cause) {
             super(message, cause);
         }
@@ -93,4 +98,25 @@ public abstract class CoordinatorException extends Exception {
             super(cause);
         }
     }
+
+    /**
+     * Exception is thrown when a operation of transaction is executed in a error transaction metadata store state.
+     */
+    public static class TransactionMetadataStoreStateException extends CoordinatorException {
+
+        private static final long serialVersionUID = 0L;
+
+        public TransactionMetadataStoreStateException(String message) {
+            super(message);
+        }
+
+        public TransactionMetadataStoreStateException(TransactionCoordinatorID tcID,
+                                                      TransactionMetadataStoreState.State expectedState,
+                                                      TransactionMetadataStoreState.State currentState,
+                                                      String operation) {
+            super("Expect Transaction Coordinator `" + tcID + "` to be in " + expectedState
+                    + " status but it is in " + currentState + " state for " + operation);
+
+        }
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index 15bcd41..c6d6f13 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 
 /**
  * An in-memory implementation of {@link TransactionMetadataStore}.
@@ -53,8 +53,7 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
         CompletableFuture<TxnMeta> getFuture = new CompletableFuture<>();
         TxnMetaImpl txn = transactions.get(txnid);
         if (null == txn) {
-            getFuture.completeExceptionally(
-                new TransactionNotFoundException("Transaction not found :" + txnid));
+            getFuture.completeExceptionally(new TransactionNotFoundException(txnid));
         } else {
             getFuture.complete(txn);
         }
@@ -62,12 +61,12 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
     }
 
     @Override
-    public CompletableFuture<TxnID> newTransaction() {
+    public CompletableFuture<TxnID> newTransaction(long timeoutInMills) {
         TxnID txnID = new TxnID(
             tcID.getId(),
             localID.getAndIncrement()
         );
-        TxnMetaImpl txn = new TxnMetaImpl(txnID);
+        TxnMetaImpl txn = TxnMetaImpl.create(txnID);
         transactions.put(txnID, txn);
         return CompletableFuture.completedFuture(txnID);
     }
@@ -115,6 +114,11 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
     }
 
     @Override
+    public TransactionCoordinatorID getTransactionCoordinatorID() {
+        return tcID;
+    }
+
+    @Override
     public CompletableFuture<Void> closeAsync() {
         transactions.clear();
         return CompletableFuture.completedFuture(null);
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
new file mode 100644
index 0000000..761169c
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionLog;
+import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MLTransactionLogImpl implements TransactionLog {
+
+    private static final Logger log = LoggerFactory.getLogger(MLTransactionLogImpl.class);
+
+    private final ManagedLedger managedLedger;
+
+    private final static String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-";
+
+    private final ManagedCursor cursor;
+
+    private final static String TRANSACTION_SUBSCRIPTION_NAME = "transaction.subscription";
+
+    private final SpscArrayQueue<Entry> entryQueue;
+
+    //this is for replay
+    private final PositionImpl lastConfirmedEntry;
+
+    private final long tcId;
+
+    private final String topicName;
+
+    public MLTransactionLogImpl(TransactionCoordinatorID tcID,
+                                ManagedLedgerFactory managedLedgerFactory) throws Exception {
+        this.topicName = TRANSACTION_LOG_PREFIX + tcID;
+        this.tcId = tcID.getId();
+        this.managedLedger = managedLedgerFactory.open(topicName);
+        this.cursor =  managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME,
+                CommandSubscribe.InitialPosition.Earliest);
+        this.entryQueue = new SpscArrayQueue<>(2000);
+        this.lastConfirmedEntry = (PositionImpl) managedLedger.getLastConfirmedEntry();
+    }
+
+    @Override
+    public void replayAsync(TransactionLogReplayCallback transactionLogReplayCallback) {
+        new TransactionLogReplayer(transactionLogReplayCallback).start();
+    }
+
+    private void readAsync(int numberOfEntriesToRead,
+                           AsyncCallbacks.ReadEntriesCallback readEntriesCallback) {
+        cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, System.nanoTime());
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
+        managedLedger.asyncClose(new AsyncCallbacks.CloseCallback() {
+            @Override
+            public void closeComplete(Object ctx) {
+                log.info("Transaction log with tcId : {} close managedLedger successful!", tcId);
+                completableFuture.complete(null);
+            }
+
+            @Override
+            public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                log.error("Transaction log with tcId : {} close managedLedger fail!", tcId);
+                completableFuture.completeExceptionally(exception);
+            }
+        }, null);
+
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<Position> append(TransactionMetadataEntry transactionMetadataEntry) {
+        int transactionMetadataEntrySize = transactionMetadataEntry.getSerializedSize();
+        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
+        ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(buf);
+        CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        try {
+            transactionMetadataEntry.writeTo(outStream);
+            managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, Object ctx) {
+                    buf.release();
+                    completableFuture.complete(position);
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    log.error("Transaction log write transaction operation error", exception);
+                    buf.release();
+                    completableFuture.completeExceptionally(exception);
+                }
+            } , null);
+        } catch (IOException e) {
+            log.error("Transaction log write transaction operation error", e);
+            completableFuture.completeExceptionally(e);
+        } finally {
+            outStream.recycle();
+        }
+        return completableFuture;
+    }
+
+    public CompletableFuture<Void> deletePosition(List<Position> positions) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        this.cursor.asyncDelete(positions, new AsyncCallbacks.DeleteCallback() {
+            @Override
+            public void deleteComplete(Object position) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Deleted message at {}", topicName,
+                            TRANSACTION_SUBSCRIPTION_NAME, position);
+                }
+                completableFuture.complete(null);
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                log.warn("[{}][{}] Failed to delete message at {}", topicName,
+                        TRANSACTION_SUBSCRIPTION_NAME, ctx, exception);
+                completableFuture.completeExceptionally(exception);
+            }
+        }, null);
+        return completableFuture;
+    }
+
+    class TransactionLogReplayer {
+
+        private FillEntryQueueCallback fillEntryQueueCallback;
+        private long currentLoadEntryId;
+        private TransactionLogReplayCallback transactionLogReplayCallback;
+
+        TransactionLogReplayer(TransactionLogReplayCallback transactionLogReplayCallback) {
+            this.fillEntryQueueCallback = new FillEntryQueueCallback();
+            this.transactionLogReplayCallback = transactionLogReplayCallback;
+        }
+
+        public void start() {
+            if (((PositionImpl) cursor.getMarkDeletedPosition()).compareTo(lastConfirmedEntry) == 0) {
+                this.transactionLogReplayCallback.replayComplete();
+                return;
+            }
+            while (currentLoadEntryId < lastConfirmedEntry.getEntryId()) {
+                fillEntryQueueCallback.fillQueue();
+                Entry entry = entryQueue.poll();
+                if (entry != null) {
+                    ByteBuf buffer = entry.getDataBuffer();
+                    currentLoadEntryId = entry.getEntryId();
+                    ByteBufCodedInputStream stream = ByteBufCodedInputStream.get(buffer);
+                    TransactionMetadataEntry.Builder transactionMetadataEntryBuilder =
+                            TransactionMetadataEntry.newBuilder();
+                    TransactionMetadataEntry transactionMetadataEntry;
+                    try {
+                        transactionMetadataEntry =
+                                transactionMetadataEntryBuilder.mergeFrom(stream, null).build();
+                    } catch (IOException e) {
+                        log.error(e.getMessage(), e);
+                        throw new RuntimeException("TransactionLog convert entry error : ", e);
+                    }
+                    transactionLogReplayCallback.handleMetadataEntry(entry.getPosition(), transactionMetadataEntry);
+                    entry.release();
+                    transactionMetadataEntry.recycle();
+                    transactionMetadataEntryBuilder.recycle();
+                    stream.recycle();
+                } else {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        //no-op
+                    }
+                }
+            }
+            transactionLogReplayCallback.replayComplete();
+        }
+    }
+
+    class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
+
+        private AtomicLong outstandingReadsRequests = new AtomicLong(0);
+
+        void fillQueue() {
+            if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
+                if (cursor.hasMoreEntries()) {
+                    outstandingReadsRequests.incrementAndGet();
+                    readAsync(100, this);
+                }
+            }
+        }
+
+        @Override
+        public void readEntriesComplete(List<Entry> entries, Object ctx) {
+            entryQueue.fill(new MessagePassingQueue.Supplier<Entry>() {
+                private int i = 0;
+                @Override
+                public Entry get() {
+                    Entry entry = entries.get(i);
+                    i++;
+                    return entry;
+                }
+            }, entries.size());
+
+            outstandingReadsRequests.decrementAndGet();
+        }
+
+        @Override
+        public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+            log.error("Transaction log init fail error!", exception);
+            outstandingReadsRequests.decrementAndGet();
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
new file mode 100644
index 0000000..c2e519b
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi.Subscription;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}.
+ */
+public class MLTransactionMetadataStore
+        extends TransactionMetadataStoreState implements TransactionMetadataStore {
+
+    private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStore.class);
+
+    private final TransactionCoordinatorID tcID;
+    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+    private final MLTransactionLogImpl transactionLog;
+    private static final long TC_ID_NOT_USED = -1L;
+    private final ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentHashMap<>();
+
+    public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
+                                      MLTransactionLogImpl mlTransactionLog) {
+        super(State.None);
+        this.tcID = tcID;
+        this.transactionLog = mlTransactionLog;
+
+        if (!changeToInitializingState()) {
+            log.error("Managed ledger transaction metadata store change state error when init it");
+            return;
+        }
+        new Thread(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
+
+            @Override
+            public void replayComplete() {
+                if (!changeToReadyState()) {
+                    log.error("Managed ledger transaction metadata store change state error when replay complete");
+                }
+            }
+
+            @Override
+            public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) {
+
+                try {
+
+                    TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(),
+                            transactionMetadataEntry.getTxnidLeastBits());
+                    switch (transactionMetadataEntry.getMetadataOp()) {
+                        case NEW:
+                            if (sequenceId.get() < transactionMetadataEntry.getTxnidLeastBits()) {
+                                sequenceId.set(transactionMetadataEntry.getTxnidLeastBits());
+                            }
+                            if (txnMetaMap.containsKey(txnID)) {
+                                txnMetaMap.get(txnID).getRight().add(position);
+                            } else {
+                                List<Position> positions = new ArrayList<>();
+                                positions.add(position);
+                                txnMetaMap.put(txnID, MutablePair.of(TxnMetaImpl.create(txnID), positions));
+                            }
+                            break;
+                        case ADD_PARTITION:
+                            if (!txnMetaMap.containsKey(txnID)) {
+                                transactionLog.deletePosition(Collections.singletonList(position));
+                            } else {
+                                txnMetaMap.get(txnID).getLeft()
+                                        .addProducedPartitions(transactionMetadataEntry.getPartitionsList());
+                                txnMetaMap.get(txnID).getRight().add(position);
+                            }
+                            break;
+                        case ADD_SUBSCRIPTION:
+                            if (!txnMetaMap.containsKey(txnID)) {
+                                transactionLog.deletePosition(Collections.singletonList(position));
+                            } else {
+                                txnMetaMap.get(txnID).getLeft()
+                                        .addAckedPartitions(subscriptionToTxnSubscription(
+                                                transactionMetadataEntry.getSubscriptionsList()));
+                                txnMetaMap.get(txnID).getRight().add(position);
+                            }
+                            break;
+                        case UPDATE:
+                            if (!txnMetaMap.containsKey(txnID)) {
+                                transactionLog.deletePosition(Collections.singletonList(position));
+                            } else {
+                                TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
+                                if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+                                    transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v -> {
+                                        TxnMeta txnMeta = txnMetaMap.remove(txnID).getLeft();
+                                        ((TxnMetaImpl) txnMeta).recycle();
+                                    });
+                                } else {
+                                    txnMetaMap.get(txnID).getLeft()
+                                            .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
+                                                    transactionMetadataEntry.getExpectedStatus());
+                                }
+                                txnMetaMap.get(txnID).getRight().add(position);
+                            }
+                            break;
+                        default:
+                            throw new InvalidTxnStatusException("Transaction `"
+                                    + txnID + "` load replay metadata operation "
+                                    + "from transaction log with unknown operation");
+                    }
+                } catch (InvalidTxnStatusException  e) {
+                    log.error(e.getMessage(), e);
+                }
+            }
+        })).start();
+    }
+
+    @Override
+    public CompletableFuture<TxnStatus> getTxnStatus(TxnID txnID) {
+        return CompletableFuture.completedFuture(txnMetaMap.get(txnID).getLeft().status());
+    }
+
+    @Override
+    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {
+        Pair<TxnMeta, List<Position>> txnMetaListPair = txnMetaMap.get(txnID);
+        CompletableFuture<TxnMeta> completableFuture = new CompletableFuture<>();
+        if (txnMetaListPair == null) {
+            completableFuture.completeExceptionally(new TransactionNotFoundException(txnID));
+        } else {
+            completableFuture.complete(txnMetaListPair.getLeft());
+        }
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<TxnID> newTransaction(long timeOut) {
+        if (!checkIfReady()) {
+            return FutureUtil.failedFuture(
+                    new CoordinatorException
+                            .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
+        }
+
+        long mostSigBits = tcID.getId();
+        long leastSigBits = sequenceId.incrementAndGet();
+        TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+        long currentTimeMillis = System.currentTimeMillis();
+        TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry
+                .newBuilder()
+                .setTxnidMostBits(mostSigBits)
+                .setTxnidLeastBits(leastSigBits)
+                .setStartTime(currentTimeMillis)
+                .setTimeoutMs(timeOut)
+                .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+                .setLastModificationTime(currentTimeMillis)
+                .build();
+        return transactionLog.append(transactionMetadataEntry)
+                .thenCompose(position -> {
+                    TxnMeta txn = TxnMetaImpl.create(txnID);
+                    List<Position> positions = new ArrayList<>();
+                    positions.add(position);
+                    Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
+                    txnMetaMap.put(txnID, pair);
+                    transactionMetadataEntry.recycle();
+                    return CompletableFuture.completedFuture(txnID);
+                });
+    }
+
+    @Override
+    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
+        if (!checkIfReady()) {
+            return FutureUtil.failedFuture(
+                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
+                            State.Ready, getState(), "add produced partition"));
+        }
+        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
+            TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry
+                    .newBuilder()
+                    .setTxnidMostBits(txnID.getMostSigBits())
+                    .setTxnidLeastBits(txnID.getLeastSigBits())
+                    .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
+                    .addAllPartitions(partitions)
+                    .setLastModificationTime(System.currentTimeMillis())
+                    .build();
+
+            return transactionLog.append(transactionMetadataEntry)
+                    .thenCompose(position -> {
+                        try {
+                            txnMetaListPair.getLeft().addProducedPartitions(partitions);
+                            txnMetaMap.get(txnID).getRight().add(position);
+                            return CompletableFuture.completedFuture(null);
+                        } catch (InvalidTxnStatusException e) {
+                            txnMetaMap.get(txnID).getRight().add(position);
+                            log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                    + " add produced partition error with TxnStatus : "
+                                    + txnMetaListPair.getLeft().status().name(), e);
+                            return FutureUtil.failedFuture(e);
+                        } finally {
+                            transactionMetadataEntry.recycle();
+                        }
+                    });
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
+                                                          List<TransactionSubscription> txnSubscriptions) {
+        if (!checkIfReady()) {
+            return FutureUtil.failedFuture(
+                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
+                            State.Ready, getState(), "add acked partition"));
+        }
+        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
+            TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry
+                    .newBuilder()
+                    .setTxnidMostBits(txnID.getMostSigBits())
+                    .setTxnidLeastBits(txnID.getLeastSigBits())
+                    .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
+                    .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
+                    .setLastModificationTime(System.currentTimeMillis())
+                    .build();
+
+            return transactionLog.append(transactionMetadataEntry)
+                    .thenCompose(position -> {
+                        try {
+                            txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
+                            txnMetaMap.get(txnID).getRight().add(position);
+                            return CompletableFuture.completedFuture(null);
+                        } catch (InvalidTxnStatusException e) {
+                            txnMetaMap.get(txnID).getRight().add(position);
+                            log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                    + " add acked subscription error with TxnStatus : "
+                                    + txnMetaListPair.getLeft().status().name(), e);
+                            return FutureUtil.failedFuture(e);
+                        } finally {
+                            transactionMetadataEntry.recycle();
+                        }
+                    });
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus, TxnStatus expectedStatus) {
+        if (!checkIfReady()) {
+            return FutureUtil.failedFuture(
+                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
+                            State.Ready, getState(), "update transaction status"));
+        }
+        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
+
+            TransactionMetadataEntry transactionMetadataEntry = TransactionMetadataEntry
+                    .newBuilder()
+                    .setTxnidMostBits(txnID.getMostSigBits())
+                    .setTxnidLeastBits(txnID.getLeastSigBits())
+                    .setExpectedStatus(expectedStatus)
+                    .setMetadataOp(TransactionMetadataOp.UPDATE)
+                    .setLastModificationTime(System.currentTimeMillis())
+                    .setNewStatus(newStatus)
+                    .build();
+
+            return transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
+                try {
+                    txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
+                    txnMetaListPair.getRight().add(position);
+                    if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+                        return transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> {
+                            txnMetaMap.remove(txnID);
+                            ((TxnMetaImpl) txnMetaListPair.getLeft()).recycle();
+                            return CompletableFuture.completedFuture(null);
+                        });
+                    }
+                    return CompletableFuture.completedFuture(null);
+                } catch (InvalidTxnStatusException e) {
+                    txnMetaListPair.getRight().add(position);
+                    log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                            + " add update txn status error with TxnStatus : "
+                            + txnMetaListPair.getLeft().status().name(), e);
+                    return FutureUtil.failedFuture(e);
+                } finally {
+                    transactionMetadataEntry.recycle();
+                }
+            });
+        });
+    }
+
+    @Override
+    public TransactionCoordinatorID getTransactionCoordinatorID() {
+        return tcID;
+    }
+
+    private CompletableFuture<Pair<TxnMeta, List<Position>>> getTxnPositionPair(TxnID txnID) {
+        CompletableFuture<Pair<TxnMeta, List<Position>>> completableFuture = new CompletableFuture<>();
+        Pair<TxnMeta, List<Position>> txnMetaListPair = txnMetaMap.get(txnID);
+        if (txnMetaListPair == null) {
+            completableFuture.completeExceptionally(new TransactionNotFoundException(txnID));
+        } else {
+            completableFuture.complete(txnMetaListPair);
+        }
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return transactionLog.closeAsync().thenCompose(v -> {
+            txnMetaMap.clear();
+            if (!this.changeToCloseState()) {
+                return FutureUtil.failedFuture(
+                        new IllegalStateException("Managed ledger transaction metadata store state to close error!"));
+            }
+            return CompletableFuture.completedFuture(null);
+        });
+    }
+
+    public static List<Subscription> txnSubscriptionToSubscription(List<TransactionSubscription> tnxSubscriptions) {
+        List<Subscription> subscriptions = new ArrayList<>(tnxSubscriptions.size());
+        for (TransactionSubscription transactionSubscription : tnxSubscriptions) {
+            Subscription.Builder subscriptionBuilder = Subscription.newBuilder();
+            Subscription subscription = subscriptionBuilder
+                    .setSubscription(transactionSubscription.getSubscription())
+                    .setTopic(transactionSubscription.getTopic()).build();
+            subscriptions.add(subscription);
+            subscriptionBuilder.recycle();
+        }
+        return subscriptions;
+    }
+
+    public static List<TransactionSubscription> subscriptionToTxnSubscription(
+            List<Subscription> subscriptions) {
+        List<TransactionSubscription> transactionSubscriptions = new ArrayList<>(subscriptions.size());
+        for (Subscription subscription : subscriptions) {
+            TransactionSubscription.TransactionSubscriptionBuilder transactionSubscriptionBuilder  =
+                    TransactionSubscription.builder();
+            transactionSubscriptionBuilder.subscription(subscription.getSubscription());
+            transactionSubscriptionBuilder.topic(subscription.getTopic());
+            transactionSubscriptions
+                    .add(transactionSubscriptionBuilder.build());
+            subscription.recycle();
+        }
+        return transactionSubscriptions;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
new file mode 100644
index 0000000..5e1061d
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}.
+ */
+public class MLTransactionMetadataStoreProvider implements TransactionMetadataStoreProvider {
+
+    private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStoreProvider.class);
+
+    @Override
+    public CompletableFuture<TransactionMetadataStore>
+    openStore(TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory) {
+        TransactionMetadataStore transactionMetadataStore;
+        try {
+            transactionMetadataStore =
+                    new MLTransactionMetadataStore(transactionCoordinatorId,
+                            new MLTransactionLogImpl(transactionCoordinatorId, managedLedgerFactory));
+        } catch (Exception e) {
+            log.error("MLTransactionMetadataStore init fail", e);
+            return FutureUtil.failedFuture(e);
+        }
+        return CompletableFuture.completedFuture(transactionMetadataStore);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
index 0b1ae74..6d695c6 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -28,7 +31,8 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.util.TransactionUtil;
 
 /**
  * A class represents the metadata of a transaction stored in
@@ -36,14 +40,38 @@ import org.apache.pulsar.transaction.impl.common.TxnStatus;
  */
 class TxnMetaImpl implements TxnMeta {
 
-    private final TxnID txnID;
+    private TxnID txnID;
     private final Set<String> producedPartitions = new HashSet<>();
     private final Set<TransactionSubscription> ackedPartitions = new HashSet<>();
-    private TxnStatus txnStatus;
+    private volatile TxnStatus txnStatus = TxnStatus.OPEN;
+    private final Handle<TxnMetaImpl> recycleHandle;
+
+    private static final Recycler<TxnMetaImpl> RECYCLER = new Recycler<TxnMetaImpl>() {
+        protected TxnMetaImpl newObject(Recycler.Handle<TxnMetaImpl> handle) {
+            return new TxnMetaImpl(handle);
+        }
+    };
+
+    TxnMetaImpl(Handle<TxnMetaImpl> handle) {
+        this.recycleHandle = handle;
+    }
 
-    TxnMetaImpl(TxnID txnID) {
-        this.txnID = txnID;
+    // Constructor for transaction metadata
+    static TxnMetaImpl create(TxnID txnID) {
+        @SuppressWarnings("unchecked")
+        TxnMetaImpl txnMeta = RECYCLER.get();
+        txnMeta.txnID = txnID;
+        return txnMeta;
+    }
+
+    public void recycle() {
+        this.producedPartitions.clear();
+        this.ackedPartitions.clear();
         this.txnStatus = TxnStatus.OPEN;
+
+        if (recycleHandle != null) {
+            recycleHandle.recycle(this);
+        }
     }
 
     @Override
@@ -140,7 +168,7 @@ class TxnMetaImpl implements TxnMeta {
                                                     TxnStatus expectedStatus)
         throws InvalidTxnStatusException {
         checkTxnStatus(expectedStatus);
-        if (!txnStatus.canTransitionTo(newStatus)) {
+        if (!TransactionUtil.canTransitionTo(txnStatus, newStatus)) {
             throw new InvalidTxnStatusException(
                 "Transaction `" + txnID + "` CANNOT transaction from status " + txnStatus + " to " + newStatus);
         }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java
new file mode 100644
index 0000000..422674b
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/proto/PulsarTransactionMetadata.java
@@ -0,0 +1,1123 @@
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
+
+package org.apache.pulsar.transaction.coordinator.proto;
+
+public final class PulsarTransactionMetadata {
+  private PulsarTransactionMetadata() {}
+  public static void registerAllExtensions(
+      org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite registry) {
+  }
+  public enum TxnStatus
+      implements org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLite {
+    OPEN(0, 0),
+    COMMITTING(1, 1),
+    COMMITTED(2, 2),
+    ABORTING(3, 3),
+    ABORTED(4, 4),
+    ;
+    
+    public static final int OPEN_VALUE = 0;
+    public static final int COMMITTING_VALUE = 1;
+    public static final int COMMITTED_VALUE = 2;
+    public static final int ABORTING_VALUE = 3;
+    public static final int ABORTED_VALUE = 4;
+    
+    
+    public final int getNumber() { return value; }
+    
+    public static TxnStatus valueOf(int value) {
+      switch (value) {
+        case 0: return OPEN;
+        case 1: return COMMITTING;
+        case 2: return COMMITTED;
+        case 3: return ABORTING;
+        case 4: return ABORTED;
+        default: return null;
+      }
+    }
+    
+    public static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TxnStatus>
+        internalGetValueMap() {
+      return internalValueMap;
+    }
+    private static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TxnStatus>
+        internalValueMap =
+          new org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TxnStatus>() {
+            public TxnStatus findValueByNumber(int number) {
+              return TxnStatus.valueOf(number);
+            }
+          };
+    
+    private final int value;
+    
+    private TxnStatus(int index, int value) {
+      this.value = value;
+    }
+    
+    // @@protoc_insertion_point(enum_scope:pulsar.proto.TxnStatus)
+  }
+  
+  public interface TransactionMetadataEntryOrBuilder
+      extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder {
+    
+    // optional .pulsar.proto.TransactionMetadataEntry.TransactionMetadataOp metadata_op = 1;
+    boolean hasMetadataOp();
+    org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp getMetadataOp();
+    
+    // optional uint64 txnid_least_bits = 2 [default = 0];
+    boolean hasTxnidLeastBits();
+    long getTxnidLeastBits();
+    
+    // optional uint64 txnid_most_bits = 3 [default = 0];
+    boolean hasTxnidMostBits();
+    long getTxnidMostBits();
+    
+    // optional .pulsar.proto.TxnStatus expected_status = 4;
+    boolean hasExpectedStatus();
+    org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getExpectedStatus();
+    
+    // optional .pulsar.proto.TxnStatus new_status = 5;
+    boolean hasNewStatus();
+    org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getNewStatus();
+    
+    // repeated string partitions = 6;
+    java.util.List<String> getPartitionsList();
+    int getPartitionsCount();
+    String getPartitions(int index);
+    
+    // repeated .pulsar.proto.Subscription subscriptions = 7;
+    java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.Subscription> 
+        getSubscriptionsList();
+    org.apache.pulsar.common.api.proto.PulsarApi.Subscription getSubscriptions(int index);
+    int getSubscriptionsCount();
+    
+    // optional uint64 timeout_ms = 8;
+    boolean hasTimeoutMs();
+    long getTimeoutMs();
+    
+    // optional uint64 start_time = 9;
+    boolean hasStartTime();
+    long getStartTime();
+    
+    // optional uint64 last_modification_time = 10;
+    boolean hasLastModificationTime();
+    long getLastModificationTime();
+  }
+  public static final class TransactionMetadataEntry extends
+      org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
+      implements TransactionMetadataEntryOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use TransactionMetadataEntry.newBuilder() to construct.
+    private io.netty.util.Recycler.Handle handle;
+    private TransactionMetadataEntry(io.netty.util.Recycler.Handle handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<TransactionMetadataEntry> RECYCLER = new io.netty.util.Recycler<TransactionMetadataEntry>() {
+            protected TransactionMetadataEntry newObject(Handle handle) {
+              return new TransactionMetadataEntry(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            if (handle != null) { RECYCLER.recycle(this, handle); }
+        }
+         
+    private TransactionMetadataEntry(boolean noInit) {}
+    
+    private static final TransactionMetadataEntry defaultInstance;
+    public static TransactionMetadataEntry getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public TransactionMetadataEntry getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public enum TransactionMetadataOp
+        implements org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLite {
+      NEW(0, 0),
+      ADD_PARTITION(1, 1),
+      ADD_SUBSCRIPTION(2, 2),
+      UPDATE(3, 3),
+      ;
+      
+      public static final int NEW_VALUE = 0;
+      public static final int ADD_PARTITION_VALUE = 1;
+      public static final int ADD_SUBSCRIPTION_VALUE = 2;
+      public static final int UPDATE_VALUE = 3;
+      
+      
+      public final int getNumber() { return value; }
+      
+      public static TransactionMetadataOp valueOf(int value) {
+        switch (value) {
+          case 0: return NEW;
+          case 1: return ADD_PARTITION;
+          case 2: return ADD_SUBSCRIPTION;
+          case 3: return UPDATE;
+          default: return null;
+        }
+      }
+      
+      public static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TransactionMetadataOp>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TransactionMetadataOp>
+          internalValueMap =
+            new org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.EnumLiteMap<TransactionMetadataOp>() {
+              public TransactionMetadataOp findValueByNumber(int number) {
+                return TransactionMetadataOp.valueOf(number);
+              }
+            };
+      
+      private final int value;
+      
+      private TransactionMetadataOp(int index, int value) {
+        this.value = value;
+      }
+      
+      // @@protoc_insertion_point(enum_scope:pulsar.proto.TransactionMetadataEntry.TransactionMetadataOp)
+    }
+    
+    private int bitField0_;
+    // optional .pulsar.proto.TransactionMetadataEntry.TransactionMetadataOp metadata_op = 1;
+    public static final int METADATA_OP_FIELD_NUMBER = 1;
+    private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp metadataOp_;
+    public boolean hasMetadataOp() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp getMetadataOp() {
+      return metadataOp_;
+    }
+    
+    // optional uint64 txnid_least_bits = 2 [default = 0];
+    public static final int TXNID_LEAST_BITS_FIELD_NUMBER = 2;
+    private long txnidLeastBits_;
+    public boolean hasTxnidLeastBits() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getTxnidLeastBits() {
+      return txnidLeastBits_;
+    }
+    
+    // optional uint64 txnid_most_bits = 3 [default = 0];
+    public static final int TXNID_MOST_BITS_FIELD_NUMBER = 3;
+    private long txnidMostBits_;
+    public boolean hasTxnidMostBits() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public long getTxnidMostBits() {
+      return txnidMostBits_;
+    }
+    
+    // optional .pulsar.proto.TxnStatus expected_status = 4;
+    public static final int EXPECTED_STATUS_FIELD_NUMBER = 4;
+    private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus expectedStatus_;
+    public boolean hasExpectedStatus() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getExpectedStatus() {
+      return expectedStatus_;
+    }
+    
+    // optional .pulsar.proto.TxnStatus new_status = 5;
+    public static final int NEW_STATUS_FIELD_NUMBER = 5;
+    private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus newStatus_;
+    public boolean hasNewStatus() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getNewStatus() {
+      return newStatus_;
+    }
+    
+    // repeated string partitions = 6;
+    public static final int PARTITIONS_FIELD_NUMBER = 6;
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringList partitions_;
+    public java.util.List<String>
+        getPartitionsList() {
+      return partitions_;
+    }
+    public int getPartitionsCount() {
+      return partitions_.size();
+    }
+    public String getPartitions(int index) {
+      return partitions_.get(index);
+    }
+    
+    // repeated .pulsar.proto.Subscription subscriptions = 7;
+    public static final int SUBSCRIPTIONS_FIELD_NUMBER = 7;
+    private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.Subscription> subscriptions_;
+    public java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.Subscription> getSubscriptionsList() {
+      return subscriptions_;
+    }
+    public java.util.List<? extends org.apache.pulsar.common.api.proto.PulsarApi.SubscriptionOrBuilder> 
+        getSubscriptionsOrBuilderList() {
+      return subscriptions_;
+    }
+    public int getSubscriptionsCount() {
+      return subscriptions_.size();
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.Subscription getSubscriptions(int index) {
+      return subscriptions_.get(index);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.SubscriptionOrBuilder getSubscriptionsOrBuilder(
+        int index) {
+      return subscriptions_.get(index);
+    }
+    
+    // optional uint64 timeout_ms = 8;
+    public static final int TIMEOUT_MS_FIELD_NUMBER = 8;
+    private long timeoutMs_;
+    public boolean hasTimeoutMs() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public long getTimeoutMs() {
+      return timeoutMs_;
+    }
+    
+    // optional uint64 start_time = 9;
+    public static final int START_TIME_FIELD_NUMBER = 9;
+    private long startTime_;
+    public boolean hasStartTime() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    public long getStartTime() {
+      return startTime_;
+    }
+    
+    // optional uint64 last_modification_time = 10;
+    public static final int LAST_MODIFICATION_TIME_FIELD_NUMBER = 10;
+    private long lastModificationTime_;
+    public boolean hasLastModificationTime() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    public long getLastModificationTime() {
+      return lastModificationTime_;
+    }
+    
+    private void initFields() {
+      metadataOp_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.NEW;
+      txnidLeastBits_ = 0L;
+      txnidMostBits_ = 0L;
+      expectedStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+      newStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+      partitions_ = org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringArrayList.EMPTY;
+      subscriptions_ = java.util.Collections.emptyList();
+      timeoutMs_ = 0L;
+      startTime_ = 0L;
+      lastModificationTime_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      for (int i = 0; i < getSubscriptionsCount(); i++) {
+        if (!getSubscriptions(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeEnum(1, metadataOp_.getNumber());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeUInt64(2, txnidLeastBits_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeUInt64(3, txnidMostBits_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeEnum(4, expectedStatus_.getNumber());
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeEnum(5, newStatus_.getNumber());
+      }
+      for (int i = 0; i < partitions_.size(); i++) {
+        output.writeBytes(6, partitions_.getByteString(i));
+      }
+      for (int i = 0; i < subscriptions_.size(); i++) {
+        output.writeMessage(7, subscriptions_.get(i));
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeUInt64(8, timeoutMs_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeUInt64(9, startTime_);
+      }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeUInt64(10, lastModificationTime_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeEnumSize(1, metadataOp_.getNumber());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(2, txnidLeastBits_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(3, txnidMostBits_);
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeEnumSize(4, expectedStatus_.getNumber());
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeEnumSize(5, newStatus_.getNumber());
+      }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < partitions_.size(); i++) {
+          dataSize += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+            .computeBytesSizeNoTag(partitions_.getByteString(i));
+        }
+        size += dataSize;
+        size += 1 * getPartitionsList().size();
+      }
+      for (int i = 0; i < subscriptions_.size(); i++) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(7, subscriptions_.get(i));
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(8, timeoutMs_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(9, startTime_);
+      }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt64Size(10, lastModificationTime_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(byte[] data)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+        byte[] data,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseDelimitedFrom(
+        java.io.InputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry parseFrom(
+        org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder<
+          org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry, Builder>
+        implements org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntryOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.newBuilder()
+      private final io.netty.util.Recycler.Handle handle;
+      private Builder(io.netty.util.Recycler.Handle handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                if (handle != null) {RECYCLER.recycle(this, handle);}
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        metadataOp_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.NEW;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        txnidLeastBits_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        txnidMostBits_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        expectedStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        newStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        partitions_ = org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        subscriptions_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000040);
+        timeoutMs_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000080);
+        startTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000100);
+        lastModificationTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000200);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry getDefaultInstanceForType() {
+        return org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry build() {
+        org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry buildParsed()
+          throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException {
+        org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry buildPartial() {
+        org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry result = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.metadataOp_ = metadataOp_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.txnidLeastBits_ = txnidLeastBits_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.txnidMostBits_ = txnidMostBits_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.expectedStatus_ = expectedStatus_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.newStatus_ = newStatus_;
+        if (((bitField0_ & 0x00000020) == 0x00000020)) {
+          partitions_ = new org.apache.pulsar.shaded.com.google.protobuf.v241.UnmodifiableLazyStringList(
+              partitions_);
+          bitField0_ = (bitField0_ & ~0x00000020);
+        }
+        result.partitions_ = partitions_;
+        if (((bitField0_ & 0x00000040) == 0x00000040)) {
+          subscriptions_ = java.util.Collections.unmodifiableList(subscriptions_);
+          bitField0_ = (bitField0_ & ~0x00000040);
+        }
+        result.subscriptions_ = subscriptions_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.timeoutMs_ = timeoutMs_;
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.startTime_ = startTime_;
+        if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.lastModificationTime_ = lastModificationTime_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry other) {
+        if (other == org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.getDefaultInstance()) return this;
+        if (other.hasMetadataOp()) {
+          setMetadataOp(other.getMetadataOp());
+        }
+        if (other.hasTxnidLeastBits()) {
+          setTxnidLeastBits(other.getTxnidLeastBits());
+        }
+        if (other.hasTxnidMostBits()) {
+          setTxnidMostBits(other.getTxnidMostBits());
+        }
+        if (other.hasExpectedStatus()) {
+          setExpectedStatus(other.getExpectedStatus());
+        }
+        if (other.hasNewStatus()) {
+          setNewStatus(other.getNewStatus());
+        }
+        if (!other.partitions_.isEmpty()) {
+          if (partitions_.isEmpty()) {
+            partitions_ = other.partitions_;
+            bitField0_ = (bitField0_ & ~0x00000020);
+          } else {
+            ensurePartitionsIsMutable();
+            partitions_.addAll(other.partitions_);
+          }
+          
+        }
+        if (!other.subscriptions_.isEmpty()) {
+          if (subscriptions_.isEmpty()) {
+            subscriptions_ = other.subscriptions_;
+            bitField0_ = (bitField0_ & ~0x00000040);
+          } else {
+            ensureSubscriptionsIsMutable();
+            subscriptions_.addAll(other.subscriptions_);
+          }
+          
+        }
+        if (other.hasTimeoutMs()) {
+          setTimeoutMs(other.getTimeoutMs());
+        }
+        if (other.hasStartTime()) {
+          setStartTime(other.getStartTime());
+        }
+        if (other.hasLastModificationTime()) {
+          setLastModificationTime(other.getLastModificationTime());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        for (int i = 0; i < getSubscriptionsCount(); i++) {
+          if (!getSubscriptions(i).isInitialized()) {
+            
+            return false;
+          }
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input,
+                              org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              int rawValue = input.readEnum();
+              org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp value = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.valueOf(rawValue);
+              if (value != null) {
+                bitField0_ |= 0x00000001;
+                metadataOp_ = value;
+              }
+              break;
+            }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              txnidLeastBits_ = input.readUInt64();
+              break;
+            }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              txnidMostBits_ = input.readUInt64();
+              break;
+            }
+            case 32: {
+              int rawValue = input.readEnum();
+              org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus value = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.valueOf(rawValue);
+              if (value != null) {
+                bitField0_ |= 0x00000008;
+                expectedStatus_ = value;
+              }
+              break;
+            }
+            case 40: {
+              int rawValue = input.readEnum();
+              org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus value = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.valueOf(rawValue);
+              if (value != null) {
+                bitField0_ |= 0x00000010;
+                newStatus_ = value;
+              }
+              break;
+            }
+            case 50: {
+              ensurePartitionsIsMutable();
+              partitions_.add(input.readBytes());
+              break;
+            }
+            case 58: {
+              org.apache.pulsar.common.api.proto.PulsarApi.Subscription.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.Subscription.newBuilder();
+              input.readMessage(subBuilder, extensionRegistry);
+              addSubscriptions(subBuilder.buildPartial());
+              break;
+            }
+            case 64: {
+              bitField0_ |= 0x00000080;
+              timeoutMs_ = input.readUInt64();
+              break;
+            }
+            case 72: {
+              bitField0_ |= 0x00000100;
+              startTime_ = input.readUInt64();
+              break;
+            }
+            case 80: {
+              bitField0_ |= 0x00000200;
+              lastModificationTime_ = input.readUInt64();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // optional .pulsar.proto.TransactionMetadataEntry.TransactionMetadataOp metadata_op = 1;
+      private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp metadataOp_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.NEW;
+      public boolean hasMetadataOp() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp getMetadataOp() {
+        return metadataOp_;
+      }
+      public Builder setMetadataOp(org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000001;
+        metadataOp_ = value;
+        
+        return this;
+      }
+      public Builder clearMetadataOp() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        metadataOp_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TransactionMetadataEntry.TransactionMetadataOp.NEW;
+        
+        return this;
+      }
+      
+      // optional uint64 txnid_least_bits = 2 [default = 0];
+      private long txnidLeastBits_ ;
+      public boolean hasTxnidLeastBits() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getTxnidLeastBits() {
+        return txnidLeastBits_;
+      }
+      public Builder setTxnidLeastBits(long value) {
+        bitField0_ |= 0x00000002;
+        txnidLeastBits_ = value;
+        
+        return this;
+      }
+      public Builder clearTxnidLeastBits() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        txnidLeastBits_ = 0L;
+        
+        return this;
+      }
+      
+      // optional uint64 txnid_most_bits = 3 [default = 0];
+      private long txnidMostBits_ ;
+      public boolean hasTxnidMostBits() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public long getTxnidMostBits() {
+        return txnidMostBits_;
+      }
+      public Builder setTxnidMostBits(long value) {
+        bitField0_ |= 0x00000004;
+        txnidMostBits_ = value;
+        
+        return this;
+      }
+      public Builder clearTxnidMostBits() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        txnidMostBits_ = 0L;
+        
+        return this;
+      }
+      
+      // optional .pulsar.proto.TxnStatus expected_status = 4;
+      private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus expectedStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+      public boolean hasExpectedStatus() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getExpectedStatus() {
+        return expectedStatus_;
+      }
+      public Builder setExpectedStatus(org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000008;
+        expectedStatus_ = value;
+        
+        return this;
+      }
+      public Builder clearExpectedStatus() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        expectedStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+        
+        return this;
+      }
+      
+      // optional .pulsar.proto.TxnStatus new_status = 5;
+      private org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus newStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+      public boolean hasNewStatus() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus getNewStatus() {
+        return newStatus_;
+      }
+      public Builder setNewStatus(org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000010;
+        newStatus_ = value;
+        
+        return this;
+      }
+      public Builder clearNewStatus() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        newStatus_ = org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.OPEN;
+        
+        return this;
+      }
+      
+      // repeated string partitions = 6;
+      private org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringList partitions_ = org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringArrayList.EMPTY;
+      private void ensurePartitionsIsMutable() {
+        if (!((bitField0_ & 0x00000020) == 0x00000020)) {
+          partitions_ = new org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringArrayList(partitions_);
+          bitField0_ |= 0x00000020;
+         }
+      }
+      public java.util.List<String>
+          getPartitionsList() {
+        return java.util.Collections.unmodifiableList(partitions_);
+      }
+      public int getPartitionsCount() {
+        return partitions_.size();
+      }
+      public String getPartitions(int index) {
+        return partitions_.get(index);
+      }
+      public Builder setPartitions(
+          int index, String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensurePartitionsIsMutable();
+        partitions_.set(index, value);
+        
+        return this;
+      }
+      public Builder addPartitions(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensurePartitionsIsMutable();
+        partitions_.add(value);
+        
+        return this;
+      }
+      public Builder addAllPartitions(
+          java.lang.Iterable<String> values) {
+        ensurePartitionsIsMutable();
+        super.addAll(values, partitions_);
+        
+        return this;
+      }
+      public Builder clearPartitions() {
+        partitions_ = org.apache.pulsar.shaded.com.google.protobuf.v241.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        
+        return this;
+      }
+      void addPartitions(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) {
+        ensurePartitionsIsMutable();
+        partitions_.add(value);
+        
+      }
+      
+      // repeated .pulsar.proto.Subscription subscriptions = 7;
+      private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.Subscription> subscriptions_ =
+        java.util.Collections.emptyList();
+      private void ensureSubscriptionsIsMutable() {
+        if (!((bitField0_ & 0x00000040) == 0x00000040)) {
+          subscriptions_ = new java.util.ArrayList<org.apache.pulsar.common.api.proto.PulsarApi.Subscription>(subscriptions_);
+          bitField0_ |= 0x00000040;
+         }
+      }
+      
+      public java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.Subscription> getSubscriptionsList() {
+        return java.util.Collections.unmodifiableList(subscriptions_);
+      }
+      public int getSubscriptionsCount() {
+        return subscriptions_.size();
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.Subscription getSubscriptions(int index) {
+        return subscriptions_.get(index);
+      }
+      public Builder setSubscriptions(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.Subscription value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureSubscriptionsIsMutable();
+        subscriptions_.set(index, value);
+        
+        return this;
+      }
+      public Builder setSubscriptions(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.Subscription.Builder builderForValue) {
+        ensureSubscriptionsIsMutable();
+        subscriptions_.set(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addSubscriptions(org.apache.pulsar.common.api.proto.PulsarApi.Subscription value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureSubscriptionsIsMutable();
+        subscriptions_.add(value);
+        
+        return this;
+      }
+      public Builder addSubscriptions(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.Subscription value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureSubscriptionsIsMutable();
+        subscriptions_.add(index, value);
+        
+        return this;
+      }
+      public Builder addSubscriptions(
+          org.apache.pulsar.common.api.proto.PulsarApi.Subscription.Builder builderForValue) {
+        ensureSubscriptionsIsMutable();
+        subscriptions_.add(builderForValue.build());
+        
+        return this;
+      }
+      public Builder addSubscriptions(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.Subscription.Builder builderForValue) {
+        ensureSubscriptionsIsMutable();
+        subscriptions_.add(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addAllSubscriptions(
+          java.lang.Iterable<? extends org.apache.pulsar.common.api.proto.PulsarApi.Subscription> values) {
+        ensureSubscriptionsIsMutable();
+        super.addAll(values, subscriptions_);
+        
+        return this;
+      }
+      public Builder clearSubscriptions() {
+        subscriptions_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000040);
+        
+        return this;
+      }
+      public Builder removeSubscriptions(int index) {
+        ensureSubscriptionsIsMutable();
+        subscriptions_.remove(index);
+        
+        return this;
+      }
+      
+      // optional uint64 timeout_ms = 8;
+      private long timeoutMs_ ;
+      public boolean hasTimeoutMs() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      public long getTimeoutMs() {
+        return timeoutMs_;
+      }
+      public Builder setTimeoutMs(long value) {
+        bitField0_ |= 0x00000080;
+        timeoutMs_ = value;
+        
+        return this;
+      }
+      public Builder clearTimeoutMs() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        timeoutMs_ = 0L;
+        
+        return this;
+      }
+      
+      // optional uint64 start_time = 9;
+      private long startTime_ ;
+      public boolean hasStartTime() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      public long getStartTime() {
+        return startTime_;
+      }
+      public Builder setStartTime(long value) {
+        bitField0_ |= 0x00000100;
+        startTime_ = value;
+        
+        return this;
+      }
+      public Builder clearStartTime() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        startTime_ = 0L;
+        
+        return this;
+      }
+      
+      // optional uint64 last_modification_time = 10;
+      private long lastModificationTime_ ;
+      public boolean hasLastModificationTime() {
+        return ((bitField0_ & 0x00000200) == 0x00000200);
+      }
+      public long getLastModificationTime() {
+        return lastModificationTime_;
+      }
+      public Builder setLastModificationTime(long value) {
+        bitField0_ |= 0x00000200;
+        lastModificationTime_ = value;
+        
+        return this;
+      }
+      public Builder clearLastModificationTime() {
+        bitField0_ = (bitField0_ & ~0x00000200);
+        lastModificationTime_ = 0L;
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.TransactionMetadataEntry)
+    }
+    
+    static {
+      defaultInstance = new TransactionMetadataEntry(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.TransactionMetadataEntry)
+  }
+  
+  
+  static {
+  }
+  
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/TransactionUtil.java
similarity index 68%
rename from pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java
rename to pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/TransactionUtil.java
index dc5b8d9..ecff44d 100644
--- a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnStatus.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/TransactionUtil.java
@@ -16,26 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.transaction.impl.common;
+package org.apache.pulsar.transaction.coordinator.util;
 
-import com.google.common.annotations.Beta;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+
+import static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.ABORTED;
+import static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.ABORTING;
+import static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.COMMITTED;
+import static org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus.COMMITTING;
 
 /**
- * A enum represents the status of a transaction.
+ * An transaction util of {@link TransactionUtil}.
  */
-@Beta
-public enum TxnStatus {
-
-    // A new transaction is open.
-    OPEN,
-    // A transaction is in the progress of committing.
-    COMMITTING,
-    // A transaction is already committed.
-    COMMITTED,
-    // A transaction is in the progress of aborting.
-    ABORTING,
-    // A transaction is already aborted.
-    ABORTED;
+public class TransactionUtil {
 
     /**
      * Check if the a status can be transaction to a new status.
@@ -43,8 +36,7 @@ public enum TxnStatus {
      * @param newStatus the new status
      * @return true if the current status can be transitioning to.
      */
-    public boolean canTransitionTo(TxnStatus newStatus) {
-        TxnStatus currentStatus = this;
+    public static boolean canTransitionTo(TxnStatus currentStatus, TxnStatus newStatus) {
 
         switch (currentStatus) {
             case OPEN:
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/package-info.java
similarity index 51%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
copy to pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/package-info.java
index c81b17a..4501e50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionStatusException.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/util/package-info.java
@@ -16,22 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.exceptions;
-
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
-
 /**
- * Exceptions are thrown when operations are applied to a transaction which is not in expected txn status.
+ * Implementations of the transaction coordinator.
  */
-public class TransactionStatusException extends TransactionBufferException {
-
-    private static final long serialVersionUID = 0L;
-
-    public TransactionStatusException(TxnID txnId,
-                                      TxnStatus expectedStatus,
-                                      TxnStatus actualStatus) {
-        super("Transaction `" + txnId + "` is not in an expected status `" + expectedStatus
-            + "`, but is in status `" + actualStatus + "`");
-    }
-}
+package org.apache.pulsar.transaction.coordinator.util;
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
new file mode 100644
index 0000000..5391919
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+import "pulsar-common/src/main/proto/PulsarApi.proto";
+package pulsar.proto;
+option java_package = "org.apache.pulsar.transaction.coordinator.proto";
+option optimize_for = LITE_RUNTIME;
+
+enum TxnStatus {
+  OPEN       = 0;
+  COMMITTING = 1;
+  COMMITTED  = 2;
+  ABORTING   = 3;
+  ABORTED    = 4;
+}
+
+message TransactionMetadataEntry {
+
+  enum TransactionMetadataOp {
+    NEW                 = 0;
+    ADD_PARTITION       = 1;
+    ADD_SUBSCRIPTION    = 2;
+    UPDATE              = 3;
+  }
+
+  optional TransactionMetadataOp metadata_op   = 1;
+  optional uint64 txnid_least_bits    = 2 [default = 0];
+  optional uint64 txnid_most_bits     = 3 [default = 0];
+  optional TxnStatus expected_status  = 4;
+  optional TxnStatus new_status       = 5;
+  repeated string partitions          = 6;
+  repeated pulsar.proto.Subscription subscriptions = 7;
+  optional uint64 timeout_ms      = 8;
+  optional uint64 start_time      = 9;
+  optional uint64 last_modification_time = 10;
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
new file mode 100644
index 0000000..84e68c7
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
+
+    public MLTransactionMetadataStoreTest() {
+        super(3);
+    }
+
+    @Test
+    public void testTransactionOperation() throws Exception {
+        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+        factoryConf.setMaxCacheSize(0);
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
+        TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+        MLTransactionMetadataStore transactionMetadataStore =
+                new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
+        int checkReplayRetryCount = 0;
+        while (true) {
+            checkReplayRetryCount++;
+            if (checkReplayRetryCount > 3) {
+                Assert.fail();
+                break;
+            }
+            if (transactionMetadataStore.checkIfReady()) {
+                TxnID txnID = transactionMetadataStore.newTransaction(5000).get();
+                Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(), TxnStatus.OPEN);
+
+                List<String> partitions = new ArrayList<>();
+                partitions.add("pt-1");
+                partitions.add("pt-2");
+                transactionMetadataStore.addProducedPartitionToTxn(txnID, partitions).get();
+                Assert.assertEquals(transactionMetadataStore.getTxnMeta(txnID).get().producedPartitions(), partitions);
+
+                partitions.add("pt-3");
+                transactionMetadataStore.addProducedPartitionToTxn(txnID, partitions).get();
+                Assert.assertEquals(transactionMetadataStore.getTxnMeta(txnID).get().producedPartitions(),
+                        partitions);
+
+                List<TransactionSubscription> subscriptions = new ArrayList<>();
+                subscriptions.add(new TransactionSubscription("topic1", "sub1"));
+                subscriptions.add(new TransactionSubscription("topic2", "sub2"));
+                transactionMetadataStore.addAckedPartitionToTxn(txnID, subscriptions).get();
+                Assert.assertTrue(transactionMetadataStore.getTxnMeta(txnID).get().ackedPartitions().containsAll(subscriptions));
+
+                transactionMetadataStore.addAckedPartitionToTxn(txnID, subscriptions).get();
+                Assert.assertEquals(transactionMetadataStore.getTxnMeta(txnID).get().producedPartitions(),
+                        partitions);
+
+                transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+                Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(), TxnStatus.COMMITTING);
+
+                transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING).get();
+
+                try {
+                    transactionMetadataStore.getTxnMeta(txnID).get();
+                    Assert.fail();
+                } catch (ExecutionException e) {
+                    Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
+                }
+                break;
+            } else {
+                checkReplayRetryCount++;
+                Thread.sleep(100);
+            }
+        }
+    }
+
+    @Test
+    public void testInitTransactionReader() throws Exception {
+        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+        factoryConf.setMaxCacheSize(0);
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
+        TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+        MLTransactionMetadataStore transactionMetadataStore =
+                new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
+        int checkReplayRetryCount = 0;
+        while (true) {
+            if (checkReplayRetryCount > 3) {
+                Assert.fail();
+                break;
+            }
+            if (transactionMetadataStore.checkIfReady()) {
+                TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
+                TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+                Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN);
+                Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN);
+
+                List<String> partitions = new ArrayList<>();
+                partitions.add("pt-1");
+                partitions.add("pt-2");
+                transactionMetadataStore.addProducedPartitionToTxn(txnID1, partitions).get();
+                transactionMetadataStore.addProducedPartitionToTxn(txnID2, partitions).get();
+
+                List<TransactionSubscription> subscriptions = new ArrayList<>();
+                subscriptions.add(new TransactionSubscription("topic1", "sub1"));
+                subscriptions.add(new TransactionSubscription("topic2", "sub2"));
+
+                transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions).get();
+                transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions).get();
+                List<TransactionSubscription> subscriptions1 = new ArrayList<>();
+                subscriptions1.add(new TransactionSubscription("topic1", "sub1"));
+                subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
+                subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
+                transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions1).get();
+                transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions1).get();
+
+                transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+                transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+
+                transactionMetadataStore.closeAsync();
+
+                MLTransactionMetadataStore transactionMetadataStoreTest =
+                        new MLTransactionMetadataStore(transactionCoordinatorID,
+
+                                new MLTransactionLogImpl(transactionCoordinatorID, factory));
+
+                while (true) {
+                    if (checkReplayRetryCount > 6) {
+                        Assert.fail();
+                        break;
+                    }
+                    if (transactionMetadataStoreTest.checkIfReady()) {
+                        subscriptions.add(new TransactionSubscription("topic3", "sub3"));
+                        TxnMeta txnMeta1 = transactionMetadataStoreTest.getTxnMeta(txnID1).get();
+                        TxnMeta txnMeta2 = transactionMetadataStoreTest.getTxnMeta(txnID2).get();
+                        Assert.assertEquals(txnMeta1.producedPartitions(), partitions);
+                        Assert.assertEquals(txnMeta2.producedPartitions(), partitions);
+                        Assert.assertEquals(txnMeta1.ackedPartitions().size(), subscriptions.size());
+                        Assert.assertEquals(txnMeta2.ackedPartitions().size(), subscriptions.size());
+                        Assert.assertTrue(subscriptions.containsAll(txnMeta1.ackedPartitions()));
+                        Assert.assertTrue(subscriptions.containsAll(txnMeta2.ackedPartitions()));
+                        Assert.assertEquals(txnMeta1.status(), TxnStatus.COMMITTING);
+                        Assert.assertEquals(txnMeta2.status(), TxnStatus.COMMITTING);
+                        transactionMetadataStoreTest
+                                .updateTxnStatus(txnID1, TxnStatus.COMMITTED, TxnStatus.COMMITTING).get();
+                        transactionMetadataStoreTest
+                                .updateTxnStatus(txnID2, TxnStatus.COMMITTED, TxnStatus.COMMITTING).get();
+                        try {
+                            transactionMetadataStoreTest.getTxnMeta(txnID1).get();
+                            Assert.fail();
+                        } catch (ExecutionException e) {
+                            Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
+                        }
+
+                        try {
+                            transactionMetadataStoreTest.getTxnMeta(txnID2).get();
+                            Assert.fail();
+                        } catch (ExecutionException e) {
+                            Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
+                        }
+                        TxnID txnID = transactionMetadataStoreTest.newTransaction(1000).get();
+                        Assert.assertEquals(txnID.getLeastSigBits(), 2L);
+                        break;
+                    } else {
+                        checkReplayRetryCount++;
+                        Thread.sleep(100);
+                    }
+                }
+                break;
+            } else {
+                checkReplayRetryCount++;
+                Thread.sleep(100);
+            }
+        }
+    }
+
+    @Test
+    public void testDeleteLog() throws Exception {
+        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
+        factoryConf.setMaxCacheSize(0);
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
+        TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+        MLTransactionMetadataStore transactionMetadataStore =
+                new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
+        int checkReplayRetryCount = 0;
+        while (true) {
+            if (checkReplayRetryCount > 3) {
+                Assert.fail();
+                break;
+            }
+            if (transactionMetadataStore.checkIfReady()) {
+                TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
+                TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+                Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN);
+                Assert.assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN);
+
+                List<String> partitions = new ArrayList<>();
+                partitions.add("pt-1");
+                partitions.add("pt-2");
+                transactionMetadataStore.addProducedPartitionToTxn(txnID1, partitions).get();
+                transactionMetadataStore.addProducedPartitionToTxn(txnID2, partitions).get();
+
+                List<TransactionSubscription> subscriptions = new ArrayList<>();
+                subscriptions.add(new TransactionSubscription("topic1", "sub1"));
+                subscriptions.add(new TransactionSubscription("topic2", "sub2"));
+
+                transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions).get();
+                transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions).get();
+                List<TransactionSubscription> subscriptions1 = new ArrayList<>();
+                subscriptions1.add(new TransactionSubscription("topic1", "sub1"));
+                subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
+                subscriptions1.add(new TransactionSubscription("topic3", "sub3"));
+                transactionMetadataStore.addAckedPartitionToTxn(txnID1, subscriptions1).get();
+                transactionMetadataStore.addAckedPartitionToTxn(txnID2, subscriptions1).get();
+
+                transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
+                transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTING, TxnStatus.OPEN).get();
+
+                transactionMetadataStore.updateTxnStatus(txnID1, TxnStatus.COMMITTED, TxnStatus.COMMITTING).get();
+                transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING).get();
+                Field field = mlTransactionLog.getClass().getDeclaredField("cursor");
+                field.setAccessible(true);
+                ManagedCursor cursor = (ManagedCursor) field.get(mlTransactionLog);
+                Assert.assertEquals(cursor.getMarkDeletedPosition(), cursor.getManagedLedger().getLastConfirmedEntry());
+
+                break;
+            } else {
+                checkReplayRetryCount++;
+                Thread.sleep(100);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index ff4bd31..9a79b37 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -29,8 +29,8 @@ import java.util.concurrent.ExecutionException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
 import org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
@@ -78,14 +78,14 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testGetTxnStatusSuccess() throws Exception {
-        TxnID txnID = this.store.newTransaction().get();
+        TxnID txnID = this.store.newTransaction(0L).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
     }
 
     @Test
     public void testUpdateTxnStatusSuccess() throws Exception {
-        TxnID txnID = this.store.newTransaction().get();
+        TxnID txnID = this.store.newTransaction(0L).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -99,7 +99,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testUpdateTxnStatusNotExpectedStatus() throws Exception {
-        TxnID txnID = this.store.newTransaction().get();
+        TxnID txnID = this.store.newTransaction(0L).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -118,7 +118,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testUpdateTxnStatusCannotTransition() throws Exception {
-        TxnID txnID = this.store.newTransaction().get();
+        TxnID txnID = this.store.newTransaction(0L).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -137,7 +137,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testAddProducedPartition() throws Exception {
-        TxnID txnID = this.store.newTransaction().get();
+        TxnID txnID = this.store.newTransaction(0L).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -191,7 +191,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testAddAckedPartition() throws Exception {
-        TxnID txnID = this.store.newTransaction().get();
+        TxnID txnID = this.store.newTransaction(0L).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
diff --git a/pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TxnStatusTest.java
similarity index 91%
rename from pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java
rename to pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TxnStatusTest.java
index 7a8c8b9..5f606dc 100644
--- a/pulsar-transaction/common/src/test/java/org/apache/pulsar/transaction/impl/common/TxnStatusTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TxnStatusTest.java
@@ -16,16 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.transaction.impl.common;
-
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
+package org.apache.pulsar.transaction.coordinator;
 
 import com.google.common.collect.Sets;
-import java.util.Set;
+import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.apache.pulsar.transaction.coordinator.util.TransactionUtil;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.util.Set;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
 /**
  * Unit test {@link TxnStatus}.
  */
@@ -103,13 +106,13 @@ public class TxnStatusTest {
                                         Set<TxnStatus> statusesCanNotTransactionTo) {
         statusesCanTransitionTo.forEach(newStatus -> {
             assertTrue(
-                status.canTransitionTo(newStatus),
+                    TransactionUtil.canTransitionTo(status, newStatus),
                 "Status `" + status + "` should be able to transition to `" + newStatus + "`"
             );
         });
         statusesCanNotTransactionTo.forEach(newStatus -> {
             assertFalse(
-                status.canTransitionTo(newStatus),
+                    TransactionUtil.canTransitionTo(status, newStatus),
                 "Status `" + status + "` should NOT be able to transition to `" + newStatus + "`"
             );
         });
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
new file mode 100644
index 0000000..c6df6d5
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator.test;
+
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A class runs several bookie servers for testing.
+ */
+public abstract class MockedBookKeeperTestCase {
+
+    static final Logger LOG = LoggerFactory.getLogger(MockedBookKeeperTestCase.class);
+
+    // ZooKeeper related variables
+    protected MockZooKeeper zkc;
+
+    // BookKeeper related variables
+    protected PulsarMockBookKeeper bkc;
+    protected int numBookies;
+
+    protected ManagedLedgerFactoryImpl factory;
+
+    protected ClientConfiguration baseClientConf = new ClientConfiguration();
+
+    protected OrderedScheduler executor;
+    protected ExecutorService cachedExecutor;
+
+    public MockedBookKeeperTestCase() {
+        // By default start a 3 bookies cluster
+        this(3);
+    }
+
+    public MockedBookKeeperTestCase(int numBookies) {
+        this.numBookies = numBookies;
+    }
+
+    @BeforeMethod
+    public void setUp(Method method) throws Exception {
+        LOG.info(">>>>>> starting {}", method);
+        try {
+            // start bookkeeper service
+            startBookKeeper();
+        } catch (Exception e) {
+            LOG.error("Error setting up", e);
+            throw e;
+        }
+
+        ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
+        factory = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
+
+        zkc.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown(Method method) {
+        try {
+            LOG.info("@@@@@@@@@ stopping " + method);
+            factory.shutdown();
+            factory = null;
+            stopBookKeeper();
+            stopZooKeeper();
+            LOG.info("--------- stopped {}", method);
+        } catch (Exception e) {
+            LOG.error("tearDown Error", e);
+        }
+    }
+
+    @BeforeClass
+    public void setUpClass() {
+        executor = OrderedScheduler.newSchedulerBuilder().numThreads(2).name("test").build();
+        cachedExecutor = Executors.newCachedThreadPool();
+    }
+
+    @AfterClass
+    public void tearDownClass() {
+        executor.shutdown();
+        cachedExecutor.shutdown();
+    }
+
+    /**
+     * Start cluster
+     *
+     * @throws Exception
+     */
+    protected void startBookKeeper() throws Exception {
+        zkc = MockZooKeeper.newInstance();
+        for (int i = 0; i < numBookies; i++) {
+            ZkUtils.createFullPathOptimistic(zkc, "/ledgers/available/192.168.1.1:" + (5000 + i), "".getBytes(), null,
+                    null);
+        }
+
+        zkc.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(), null, null);
+
+        bkc = new PulsarMockBookKeeper(zkc, executor.chooseThread(this));
+    }
+
+    protected void stopBookKeeper() throws Exception {
+        bkc.shutdown();
+    }
+
+    protected void stopZooKeeper() throws Exception {
+        zkc.shutdown();
+    }
+
+}
\ No newline at end of file