You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/10 10:37:45 UTC

[pulsar] branch master updated: [Transaction] Produce transaction messages and commit (#7552)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ba5c5c7  [Transaction] Produce transaction messages and commit (#7552)
ba5c5c7 is described below

commit ba5c5c72aad9bb3020ceaec6465077b5be3cd7a6
Author: ran <ga...@126.com>
AuthorDate: Mon Aug 10 18:37:27 2020 +0800

    [Transaction] Produce transaction messages and commit (#7552)
    
    ### Motivation
    
    Currently, the transaction components are all independent; the relationship between the transaction client and the transaction server still needs to be established.
    
    The goal of this PR is to enable the Pulsar client to send transaction messages to the Pulsar broker and to execute the commit command.
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 +
 .../org/apache/pulsar/broker/PulsarService.java    |  26 +-
 .../broker/TransactionMetadataStoreService.java    |  76 +++++-
 .../pulsar/broker/service/AbstractTopic.java       |   5 +
 .../org/apache/pulsar/broker/service/Producer.java |  10 +
 .../apache/pulsar/broker/service/ServerCnx.java    | 125 +++++++---
 .../org/apache/pulsar/broker/service/Topic.java    |  31 +++
 .../service/nonpersistent/NonPersistentTopic.java  |  19 ++
 .../broker/service/persistent/PersistentTopic.java |  74 ++++++
 .../transaction/buffer/TransactionBuffer.java      |  18 +-
 .../buffer/TransactionBufferProvider.java          |  16 +-
 .../broker/transaction/buffer/TransactionMeta.java |   8 +-
 .../TransactionBufferProviderException.java}       |  17 +-
 .../exceptions/UnsupportedTxnActionException.java  |  30 +--
 .../buffer/impl/InMemTransactionBuffer.java        |  21 +-
 .../impl/InMemTransactionBufferProvider.java       |   7 +
 .../buffer/impl/PersistentTransactionBuffer.java   | 152 ++++++++++--
 .../impl/PersistentTransactionBufferProvider.java  |  85 +++++++
 .../buffer/impl/TransactionBufferClientImpl.java   |   1 -
 .../buffer/impl/TransactionBufferHandlerImpl.java  |   1 +
 .../buffer/impl/TransactionMetaImpl.java           |  28 ++-
 .../transaction/PulsarClientTransactionTest.java   | 258 +++++++++++++++++++
 .../broker/transaction/TransactionTestBase.java    | 274 +++++++++++++++++++++
 .../buffer/PersistentTransactionBufferTest.java    |  83 +++++--
 .../buffer/TransactionBufferClientTest.java        |  25 ++
 .../TransactionCoordinatorClientTest.java          |  30 +++
 .../coordinator/TransactionMetaStoreTestBase.java  |   9 +-
 .../apache/pulsar/client/api/ClientBuilder.java    |   8 +
 .../pulsar/client/api/transaction/TxnID.java       |  18 ++
 .../pulsar/client/impl/ClientBuilderImpl.java      |   6 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   1 +
 .../client/impl/PartitionedProducerImpl.java       |   5 +-
 .../apache/pulsar/client/impl/ProducerBase.java    |   6 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |   7 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  19 +-
 .../client/impl/TransactionMetaStoreHandler.java   |   3 +-
 .../client/impl/TypedMessageBuilderImpl.java       |   6 +-
 .../client/impl/conf/ClientConfigurationData.java  |   5 +-
 .../impl/transaction/TransactionBuilderImpl.java   |  32 ++-
 .../client/impl/transaction/TransactionImpl.java   |  16 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  30 +--
 .../pulsar/common/api/proto/PulsarMarkers.java     |  15 +-
 .../apache/pulsar/common/protocol/Commands.java    |  38 ++-
 .../org/apache/pulsar/common/protocol/Markers.java |   6 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   4 +-
 pulsar-common/src/main/proto/PulsarMarkers.proto   |   5 +-
 46 files changed, 1471 insertions(+), 195 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index eb2d46a..53cab38 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1704,6 +1704,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private String transactionMetadataStoreProviderClassName =
             "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider";
 
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "Class name for transaction buffer provider"
+    )
+    private String transactionBufferProviderClassName =
+            "org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBufferProvider";
+
     /**** --- KeyStore TLS config variables --- ****/
     @FieldContext(
             category = CATEGORY_KEYSTORE_TLS,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ef598b7..0a72063 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -95,12 +95,17 @@ import org.apache.pulsar.broker.service.TopicPoliciesService;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBufferProvider;
+import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.broker.validator.MultipleListenerValidator;
 import org.apache.pulsar.broker.web.WebService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
@@ -203,7 +208,11 @@ public class PulsarService implements AutoCloseable {
     private final ShutdownService shutdownService;
 
     private MetricsGenerator metricsGenerator;
+
     private TransactionMetadataStoreService transactionMetadataStoreService;
+    private TransactionBufferProvider transactionBufferProvider;
+    private TransactionBufferClient transactionBufferClient;
+
     private BrokerInterceptor brokerInterceptor;
 
     public enum State {
@@ -546,9 +555,16 @@ public class PulsarService implements AutoCloseable {
 
             // Register pulsar system namespaces and start transaction meta store service
             if (config.isTransactionCoordinatorEnabled()) {
+                transactionBufferClient = TransactionBufferClientImpl.create(
+                        getNamespaceService(), ((PulsarClientImpl) getClient()).getCnxPool());
+
                 transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
-                        .newProvider(config.getTransactionMetadataStoreProviderClassName()), this);
+                        .newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
+                        transactionBufferClient);
                 transactionMetadataStoreService.start();
+
+                transactionBufferProvider = TransactionBufferProvider
+                        .newProvider(config.getTransactionBufferProviderClassName());
             }
 
             this.metricsGenerator = new MetricsGenerator(this);
@@ -1081,6 +1097,14 @@ public class PulsarService implements AutoCloseable {
         return transactionMetadataStoreService;
     }
 
+    public TransactionBufferProvider getTransactionBufferProvider() {
+        return transactionBufferProvider;
+    }
+
+    public TransactionBufferClient getTransactionBufferClient() {
+        return transactionBufferClient;
+    }
+
     public ShutdownService getShutdownService() {
         return shutdownService;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index bce94f2..859bcb9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -20,7 +20,10 @@ package org.apache.pulsar.broker;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
+import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
+import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -34,6 +37,7 @@ import org.apache.pulsar.transaction.impl.common.TxnStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -47,12 +51,14 @@ public class TransactionMetadataStoreService {
     private final Map<TransactionCoordinatorID, TransactionMetadataStore> stores;
     private final TransactionMetadataStoreProvider transactionMetadataStoreProvider;
     private final PulsarService pulsarService;
+    private final TransactionBufferClient tbClient;
 
     public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider,
-                                           PulsarService pulsarService) {
+                                           PulsarService pulsarService, TransactionBufferClient tbClient) {
         this.pulsarService = pulsarService;
         this.stores = new ConcurrentHashMap<>();
         this.transactionMetadataStoreProvider = transactionMetadataStoreProvider;
+        this.tbClient = tbClient;
     }
 
     public void start() {
@@ -169,6 +175,74 @@ public class TransactionMetadataStoreService {
         return store.updateTxnStatus(txnId, newStatus, expectedStatus);
     }
 
+    /**
+     * Drives a transaction to its final state: mark it COMMITTING/ABORTING, end it on the
+     * transaction buffers, then mark it COMMITTED/ABORTED.
+     *
+     * @param txnID id of the transaction to end
+     * @param txnAction numeric value of a {@code PulsarApi.TxnAction}
+     * @return future of the whole sequence; failed with {@link UnsupportedTxnActionException}
+     *         when the action is neither commit nor abort
+     */
+    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction) {
+        final TxnStatus endingStatus;
+        final TxnStatus endedStatus;
+        if (txnAction == PulsarApi.TxnAction.COMMIT_VALUE) {
+            endingStatus = TxnStatus.COMMITTING;
+            endedStatus = TxnStatus.COMMITTED;
+        } else if (txnAction == PulsarApi.TxnAction.ABORT_VALUE) {
+            endingStatus = TxnStatus.ABORTING;
+            endedStatus = TxnStatus.ABORTED;
+        } else {
+            UnsupportedTxnActionException unsupported = new UnsupportedTxnActionException(txnID, txnAction);
+            LOG.error(unsupported.getMessage());
+            CompletableFuture<Void> failed = new CompletableFuture<>();
+            failed.completeExceptionally(unsupported);
+            return failed;
+        }
+        return updateTxnStatus(txnID, endingStatus, TxnStatus.OPEN)
+                .thenCompose(ignored -> endToTB(txnID, endingStatus))
+                .thenCompose(ignored -> updateTxnStatus(txnID, endedStatus, endingStatus));
+    }
+
+    /** Ends the txn on the buffer of every produced partition; only COMMITTING is handled so far. */
+    private CompletableFuture<Void> endToTB(TxnID txnID, TxnStatus newStatus) {
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
+            if (throwable != null) {
+                resultFuture.completeExceptionally(throwable);
+                return;
+            }
+            try {
+                List<CompletableFuture<TxnID>> endFutureList = new ArrayList<>();
+                txnMeta.producedPartitions().forEach(partition -> {
+                    if (TxnStatus.COMMITTING.equals(newStatus)) {
+                        // TODO commitTxnOnSubscription
+                        endFutureList.add(tbClient.commitTxnOnTopic(
+                                partition, txnID.getMostSigBits(), txnID.getLeastSigBits()));
+                    } else {
+                        // TODO abortTxnOnTopic, abortTxnOnSubscription
+                        CompletableFuture<TxnID> unsupported = new CompletableFuture<>();
+                        unsupported.completeExceptionally(new UnsupportedOperationException(
+                                "Unsupported txnStatus " + newStatus + " for ending txn " + txnID));
+                        endFutureList.add(unsupported);
+                    }
+                });
+                FutureUtil.waitForAll(endFutureList).whenComplete((ignored, waitThrowable) -> {
+                    if (waitThrowable != null) {
+                        resultFuture.completeExceptionally(waitThrowable);
+                    } else {
+                        resultFuture.complete(null);
+                    }
+                });
+            } catch (Exception e) {
+                // A synchronous failure above would otherwise leave resultFuture pending forever.
+                resultFuture.completeExceptionally(e);
+            }
+        });
+        return resultFuture;
+    }
+
     private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnId) {
         return new TransactionCoordinatorID(txnId.getMostSigBits());
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 09a0521..02602eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -29,12 +29,14 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
@@ -94,6 +96,9 @@ public abstract class AbstractTopic implements Topic {
     private LongAdder bytesInCounter = new LongAdder();
     private LongAdder msgInCounter = new LongAdder();
 
+    protected CompletableFuture<TransactionBuffer> transactionBuffer;
+    protected ReentrantLock transactionBufferLock = new ReentrantLock();
+
     public AbstractTopic(String topic, BrokerService brokerService) {
         this.topic = topic;
         this.brokerService = brokerService;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index b19c4b8..771cd21 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -35,12 +35,14 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
 import org.apache.pulsar.broker.service.Topic.PublishContext;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
@@ -612,6 +614,14 @@ public class Producer {
         }
     }
 
+    /** Calls {@code beforePublish}, then publishes the message to the topic under the given transaction. */
+    public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
+                                  ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
+        beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
+        topic.publishTxnMessage(txnID, headersAndPayload, MessagePublishContext.get(this, sequenceId,
+                highSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime()));
+    }
+
     public SchemaVersion getSchemaVersion() {
         return schemaVersion;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 6dc384f..15b3c97 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -22,8 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.getPartitionedTopicMetadata;
 import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
-import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
+import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
@@ -35,7 +35,7 @@ import io.netty.channel.ChannelOption;
 import io.netty.handler.ssl.SslHandler;
 
 import java.net.SocketAddress;
-
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -69,8 +69,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotRea
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -78,11 +78,6 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
-import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.protocol.CommandUtils;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthResponse;
@@ -93,10 +88,11 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetOrCreateSchema;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandNewTxn;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledgedMessages;
@@ -116,6 +112,10 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.protocol.CommandUtils;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -124,7 +124,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1212,6 +1211,13 @@ public class ServerCnx extends PulsarHandler {
 
         startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());
 
+        if (send.hasTxnidMostBits() && send.hasTxnidLeastBits()) {
+            TxnID txnID = new TxnID(send.getTxnidMostBits(), send.getTxnidLeastBits());
+            producer.publishTxnMessage(txnID, producer.getProducerId(), send.getSequenceId(),
+                    send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.getIsChunk());
+            return;
+        }
+
         // Persist the message
         if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
             producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
@@ -1650,7 +1656,7 @@ public class ServerCnx extends PulsarHandler {
 
     @Override
     protected void handleAddPartitionToTxn(PulsarApi.CommandAddPartitionToTxn command) {
-        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+            TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
         if (log.isDebugEnabled()) {
             log.debug("Receive add published partition to txn request {} from {} with txnId {}", command.getRequestId(), remoteAddress, txnID);
         }
@@ -1674,40 +1680,47 @@ public class ServerCnx extends PulsarHandler {
 
     @Override
     protected void handleEndTxn(PulsarApi.CommandEndTxn command) {
-        TxnStatus newStatus = null;
-        switch (command.getTxnAction()) {
-            case COMMIT:
-                newStatus = TxnStatus.COMMITTING;
-                break;
-            case ABORT:
-                newStatus = TxnStatus.ABORTING;
-                break;
-        }
+        final long requestId = command.getRequestId();
+        final int txnAction = command.getTxnAction().getNumber();
         TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
-        if (log.isDebugEnabled()) {
-            log.debug("Receive end txn by {} request {} from {} with txnId {}", newStatus, command.getRequestId(), remoteAddress, txnID);
-        }
-        service.pulsar().getTransactionMetadataStoreService().updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
-            .whenComplete((v, ex) -> {
-                if (ex == null) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Send response success for end txn request {}", command.getRequestId());
-                    }
-                    ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(),
-                            txnID.getLeastSigBits(), txnID.getMostSigBits()));
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Send response error for end txn request {}", command.getRequestId());
-                    }
-                    ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
-                            BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
-                }
-            });
+        // Delegate to the metadata store service and report the outcome to the client.
+        service.pulsar().getTransactionMetadataStoreService().endTransaction(txnID, txnAction)
+            .thenRun(() -> {
+                ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
+                        txnID.getLeastSigBits(), txnID.getMostSigBits()));
+            }).exceptionally(throwable -> {
+                log.error("Send response error for end txn request.", throwable);
+                ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
+                        BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
+                return null;
+        });
     }
 
     @Override
     protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition command) {
-        ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(command.getRequestId(), command.getTxnidLeastBits(), command.getTxnidMostBits()));
+        final long requestId = command.getRequestId();
+        final int txnAction = command.getTxnAction().getNumber();
+        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        // Ends the transaction on the addressed topic and reports the outcome to the client.
+        service.getTopics().get(command.getTopic()).whenComplete((topic, t) -> {
+            // A failed lookup leaves topic null, so test t before dereferencing topic.
+            if (t != null || !topic.isPresent()) {
+                ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, ServerError.TopicNotFound,
+                        "Topic " + command.getTopic() + " is not found."));
+                return;
+            }
+            topic.get().endTxn(txnID, txnAction)
+                .whenComplete((ignored, throwable) -> {
+                    if (throwable != null) {
+                        log.error("Handle endTxnOnPartition {} failed.", command.getTopic(), throwable);
+                        ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+                                requestId, ServerError.UnknownError, throwable.getMessage()));
+                        return;
+                    }
+                    ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+                            txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                });
+        });
     }
 
     @Override
@@ -1735,6 +1748,38 @@ public class ServerCnx extends PulsarHandler {
     }
 
     @Override
+    protected void handleAddSubscriptionToTxn(PulsarApi.CommandAddSubscriptionToTxn command) {
+        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        if (log.isDebugEnabled()) {
+            log.debug("Receive add acked subscription to txn request {} from {} with txnId {}",
+                    command.getRequestId(), remoteAddress, txnID);
+        }
+        List<String> subscriptionList = command.getSubscriptionList().stream()
+                .map(subscription -> subscription.getTopic() + "|" + subscription.getSubscription())
+                .collect(Collectors.toList());
+        service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID, subscriptionList)
+                .whenComplete(((v, ex) -> {
+                    if (ex == null) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Send response success for add acked subscription to txn request {}",
+                                    command.getRequestId());
+                        }
+                        // Answer with the AddSubscriptionToTxn response so the reply type matches the request.
+                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
+                                txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Send response error for add acked subscription to txn request {}",
+                                    command.getRequestId(), ex);
+                        }
+                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
+                                txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
+                                ex.getMessage()));
+                    }
+                }));
+    }
+
+    @Override
     protected boolean isHandshakeCompleted() {
         return state == State.Connected;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 4d14326..505e6a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -27,7 +27,9 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -221,4 +223,33 @@ public interface Topic {
     default boolean isSystemTopic() {
         return false;
     }
+
+    /* ------ Transaction related ------ */
+
+    /**
+     * Get the {@link TransactionBuffer} of this Topic.
+     *
+     * @param createIfMissing Create the TransactionBuffer if missing.
+     * @return TransactionBuffer CompletableFuture
+     */
+    CompletableFuture<TransactionBuffer> getTransactionBuffer(boolean createIfMissing);
+
+    /**
+     * Publish Transaction message to this Topic's TransactionBuffer
+     *
+     * @param txnID Transaction Id
+     * @param headersAndPayload Message data
+     * @param publishContext Publish context
+     */
+    void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishContext publishContext);
+
+    /**
+     * End the transaction in this topic.
+     *
+     * @param txnID Transaction id
+     * @param txnAction Transaction action.
+     * @return
+     */
+    CompletableFuture<Void> endTxn(TxnID txnID, int txnAction);
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 5a4d7cd..0e02deb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -62,7 +62,9 @@ import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -966,4 +968,21 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
             }
         });
     }
+
+    @Override
+    public CompletableFuture<TransactionBuffer> getTransactionBuffer(boolean createIfMissing) {
+        return FutureUtil.failedFuture(
+                new Exception("Unsupported operation getTransactionBuffer in non-persistent topic."));
+    }
+
+    @Override
+    public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishContext publishContext) {
+        throw new UnsupportedOperationException("PublishTxnMessage is not supported by non-persistent topic");
+    }
+
+    @Override
+    public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction) {
+        return FutureUtil.failedFuture(
+                new Exception("Unsupported operation endTxn in non-persistent topic."));
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e6120f6..625693c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -91,9 +91,11 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
@@ -2221,4 +2223,76 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         isFenced = false;
         isClosingOrDeleting = false;
     }
+
+    @Override
+    public CompletableFuture<TransactionBuffer> getTransactionBuffer(boolean createIfMissing) {
+        if (transactionBuffer == null && createIfMissing) {
+            transactionBufferLock.lock();
+            try {
+                if (transactionBuffer == null) {
+                    transactionBuffer = brokerService.getPulsar().getTransactionBufferProvider()
+                            .newTransactionBuffer(this);
+                }
+            } finally {
+                transactionBufferLock.unlock();
+            }
+        }
+        return transactionBuffer;
+    }
+
+    @Override
+    public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishContext publishContext) {
+        pendingWriteOps.incrementAndGet();
+        if (isFenced) {
+            publishContext.completed(new TopicFencedException("fenced"), -1, -1);
+            decrementPendingWriteOpsAndCheck();
+            return;
+        }
+
+        MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload);
+        switch (status) {
+            case NotDup:
+                getTransactionBuffer(true)
+                        .thenCompose(txnBuffer -> txnBuffer.appendBufferToTxn(
+                                txnID, publishContext.getSequenceId(), headersAndPayload))
+                        .thenAccept(position -> {
+                            decrementPendingWriteOpsAndCheck();
+                            publishContext.completed(null,
+                                    ((PositionImpl)position).getLedgerId(), ((PositionImpl)position).getEntryId());
+                        })
+                        .exceptionally(throwable -> {
+                            decrementPendingWriteOpsAndCheck();
+                            publishContext.completed(new Exception(throwable), -1, -1);
+                            return null;
+                        });
+                break;
+            case Dup:
+                // Immediately acknowledge duplicated message
+                publishContext.completed(null, -1, -1);
+                decrementPendingWriteOpsAndCheck();
+                break;
+            default:
+                publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1, -1);
+                decrementPendingWriteOpsAndCheck();
+
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        getTransactionBuffer(false).thenAccept(tb -> {
+            tb.endTxnOnPartition(txnID, txnAction).whenComplete((ignored, throwable) -> {
+                if (throwable != null) {
+                    completableFuture.completeExceptionally(throwable);
+                    return;
+                }
+                completableFuture.complete(null);
+            });
+        }).exceptionally(tbThrowable -> {
+            completableFuture.completeExceptionally(tbThrowable);
+            return null;
+        });
+        return completableFuture;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index d88eff6..6db93f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.client.api.transaction.TxnID;
 
 /**
@@ -70,7 +71,7 @@ public interface TransactionBuffer {
      * @throws org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException if the transaction
      *         has been sealed.
      */
-    CompletableFuture<Void> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer);
+    CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer);
 
     /**
      * Open a {@link TransactionBufferReader} to read entries of a given transaction
@@ -85,6 +86,21 @@ public interface TransactionBuffer {
     CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId);
 
     /**
+     * Handle TC endTxnOnPartition command
+     *
+     * @return
+     */
+    CompletableFuture<Void> endTxnOnPartition(TxnID txnID, int txnAction);
+
+    /**
+     * Append committed marker to the related origin topic partition.
+     *
+     * @param txnID transaction id
+     * @return a future that represents the position of the committed marker in the origin topic partition.
+     */
+    CompletableFuture<Position> commitPartitionTopic(TxnID txnID);
+
+    /**
      * Commit the transaction and seal the buffer for this transaction.
      *
      * <p>If a transaction is sealed, no more entries can be {@link #appendBufferToTxn(TxnID, long, ByteBuf)}.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
index 9fc5e9d..d2ce24b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.common.annotations.Beta;
+import org.apache.pulsar.broker.service.Topic;
+
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 /**
  * A provider that provides {@link TransactionBuffer}.
  */
@@ -59,4 +61,12 @@ public interface TransactionBufferProvider {
      *         if the operation succeeds.
      */
     CompletableFuture<TransactionBuffer> newTransactionBuffer();
-}
\ No newline at end of file
+
+    /**
+     * Open the persistent transaction buffer.
+     *
+     * @param originTopic
+     * @return
+     */
+    CompletableFuture<TransactionBuffer> newTransactionBuffer(Topic originTopic);
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
index b080dc7..33def14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionMeta.java
@@ -89,7 +89,13 @@ public interface TransactionMeta {
      * @param position the position of transaction log
      * @return
      */
-    CompletableFuture<Void> appendEntry(long sequenceId, Position position);
+    CompletableFuture<Position> appendEntry(long sequenceId, Position position);
+
+    /**
+     * Mark the transaction status as committing.
+     * @return
+     */
+    CompletableFuture<TransactionMeta> committingTxn();
 
     /**
      * Mark the transaction is committed.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferProviderException.java
similarity index 58%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferProviderException.java
index ed30ed0..e7a7a62 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/TransactionBufferProviderException.java
@@ -16,18 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.impl;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
-import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+package org.apache.pulsar.broker.transaction.buffer.exceptions;
 
 /**
- * A provider that provides in-memory implementations of {@link TransactionBuffer}.
+ * Transaction buffer provider exception.
  */
-public class InMemTransactionBufferProvider implements TransactionBufferProvider {
-    @Override
-    public CompletableFuture<TransactionBuffer> newTransactionBuffer() {
-        return CompletableFuture.completedFuture(new InMemTransactionBuffer());
+public class TransactionBufferProviderException extends TransactionBufferException {
+
+    public TransactionBufferProviderException(String message) {
+        super(message);
     }
+
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/UnsupportedTxnActionException.java
similarity index 61%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/UnsupportedTxnActionException.java
index d49132b..f1d95b0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/exceptions/UnsupportedTxnActionException.java
@@ -16,35 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.api.transaction;
+package org.apache.pulsar.broker.transaction.buffer.exceptions;
 
-import java.io.Serializable;
-import lombok.Data;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 
 /**
- * An identifier for representing a transaction.
+ * Exception thrown when the txnAction is unsupported.
  */
-@Data
-public class TxnID implements Serializable {
+public class UnsupportedTxnActionException extends TransactionBufferException {
 
     private static final long serialVersionUID = 0L;
 
-    /*
-     * The most significant 64 bits of this TxnID.
-     *
-     * @serial
-     */
-    private final long mostSigBits;
-
-    /*
-     * The least significant 64 bits of this TxnID.
-     *
-     * @serial
-     */
-    private final long leastSigBits;
-
-    @Override
-    public String toString() {
-        return "(" + mostSigBits + "," + leastSigBits + ")";
+    public UnsupportedTxnActionException(TxnID txnId, int txnAction) {
+        super("Transaction `" + txnId + "` receive unsupported txnAction " + PulsarApi.TxnAction.valueOf(txnAction));
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 174f5c0..d110aa8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -102,11 +102,17 @@ class InMemTransactionBuffer implements TransactionBuffer {
         }
 
         @Override
-        public CompletableFuture<Void> appendEntry(long sequenceId, Position position) {
+        public CompletableFuture<Position> appendEntry(long sequenceId, Position position) {
             return FutureUtil.failedFuture(new UnsupportedOperationException());
         }
 
         @Override
+        public CompletableFuture<TransactionMeta> committingTxn() {
+            status = TxnStatus.COMMITTING;
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
         public CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId, long committedAtEntryId) {
             try {
                 return CompletableFuture.completedFuture(commitAt(committedAtLedgerId, committedAtEntryId));
@@ -238,7 +244,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
     }
 
     @Override
-    public CompletableFuture<Void> appendBufferToTxn(TxnID txnId,
+    public CompletableFuture<Position> appendBufferToTxn(TxnID txnId,
                                                      long sequenceId,
                                                      ByteBuf buffer) {
         TxnBuffer txnBuffer = getTxnBufferOrCreateIfNotExist(txnId);
@@ -268,6 +274,17 @@ class InMemTransactionBuffer implements TransactionBuffer {
     }
 
     @Override
+    public CompletableFuture<Void> endTxnOnPartition(TxnID txnID, int txnAction) {
+        return FutureUtil.failedFuture(
+                new Exception("Unsupported operation endTxnOnPartition in InMemTransactionBuffer."));
+    }
+
+    @Override
+    public CompletableFuture<Position> commitPartitionTopic(TxnID txnID) {
+        return FutureUtil.failedFuture(new UnsupportedOperationException()); // never hand out a null future
+    }
+
+    @Override
     public CompletableFuture<Void> commitTxn(TxnID txnID,
                                              long committedAtLedgerId,
                                              long committedAtEntryId) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
index ed30ed0..e5d2a50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
 import java.util.concurrent.CompletableFuture;
+
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 
@@ -30,4 +32,9 @@ public class InMemTransactionBufferProvider implements TransactionBufferProvider
     public CompletableFuture<TransactionBuffer> newTransactionBuffer() {
         return CompletableFuture.completedFuture(new InMemTransactionBuffer());
     }
+
+    @Override
+    public CompletableFuture<TransactionBuffer> newTransactionBuffer(Topic originTopic) {
+        return newTransactionBuffer(); // originTopic is unused here; callers must never get a null future
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java
index 30af711..30b7d77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBuffer.java
@@ -18,12 +18,8 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -34,16 +30,28 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
-import org.apache.pulsar.common.protocol.Markers;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionCursor;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
 import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.broker.transaction.buffer.exceptions.UnexpectedTxnStatusException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+import java.util.List;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
 
 /**
  * A persistent transaction buffer implementation.
@@ -51,9 +59,12 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 @Slf4j
 public class PersistentTransactionBuffer extends PersistentTopic implements TransactionBuffer {
 
+    private final static String TB_TOPIC_NAME_SUFFIX = "/_txnlog";
     private TransactionCursor txnCursor;
     private ManagedCursor retentionCursor;
-
+    private Topic originTopic;
+    private ConcurrentLinkedQueue<TxnID> pendingCommitTxn;
+    private volatile boolean pendingCommitHandling;
 
     abstract static class TxnCtx implements PublishContext {
         private final long sequenceId;
@@ -66,8 +77,6 @@ public class PersistentTransactionBuffer extends PersistentTopic implements Tran
             this.producerName = producerName;
         }
 
-
-
         @Override
         public String getProducerName() {
             return this.producerName;
@@ -79,12 +88,19 @@ public class PersistentTransactionBuffer extends PersistentTopic implements Tran
         }
     }
 
-    public PersistentTransactionBuffer(String topic, ManagedLedger ledger, BrokerService brokerService)
+    public PersistentTransactionBuffer(String topic, ManagedLedger ledger, BrokerService brokerService,
+                                       Topic originTopic)
         throws BrokerServiceException.NamingException, ManagedLedgerException {
         super(topic, ledger, brokerService);
         this.txnCursor = new TransactionCursorImpl();
         this.retentionCursor = ledger.newNonDurableCursor(
             PositionImpl.earliest, "txn-buffer-retention");
+        this.originTopic = originTopic;
+        this.pendingCommitTxn = Queues.newConcurrentLinkedQueue();
+    }
+
+    public static String getTransactionBufferTopicName(String originTopicName) {
+        return TopicName.get(originTopicName) + TB_TOPIC_NAME_SUFFIX;
     }
 
     @Override
@@ -93,12 +109,12 @@ public class PersistentTransactionBuffer extends PersistentTopic implements Tran
     }
 
     @Override
-    public CompletableFuture<Void> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
-        return publishMessage(txnId, buffer, sequenceId).thenCompose(position -> appendBuffer(txnId, position,
-                                                                                             sequenceId));
+    public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
+        return publishMessage(txnId, buffer, sequenceId)
+                .thenCompose(position -> appendBuffer(txnId, position, sequenceId));
     }
 
-    private CompletableFuture<Void> appendBuffer(TxnID txnID, Position position, long sequenceId) {
+    private CompletableFuture<Position> appendBuffer(TxnID txnID, Position position, long sequenceId) {
         return txnCursor.getTxnMeta(txnID, true).thenCompose(meta -> meta.appendEntry(sequenceId, position));
     }
 
@@ -127,6 +143,109 @@ public class PersistentTransactionBuffer extends PersistentTopic implements Tran
     }
 
     @Override
+    public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction) {
+        return FutureUtil.failedFuture(
+                new Exception("Unsupported operation endTxn in PersistentTransactionBuffer."));
+    }
+
+    /**
+     * End the transaction on this buffer according to the requested action.
+     *
+     * @param txnID transaction id
+     * @param txnAction numeric {@link PulsarApi.TxnAction} value (COMMIT or ABORT)
+     * @return a future that fails for any other action instead of never completing
+     */
+    @Override
+    public CompletableFuture<Void> endTxnOnPartition(TxnID txnID, int txnAction) {
+        if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
+            return committingTxn(txnID);
+        } else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
+            // TODO handle abort operation
+            return CompletableFuture.completedFuture(null);
+        }
+        return FutureUtil.failedFuture(new IllegalArgumentException("Unsupported txnAction " + txnAction));
+    }
+
+    private CompletableFuture<Void> committingTxn(TxnID txnID) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        txnCursor.getTxnMeta(txnID, false).whenComplete(((meta, txnMetaThrowable) -> {
+            if (txnMetaThrowable != null) {
+                completableFuture.completeExceptionally(txnMetaThrowable);
+                return; // stop here: the lookup failed, so meta must not be dereferenced below
+            }
+
+            if (meta.status().equals(TxnStatus.COMMITTING)) {
+                // already committing: report success without publishing a second committing marker
+                completableFuture.complete(null);
+                return;
+            } else if (!meta.status().equals(TxnStatus.OPEN)) {
+                // under normal circumstances this should not happen; reject instead of committing anyway
+                completableFuture.completeExceptionally(
+                        new UnexpectedTxnStatusException(txnID, TxnStatus.OPEN, meta.status()));
+                return;
+            }
+            long sequenceId = meta.lastSequenceId() + 1;
+            MessageIdData messageIdData = MessageIdData.newBuilder().setLedgerId(-1L).setEntryId(-1L).build();
+            ByteBuf committingMarker = Markers.newTxnCommittingMarker(
+                    sequenceId, txnID.getMostSigBits(), txnID.getLeastSigBits(), messageIdData);
+            publishMessage(txnID, committingMarker, sequenceId)
+                    .thenCompose(position -> meta.committingTxn()
+                    .thenAccept(committingTxn -> {
+                        pendingCommitTxn.add(txnID);
+                        if (!pendingCommitHandling) {
+                            getBrokerService().getTopicOrderedExecutor().execute(this::handlePendingCommit);
+                        }
+                        completableFuture.complete(null);
+                    }));
+        }));
+        return completableFuture;
+    }
+
+    private void handlePendingCommit() {
+        pendingCommitHandling = true;
+        TxnID txnID = pendingCommitTxn.peek();
+        txnCursor.getTxnMeta(txnID, false).thenAccept(meta -> {
+            if (meta.status().equals(TxnStatus.COMMITTING)) {
+                commitPartitionTopic(txnID).thenCompose(position ->
+                    commitTxn(txnID,
+                        ((PositionImpl) position).getLedgerId(),
+                        ((PositionImpl) position).getEntryId())
+                        .whenComplete((ignored, throwable) -> {
+                            if (throwable != null) {
+                                handlePendingCommit();
+                            } else {
+                                pendingCommitTxn.remove(txnID);
+                                if (pendingCommitTxn.peek() != null) {
+                                    handlePendingCommit();
+                                } else {
+                                    pendingCommitHandling = false;
+                                }
+                            }
+                        }));
+            } else {
+                pendingCommitTxn.remove(txnID);
+            }
+        });
+    }
+
+    // append committed marker to partitioned topic
+    @Override
+    public CompletableFuture<Position> commitPartitionTopic(TxnID txnID) {
+        CompletableFuture<Position> positionFuture = new CompletableFuture<>();
+        // TODO How to generate sequenceId for commit marker in partitioned topic
+        long ptSequenceId = -1;
+        MessageIdData messageIdData = MessageIdData.newBuilder().setLedgerId(-1L).setEntryId(-1L).build();
+        ByteBuf commitMarker = Markers.newTxnCommitMarker(
+                ptSequenceId, txnID.getMostSigBits(), txnID.getLeastSigBits(), messageIdData);
+
+        originTopic.publishMessage(commitMarker, (e, ledgerId, entryId) -> {
+            positionFuture.complete(new PositionImpl(ledgerId, entryId));
+        });
+
+        return positionFuture;
+    }
+
+    @Override
     public CompletableFuture<Void> commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId) {
         return txnCursor.getTxnMeta(txnID, false)
                         .thenApply(meta -> createCommitMarker(meta, committedAtLedgerId, committedAtEntryId))
@@ -172,6 +291,7 @@ public class PersistentTransactionBuffer extends PersistentTopic implements Tran
 
     private CompletableFuture<Position> publishMessage(TxnID txnID, ByteBuf msg, long sequenceId) {
         CompletableFuture<Position> publishFuture = new CompletableFuture<>();
+
         publishMessage(msg, new TxnCtx(txnID.toString(), sequenceId, publishFuture) {
             @Override
             public void completed(Exception e, long ledgerId, long entryId) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferProvider.java
new file mode 100644
index 0000000..ebf3d7b
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferProvider.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionBufferProviderException;
+import org.apache.pulsar.common.naming.TopicName;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Persistent transaction buffer provider.
+ *
+ * <p>Builds {@link PersistentTransactionBuffer} instances for a given origin topic by opening
+ * the managed ledger named after the transaction buffer topic derived from the origin topic name.
+ */
+@Slf4j
+public class PersistentTransactionBufferProvider implements TransactionBufferProvider {
+
+    /**
+     * A persistent transaction buffer cannot be created without an origin topic.
+     *
+     * @return a future that is always completed exceptionally with a
+     *         {@link TransactionBufferProviderException}; never {@code null}, so callers can
+     *         chain on the result safely
+     */
+    @Override
+    public CompletableFuture<TransactionBuffer> newTransactionBuffer() {
+        CompletableFuture<TransactionBuffer> tbFuture = new CompletableFuture<>();
+        tbFuture.completeExceptionally(new TransactionBufferProviderException(
+                "The persistent transaction buffer requires an originTopic."));
+        return tbFuture;
+    }
+
+    /**
+     * Create the transaction buffer that belongs to the given origin topic.
+     *
+     * @param originTopic the topic the buffer is attached to; must be a {@link PersistentTopic}
+     * @return a future completed with the new buffer, or completed exceptionally when the origin
+     *         topic is {@code null} or not persistent, when the managed ledger cannot be opened,
+     *         or when constructing the buffer throws
+     */
+    @Override
+    public CompletableFuture<TransactionBuffer> newTransactionBuffer(Topic originTopic) {
+        CompletableFuture<TransactionBuffer> tbFuture = new CompletableFuture<>();
+
+        if (originTopic == null) {
+            tbFuture.completeExceptionally(new TransactionBufferProviderException("The originTopic is null."));
+            return tbFuture;
+        }
+        if (!(originTopic instanceof PersistentTopic)) {
+            tbFuture.completeExceptionally(new TransactionBufferProviderException(
+                    "The originTopic is not persistentTopic."));
+            return tbFuture;
+        }
+
+        PersistentTopic originPersistentTopic = (PersistentTopic) originTopic;
+        String tbTopicName = PersistentTransactionBuffer.getTransactionBufferTopicName(originPersistentTopic.getName());
+
+        originPersistentTopic.getBrokerService().getManagedLedgerFactory()
+            .asyncOpen(TopicName.get(tbTopicName).getPersistenceNamingEncoding(),
+                    new AsyncCallbacks.OpenLedgerCallback() {
+                        @Override
+                        public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                            try {
+                                tbFuture.complete(new PersistentTransactionBuffer(tbTopicName, ledger,
+                                        originPersistentTopic.getBrokerService(), originTopic));
+                            } catch (Exception e) {
+                                // the constructor may throw; surface it through the future
+                                log.error("New PersistentTransactionBuffer error.", e);
+                                tbFuture.completeExceptionally(e);
+                            }
+                        }
+
+                        @Override
+                        public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                            log.error("Open transactionBuffer managedLedger failed.", exception);
+                            tbFuture.completeExceptionally(exception);
+                        }
+                    }, null);
+        return tbFuture;
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
index 8294e29..2d05ff3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -44,7 +44,6 @@ public class TransactionBufferClientImpl implements TransactionBufferClient {
         TransactionBufferHandler handler = new TransactionBufferHandlerImpl(connectionPool, namespaceService);
         return new TransactionBufferClientImpl(handler);
     }
-
     @Override
     public CompletableFuture<TxnID> commitTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits) {
         return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, PulsarApi.TxnAction.COMMIT);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index 62c815f..624c9aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -143,6 +143,7 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler, T
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Got end txn on topic response for for request {}", op.topic, response.getRequestId());
             }
+            log.info("[{}] Got end txn on topic response for for request {}", op.topic, response.getRequestId());
             op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
         } else {
             log.error("[{}] Got end txn on topic response for request {} error {}", op.topic, response.getRequestId(), response.getError());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionMetaImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionMetaImpl.java
index 90af13e..ba0fcb4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionMetaImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionMetaImpl.java
@@ -109,8 +109,8 @@ public class TransactionMetaImpl implements TransactionMeta {
     }
 
     @Override
-    public CompletableFuture<Void> appendEntry(long sequenceId, Position position) {
-        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+    public CompletableFuture<Position> appendEntry(long sequenceId, Position position) {
+        CompletableFuture<Position> appendFuture = new CompletableFuture<>();
         synchronized (this) {
             if (TxnStatus.OPEN != txnStatus) {
                 appendFuture.completeExceptionally(
@@ -121,14 +121,25 @@ public class TransactionMetaImpl implements TransactionMeta {
         synchronized (this.entries) {
             this.entries.put(sequenceId, position);
         }
-        return CompletableFuture.completedFuture(null);
+        return CompletableFuture.completedFuture(position);
+    }
+
+    /**
+     * Move the transaction from {@code OPEN} to {@code COMMITTING}.
+     *
+     * <p>Synchronized on this instance, like {@code commitTxn} and {@code abortTxn}, so the
+     * status check and the status update happen atomically with respect to the other
+     * state transitions.
+     *
+     * @return a future completed with this meta on success, or completed exceptionally with an
+     *         {@code UnexpectedTxnStatusException} when the transaction is not {@code OPEN}
+     */
+    @Override
+    public synchronized CompletableFuture<TransactionMeta> committingTxn() {
+        CompletableFuture<TransactionMeta> committingFuture = new CompletableFuture<>();
+        if (!checkStatus(TxnStatus.OPEN, committingFuture)) {
+            return committingFuture;
+        }
+        this.txnStatus = TxnStatus.COMMITTING;
+        committingFuture.complete(this);
+        return committingFuture;
     }
 
     @Override
     public synchronized CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId,
                                                                      long committedAtEntryId) {
         CompletableFuture<TransactionMeta> commitFuture = new CompletableFuture<>();
-        if (!checkOpened(txnID, commitFuture)) {
+        if (!checkStatus(TxnStatus.COMMITTING, commitFuture)) {
             return commitFuture;
         }
 
@@ -143,7 +154,7 @@ public class TransactionMetaImpl implements TransactionMeta {
     @Override
     public synchronized CompletableFuture<TransactionMeta> abortTxn() {
         CompletableFuture<TransactionMeta> abortFuture = new CompletableFuture<>();
-        if (!checkOpened(txnID, abortFuture)) {
+        if (!checkStatus(TxnStatus.OPEN, abortFuture)) {
             return abortFuture;
         }
 
@@ -153,11 +164,12 @@ public class TransactionMetaImpl implements TransactionMeta {
         return abortFuture;
     }
 
-    private boolean checkOpened(TxnID txnID, CompletableFuture<TransactionMeta> future) {
-        if (TxnStatus.OPEN != txnStatus) {
-            future.completeExceptionally(new UnexpectedTxnStatusException(txnID, TxnStatus.OPEN, txnStatus));
+    /**
+     * Check that the transaction is currently in the expected status.
+     *
+     * @param expectedStatus the status the caller requires
+     * @param future completed exceptionally with an {@code UnexpectedTxnStatusException} when the
+     *               current status differs from {@code expectedStatus}; left untouched otherwise
+     * @return {@code true} when the current status matches, {@code false} otherwise
+     */
+    private boolean checkStatus(TxnStatus expectedStatus, CompletableFuture<TransactionMeta> future) {
+        // Reference comparison, as used for TxnStatus elsewhere in this class; unlike
+        // txnStatus.equals(...) it cannot throw if txnStatus is null.
+        if (expectedStatus != txnStatus) {
+            future.completeExceptionally(new UnexpectedTxnStatusException(txnID, expectedStatus, txnStatus));
             return false;
         }
         return true;
     }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/PulsarClientTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/PulsarClientTransactionTest.java
new file mode 100644
index 0000000..58e46c7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/PulsarClientTransactionTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ReadOnlyCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBuffer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.PartitionedProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Pulsar client transaction test.
+ *
+ * <p>Produces messages inside a transaction to a partitioned topic, commits the transaction and
+ * then inspects both the origin topic partitions and their transaction buffer topics through
+ * read-only cursors.
+ */
+@Slf4j
+public class PulsarClientTransactionTest extends TransactionTestBase {
+
+    private static final int TOPIC_PARTITION = 3;
+
+    private static final String CLUSTER_NAME = "test";
+    private static final String TENANT = "tnx";
+    private static final String NAMESPACE1 = TENANT + "/ns1";
+    private static final String TOPIC_INPUT_1 = NAMESPACE1 + "/input1";
+    private static final String TOPIC_INPUT_2 = NAMESPACE1 + "/input2";
+    private static final String TOPIC_OUTPUT_1 = NAMESPACE1 + "/output1";
+    private static final String TOPIC_OUTPUT_2 = NAMESPACE1 + "/output2";
+
+    /**
+     * Start the brokers, create the cluster/tenant/namespace/topics used by the test and build a
+     * transaction-enabled client.
+     */
+    @BeforeMethod
+    protected void setup() throws Exception {
+        internalSetup();
+
+        int webServicePort = getServiceConfigurationList().get(0).getWebServicePort().get();
+        admin.clusters().createCluster(CLUSTER_NAME, new ClusterData("http://localhost:" + webServicePort));
+        admin.tenants().createTenant(TENANT,
+                new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NAMESPACE1);
+        // keep the partition count in sync with the loops in the test body
+        admin.topics().createPartitionedTopic(TOPIC_INPUT_1, TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(TOPIC_INPUT_2, TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(TOPIC_OUTPUT_1, TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(TOPIC_OUTPUT_2, TOPIC_PARTITION);
+
+        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
+
+        // internalSetup() already built a client without transactions enabled;
+        // close it before replacing it so its resources are not leaked.
+        if (pulsarClient != null) {
+            pulsarClient.close();
+        }
+        int brokerPort = getServiceConfigurationList().get(0).getBrokerServicePort().get();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:" + brokerPort)
+                .statsInterval(0, TimeUnit.SECONDS)
+                .enableTransaction(true)
+                .build();
+
+        Thread.sleep(1000 * 3);
+    }
+
+    @AfterMethod
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Messages produced in a transaction must not be acknowledged to the producer, and no commit
+     * marker must be visible in the origin topic, until the transaction is committed.
+     */
+    @Test
+    public void produceCommitTest() throws Exception {
+        PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
+        Transaction tnx = pulsarClientImpl.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+
+        long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
+        long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
+        Assert.assertTrue(txnIdMostBits > -1);
+        Assert.assertTrue(txnIdLeastBits > -1);
+
+        @Cleanup
+        PartitionedProducerImpl<byte[]> outProducer = (PartitionedProducerImpl<byte[]>) pulsarClientImpl
+                .newProducer()
+                .topic(TOPIC_OUTPUT_1)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .roundRobinRouterBatchingPartitionSwitchFrequency(1)
+                .create();
+
+        int messageCntPerPartition = 3;
+        int messageCnt = TOPIC_PARTITION * messageCntPerPartition;
+        String content = "Hello Txn - ";
+        Set<String> messageSet = new HashSet<>();
+        List<CompletableFuture<MessageId>> futureList = new ArrayList<>();
+        for (int i = 0; i < messageCnt; i++) {
+            String msg = content + i;
+            messageSet.add(msg);
+            CompletableFuture<MessageId> produceFuture = outProducer
+                    .newMessage(tnx).value(msg.getBytes(UTF_8)).sendAsync();
+            futureList.add(produceFuture);
+        }
+
+        // the target topic hasn't the commit marker before commit
+        for (int i = 0; i < TOPIC_PARTITION; i++) {
+            ReadOnlyCursor originTopicCursor = getOriginTopicCursor(TOPIC_OUTPUT_1, i);
+            Assert.assertNotNull(originTopicCursor);
+            Assert.assertFalse(originTopicCursor.hasMoreEntries());
+            originTopicCursor.close();
+        }
+
+        // the messageId callback can't be called before commit
+        futureList.forEach(messageIdFuture -> {
+            try {
+                messageIdFuture.get(1, TimeUnit.SECONDS);
+                Assert.fail("MessageId shouldn't be get before txn commit.");
+            } catch (Exception e) {
+                if (e instanceof TimeoutException) {
+                    log.info("This is a expected exception.");
+                } else {
+                    log.error("This exception is not expected.", e);
+                    Assert.fail("This exception is not expected.");
+                }
+            }
+        });
+
+        tnx.commit().get();
+
+        Thread.sleep(3000L);
+
+        // the messageId callback should be called after commit
+        futureList.forEach(messageIdFuture -> {
+            try {
+                MessageId messageId = messageIdFuture.get(1, TimeUnit.SECONDS);
+                Assert.assertNotNull(messageId);
+                log.info("Tnx commit success! messageId: {}", messageId);
+            } catch (Exception e) {
+                log.error("Tnx commit failed! tnx: " + tnx, e);
+                Assert.fail("Tnx commit failed! tnx: " + tnx);
+            }
+        });
+
+        // NOTE: TestNG's Assert.assertEquals takes (actual, expected), in that order.
+        for (int i = 0; i < TOPIC_PARTITION; i++) {
+            // the target topic partition received the commit marker
+            ReadOnlyCursor originTopicCursor = getOriginTopicCursor(TOPIC_OUTPUT_1, i);
+            Assert.assertNotNull(originTopicCursor);
+            Assert.assertTrue(originTopicCursor.hasMoreEntries());
+            List<Entry> entries = originTopicCursor.readEntries((int) originTopicCursor.getNumberOfEntries());
+            Assert.assertEquals(entries.size(), 1);
+            PulsarApi.MessageMetadata messageMetadata = Commands.parseMessageMetadata(entries.get(0).getDataBuffer());
+            Assert.assertEquals(messageMetadata.getMarkerType(), PulsarMarkers.MarkerType.TXN_COMMIT_VALUE);
+            long commitMarkerLedgerId = entries.get(0).getLedgerId();
+            long commitMarkerEntryId = entries.get(0).getEntryId();
+            // everything needed has been copied out; give the entries and the cursor back
+            entries.forEach(Entry::release);
+            originTopicCursor.close();
+
+            // the target topic transactionBuffer should receive the transaction messages,
+            // committing marker and commit marker
+            ReadOnlyCursor tbTopicCursor = getTBTopicCursor(TOPIC_OUTPUT_1, i);
+            Assert.assertNotNull(tbTopicCursor);
+            Assert.assertTrue(tbTopicCursor.hasMoreEntries());
+            long tbEntriesCnt = tbTopicCursor.getNumberOfEntries();
+            log.info("transaction buffer entries count: {}", tbEntriesCnt);
+            Assert.assertEquals(tbEntriesCnt, messageCntPerPartition + 2);
+
+            entries = tbTopicCursor.readEntries((int) tbEntriesCnt);
+            // check the messages
+            for (int j = 0; j < messageCntPerPartition; j++) {
+                messageMetadata = Commands.parseMessageMetadata(entries.get(j).getDataBuffer());
+                Assert.assertEquals(messageMetadata.getTxnidMostBits(), txnIdMostBits);
+                Assert.assertEquals(messageMetadata.getTxnidLeastBits(), txnIdLeastBits);
+
+                // what is left in the buffer after the metadata has been parsed is the payload
+                byte[] bytes = new byte[entries.get(j).getDataBuffer().readableBytes()];
+                entries.get(j).getDataBuffer().readBytes(bytes);
+                // decode with the same charset the messages were encoded with
+                String payload = new String(bytes, UTF_8);
+                log.info("transaction message payload: {}", payload);
+                Assert.assertTrue(messageSet.remove(payload));
+            }
+
+            // check committing marker
+            messageMetadata = Commands.parseMessageMetadata(entries.get(messageCntPerPartition).getDataBuffer());
+            Assert.assertEquals(messageMetadata.getMarkerType(), PulsarMarkers.MarkerType.TXN_COMMITTING_VALUE);
+
+            // check commit marker, committedAtLedgerId and committedAtEntryId
+            messageMetadata = Commands.parseMessageMetadata(entries.get(messageCntPerPartition + 1).getDataBuffer());
+            Assert.assertEquals(messageMetadata.getMarkerType(), PulsarMarkers.MarkerType.TXN_COMMIT_VALUE);
+            PulsarMarkers.TxnCommitMarker commitMarker = Markers.parseCommitMarker(entries.get(messageCntPerPartition + 1).getDataBuffer());
+            Assert.assertEquals(commitMarker.getMessageId().getLedgerId(), commitMarkerLedgerId);
+            Assert.assertEquals(commitMarker.getMessageId().getEntryId(), commitMarkerEntryId);
+
+            entries.forEach(Entry::release);
+            tbTopicCursor.close();
+        }
+
+        // every produced message must have been found exactly once in the transaction buffers
+        Assert.assertEquals(messageSet.size(), 0);
+        log.info("finish test");
+    }
+
+    /**
+     * Open a read-only cursor, positioned at the earliest entry, on the transaction buffer topic
+     * of the given partition. Fails the test if the cursor cannot be opened.
+     */
+    private ReadOnlyCursor getTBTopicCursor(String topic, int partition) {
+        try {
+            String tbTopicName = PersistentTransactionBuffer.getTransactionBufferTopicName(
+                    TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition);
+
+            return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
+                    TopicName.get(tbTopicName).getPersistenceNamingEncoding(),
+                    PositionImpl.earliest, new ManagedLedgerConfig());
+        } catch (Exception e) {
+            log.error("Failed to get transaction buffer topic readonly cursor.", e);
+            Assert.fail("Failed to get transaction buffer topic readonly cursor.");
+            return null;
+        }
+    }
+
+    /**
+     * Open a read-only cursor, positioned at the earliest entry, on the given partition of the
+     * origin topic. Fails the test if the cursor cannot be opened.
+     */
+    private ReadOnlyCursor getOriginTopicCursor(String topic, int partition) {
+        try {
+            String partitionTopic = TopicName.get(topic).toString() + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+            return getPulsarServiceList().get(0).getManagedLedgerFactory().openReadOnlyCursor(
+                    TopicName.get(partitionTopic).getPersistenceNamingEncoding(),
+                    PositionImpl.earliest, new ManagedLedgerConfig());
+        } catch (Exception e) {
+            log.error("Failed to get origin topic readonly cursor.", e);
+            Assert.fail("Failed to get origin topic readonly cursor.");
+            return null;
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
new file mode 100644
index 0000000..b8ecf5b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
+import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Base class for transaction tests.
+ *
+ * <p>Starts {@code brokerCount} brokers that share one mocked ZooKeeper and one mocked BookKeeper
+ * instance, and exposes an admin client and a Pulsar client connected to the first broker.
+ */
+@Slf4j
+public class TransactionTestBase {
+
+    private final static String CLUSTER_NAME = "test";
+
+    @Setter
+    private int brokerCount = 3;
+
+    private final List<SameThreadOrderedSafeExecutor> orderedExecutorList = new ArrayList<>();
+    @Getter
+    private final List<ServiceConfiguration> serviceConfigurationList = new ArrayList<>();
+    @Getter
+    private final List<PulsarService> pulsarServiceList = new ArrayList<>();
+
+    protected PulsarAdmin admin;
+    protected PulsarClient pulsarClient;
+
+    private MockZooKeeper mockZooKeeper;
+    private ExecutorService bkExecutor;
+    private NonClosableMockBookKeeper mockBookKeeper;
+
+    /**
+     * Start the mocked environment and the brokers, then build the admin and Pulsar clients
+     * against the first broker.
+     */
+    public void internalSetup() throws Exception {
+        init();
+
+        int webServicePort = serviceConfigurationList.get(0).getWebServicePort().get();
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + webServicePort).build());
+
+        int brokerPort = serviceConfigurationList.get(0).getBrokerServicePort().get();
+        pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:" + brokerPort).build();
+    }
+
+    private void init() throws Exception {
+        mockZooKeeper = createMockZooKeeper();
+
+        bkExecutor = Executors.newSingleThreadExecutor(
+                new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
+                        .setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex))
+                        .build());
+        mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);
+        startBroker();
+    }
+
+    /**
+     * Start {@code brokerCount} brokers, each on freshly allocated ports, and record their
+     * configuration and service instances.
+     */
+    protected void startBroker() throws Exception {
+        for (int i = 0; i < brokerCount; i++) {
+            ServiceConfiguration conf = new ServiceConfiguration();
+            conf.setClusterName(CLUSTER_NAME);
+            conf.setAdvertisedAddress("localhost");
+            conf.setManagedLedgerCacheSizeMB(8);
+            conf.setActiveConsumerFailoverDelayTimeMillis(0);
+            conf.setDefaultNumberOfNamespaceBundles(1);
+            conf.setZookeeperServers("localhost:2181");
+            conf.setConfigurationStoreServers("localhost:3181");
+            conf.setAllowAutoTopicCreationType("non-partitioned");
+            conf.setBookkeeperClientExposeStatsToPrometheus(true);
+
+            conf.setBrokerServicePort(Optional.of(PortManager.nextFreePort()));
+            conf.setBrokerServicePortTls(Optional.of(PortManager.nextFreePort()));
+            conf.setWebServicePort(Optional.of(PortManager.nextFreePort()));
+            conf.setWebServicePortTls(Optional.of(PortManager.nextFreePort()));
+            serviceConfigurationList.add(conf);
+
+            PulsarService pulsar = spy(new PulsarService(conf));
+
+            setupBrokerMocks(pulsar);
+            pulsar.start();
+            pulsarServiceList.add(pulsar);
+        }
+    }
+
+    /**
+     * Replace the broker's ZooKeeper/BookKeeper factories, namespace service provider, ordered
+     * executor, broker interceptor and compactor with mocked or spied versions.
+     */
+    protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
+        // Override default providers with mocked ones
+        doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
+        doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
+
+        Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
+        doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
+
+        SameThreadOrderedSafeExecutor executor = new SameThreadOrderedSafeExecutor();
+        orderedExecutorList.add(executor);
+        doReturn(executor).when(pulsar).getOrderedExecutor();
+        doReturn(new CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor();
+
+        doAnswer((invocation) -> spy(invocation.callRealMethod())).when(pulsar).newCompactor();
+    }
+
+    /**
+     * Create a mocked ZooKeeper pre-populated with one available bookie path and the ledgers
+     * layout node.
+     */
+    public static MockZooKeeper createMockZooKeeper() throws Exception {
+        MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+        List<ACL> dummyAclList = new ArrayList<>(0);
+
+        ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
+                "".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), dummyAclList, CreateMode.PERSISTENT);
+
+        zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), dummyAclList,
+                CreateMode.PERSISTENT);
+        return zk;
+    }
+
+    public static TransactionTestBase.NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper,
+                                                                                             ExecutorService executor) throws Exception {
+        return spy(new TransactionTestBase.NonClosableMockBookKeeper(zookeeper, executor));
+    }
+
+    // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test
+    public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
+
+        public NonClosableMockBookKeeper(ZooKeeper zk, ExecutorService executor) throws Exception {
+            super(zk, executor);
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+
+        @Override
+        public void shutdown() {
+            // no-op
+        }
+
+        // the only way to actually shut the mock down; used by internalCleanup()
+        public void reallyShutdown() {
+            super.shutdown();
+        }
+    }
+
+    protected ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory() {
+
+        @Override
+        public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
+                                                   int zkSessionTimeoutMillis) {
+            // Always return the same instance (so that we don't lose the mock ZK content on broker restart)
+            return CompletableFuture.completedFuture(mockZooKeeper);
+        }
+    };
+
+    private final BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() {
+
+        @Override
+        public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+                                 Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                                 Map<String, Object> properties) {
+            // Always return the same instance (so that we don't lose the mock BK content on broker restart)
+            return mockBookKeeper;
+        }
+
+        @Override
+        public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+                                 Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                                 Map<String, Object> properties, StatsLogger statsLogger) {
+            // Always return the same instance (so that we don't lose the mock BK content on broker restart)
+            return mockBookKeeper;
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+    };
+
+    /**
+     * Shut down the clients, brokers, mocks and executors created by {@link #internalSetup()}.
+     *
+     * <p>Failures are logged rather than thrown. The broker, configuration and executor lists
+     * are always emptied afterwards, so a following {@link #internalSetup()} starts from a clean
+     * state and element 0 of each list refers to a broker of the current run.
+     */
+    protected final void internalCleanup() {
+        try {
+            // if init fails, some of these could be null, and if so would throw
+            // an NPE in shutdown, obscuring the real error
+            if (admin != null) {
+                admin.close();
+                admin = null;
+            }
+            if (pulsarClient != null) {
+                pulsarClient.shutdown();
+                pulsarClient = null;
+            }
+            for (PulsarService pulsarService : pulsarServiceList) {
+                pulsarService.close();
+            }
+            if (mockBookKeeper != null) {
+                mockBookKeeper.reallyShutdown();
+            }
+            if (mockZooKeeper != null) {
+                mockZooKeeper.shutdown();
+            }
+            for (SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor : orderedExecutorList) {
+                try {
+                    sameThreadOrderedSafeExecutor.shutdownNow();
+                    sameThreadOrderedSafeExecutor.awaitTermination(5, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    log.error("sameThreadOrderedSafeExecutor shutdown had error", ex);
+                    Thread.currentThread().interrupt();
+                }
+            }
+            if (bkExecutor != null) {
+                try {
+                    bkExecutor.shutdownNow();
+                    bkExecutor.awaitTermination(5, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    log.error("bkExecutor shutdown had error", ex);
+                    Thread.currentThread().interrupt();
+                }
+                bkExecutor = null;
+            }
+        } catch (Exception e) {
+            log.warn("Failed to clean up mocked pulsar service:", e);
+        } finally {
+            // Drop references to the stopped brokers; otherwise the next setup would append
+            // to these lists and get(0) would keep returning a closed broker's data.
+            pulsarServiceList.clear();
+            serviceConfigurationList.clear();
+            orderedExecutorList.clear();
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java
index 2301e97..b0f5957 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java
@@ -65,6 +65,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -73,9 +74,12 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionMetaImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
@@ -96,6 +100,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -113,6 +118,9 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
     private ManagedCursor cursorMock;
     private ConfigurationCacheService configCacheService;
 
+    private long committedLedgerId = -1L;
+    private long committedEntryId = -1L;
+
     final String successTopicName = "persistent://prop/use/ns-abc/successTopic_txn";
     private static final Logger log = LoggerFactory.getLogger(PersistentTransactionBufferTest.class);
 
@@ -206,7 +214,7 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
 
     @SuppressWarnings("unchecked")
     void setupMLAsyncCallbackMocks()
-        throws BrokerServiceException.NamingException, ManagedLedgerException, InterruptedException {
+            throws BrokerServiceException.NamingException, ManagedLedgerException, InterruptedException, ExecutionException {
         ledgerMock = mock(ManagedLedger.class);
         cursorMock = mock(ManagedCursor.class);
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
@@ -312,7 +320,21 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
             return null;
         }).when(cursorMock).asyncMarkDelete(any(), any(), any(MarkDeleteCallback.class), any());
 
-        this.buffer = new PersistentTransactionBuffer(successTopicName, factory.open("hello"), brokerService);
+        this.buffer = new PersistentTransactionBuffer(successTopicName,
+                factory.open("hello"), brokerService, getMockPersistentTopic());
+    }
+
+    /**
+     * Creates a {@link PersistentTopic} mock whose {@code publishMessage} call immediately completes
+     * the supplied {@link Topic.PublishContext}, reporting whatever values the
+     * {@code committedLedgerId} and {@code committedEntryId} fields hold at the time of the call.
+     *
+     * @return the mocked topic
+     */
+    private PersistentTopic getMockPersistentTopic() {
+        final PersistentTopic topicMock = Mockito.mock(PersistentTopic.class);
+        doAnswer(invocation -> {
+            // The second publishMessage argument is the publish context to notify; null is passed
+            // as its first argument (presumably "no failure" -- confirm against PublishContext).
+            final Topic.PublishContext publishContext = (Topic.PublishContext) invocation.getArguments()[1];
+            publishContext.completed(null, committedLedgerId, committedEntryId);
+            return null;
+        }).when(topicMock).publishMessage(Mockito.any(), Mockito.any());
+        return topicMock;
     }
 
     @AfterMethod
@@ -372,14 +394,14 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testOpenReaderOnCommittedTxn() throws ExecutionException, InterruptedException {
+    public void testOpenReaderOnCommittedTxn() throws Exception {
         final int numEntries = 10;
         appendEntries(buffer, txnID, numEntries, 0L);
         TransactionMeta meta = buffer.getTransactionMeta(txnID).get();
         assertEquals(txnID, meta.id());
         assertEquals(TxnStatus.OPEN, meta.status());
 
-        buffer.commitTxn(txnID, 22L, 33L).get();
+        endTxnAndWaitTillFinish(buffer, txnID, PulsarApi.TxnAction.COMMIT, 22L, 33L);
 
         meta = buffer.getTransactionMeta(txnID).get();
         assertEquals(txnID, meta.id());
@@ -407,6 +429,17 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
     }
 
     @Test
+    public void testEndOnPartition() throws Exception {
+        // Append entries under a randomly generated transaction id, end the transaction with COMMIT,
+        // and check that the buffer then reports it as COMMITTED.
+        final TxnID randomTxn = new TxnID(RandomUtils.nextLong(), RandomUtils.nextLong());
+        final int entryCount = 10;
+        appendEntries(buffer, randomTxn, entryCount, 0L);
+        endTxnAndWaitTillFinish(buffer, randomTxn, PulsarApi.TxnAction.COMMIT, 22L, 33L);
+
+        final TxnStatus finalStatus = buffer.getTransactionMeta(randomTxn).get().status();
+        assertEquals(finalStatus, TxnStatus.COMMITTED);
+    }
+
+    @Test()
     public void testCommitTxn() throws Exception {
         final int numEntries = 10;
         appendEntries(buffer, txnID, numEntries, 0L);
@@ -415,7 +448,7 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
         assertEquals(txnID, meta.id());
         assertEquals(meta.status(), TxnStatus.OPEN);
 
-        buffer.commitTxn(txnID, 22L, 33L).get();
+        endTxnAndWaitTillFinish(buffer, txnID, PulsarApi.TxnAction.COMMIT, 22L, 33L);
         meta = buffer.getTransactionMeta(txnID).get();
 
         assertEquals(txnID, meta.id());
@@ -423,7 +456,7 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testCommitTxnMultiTimes() throws ExecutionException, InterruptedException {
+    public void testCommitTxnMultiTimes() throws Exception {
         final int numEntries = 10;
         appendEntries(buffer, txnID, numEntries, 0L);
         TransactionMeta meta = buffer.getTransactionMeta(txnID).get();
@@ -431,7 +464,7 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
         assertEquals(txnID, meta.id());
         assertEquals(meta.status(), TxnStatus.OPEN);
 
-        buffer.commitTxn(txnID, 22L, 33L).get();
+        endTxnAndWaitTillFinish(buffer, txnID, PulsarApi.TxnAction.COMMIT, 22L, 33L);
         try {
             buffer.commitTxn(txnID, 23L, 34L).get();
             buffer.commitTxn(txnID, 24L, 34L).get();
@@ -465,7 +498,7 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
         assertEquals(txnID, meta.id());
         assertEquals(TxnStatus.OPEN, meta.status());
 
-        buffer.commitTxn(txnID, 22L, 33L).get();
+        endTxnAndWaitTillFinish(buffer, txnID, PulsarApi.TxnAction.COMMIT, 22L, 33L);
         meta = buffer.getTransactionMeta(txnID).get();
         assertEquals(txnID, meta.id());
         assertEquals(TxnStatus.COMMITTED, meta.status());
@@ -505,14 +538,14 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
 
         TxnID txnId2 = new TxnID(1234L, 3456L);
         appendEntries(buffer, txnId2, numEntries, 0L);
-        buffer.commitTxn(txnId2, 22L, 0L).get();
+        endTxnAndWaitTillFinish(buffer, txnId2, PulsarApi.TxnAction.COMMIT, 22L, 0L);
         TransactionMeta meta2 = buffer.getTransactionMeta(txnId2).get();
         assertEquals(txnId2, meta2.id());
         assertEquals(TxnStatus.COMMITTED, meta2.status());
 
         TxnID txnId3 = new TxnID(1234L, 4567L);
         appendEntries(buffer, txnId3, numEntries, 0L);
-        buffer.commitTxn(txnId3, 23L, 0L).get();
+        endTxnAndWaitTillFinish(buffer, txnId3, PulsarApi.TxnAction.COMMIT, 23L, 0L);
         TransactionMeta meta3 = buffer.getTransactionMeta(txnId3).get();
         assertEquals(txnId3, meta3.id());
         assertEquals(TxnStatus.COMMITTED, meta3.status());
@@ -548,11 +581,10 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testAppendEntry() throws ExecutionException, InterruptedException, ManagedLedgerException,
-                                         BrokerServiceException.NamingException {
+    public void testAppendEntry() throws Exception {
         ManagedLedger ledger = factory.open("test_ledger");
         PersistentTransactionBuffer newBuffer = new PersistentTransactionBuffer(successTopicName, ledger,
-                                                                                brokerService);
+                brokerService, getMockPersistentTopic());
         final int numEntries = 10;
         TxnID txnID = new TxnID(1111L, 2222L);
         List<ByteBuf> appendEntries =  appendEntries(newBuffer, txnID, numEntries, 0L);
@@ -564,7 +596,7 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
 
         verifyEntries(ledger, appendEntries, meta.getEntries());
 
-        newBuffer.commitTxn(txnID, 22L, 33L).get();
+        endTxnAndWaitTillFinish(newBuffer, txnID, PulsarApi.TxnAction.COMMIT, 22L, 33L);
         meta = (TransactionMetaImpl) newBuffer.getTransactionMeta(txnID).get();
 
         assertEquals(meta.id(), txnID);
@@ -577,7 +609,7 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
     public void testCommitMarker() throws Exception {
         ManagedLedger ledger = factory.open("test_commit_ledger");
         PersistentTransactionBuffer commitBuffer = new PersistentTransactionBuffer(successTopicName, ledger,
-                                                                                   brokerService);
+                brokerService, getMockPersistentTopic());
         final int numEntries = 10;
         List<ByteBuf> appendEntires = appendEntries(commitBuffer, txnID, numEntries, 0L);
 
@@ -588,7 +620,7 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
 
         verifyEntries(ledger, appendEntires, meta.getEntries());
 
-        commitBuffer.commitTxn(txnID, 22L, 33L).get();
+        endTxnAndWaitTillFinish(commitBuffer, txnID, PulsarApi.TxnAction.COMMIT, 22L, 33L);
         assertEquals(meta.id(), txnID);
         assertEquals(meta.numEntries(), numEntries);
         assertEquals(meta.status(), TxnStatus.COMMITTED);
@@ -605,7 +637,7 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
     public void testAbortMarker() throws Exception {
         ManagedLedger ledger = factory.open("test_abort_ledger");
         PersistentTransactionBuffer abortBuffer = new PersistentTransactionBuffer(successTopicName, ledger,
-                                                                                   brokerService);
+                brokerService, getMockPersistentTopic());
         final int numEntries = 10;
         List<ByteBuf> appendEntires = appendEntries(abortBuffer, txnID, numEntries, 0L);
 
@@ -653,8 +685,8 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
         throws ManagedLedgerException, InterruptedException, BrokerServiceException.NamingException,
                ExecutionException {
         ManagedLedger ledger = factory.open("test_deduplicate");
-        PersistentTransactionBuffer newBuffer = new PersistentTransactionBuffer(successTopicName, ledger,
-                                                                                brokerService);
+        PersistentTransactionBuffer newBuffer = new PersistentTransactionBuffer(
+                successTopicName, ledger, brokerService, getMockPersistentTopic());
         final int numEntries = 10;
 
         TxnID txnID = new TxnID(1234L, 5678L);
@@ -729,4 +761,17 @@ public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
         }
     }
 
+    private void endTxnAndWaitTillFinish(TransactionBuffer tb, TxnID txnId, PulsarApi.TxnAction txnAction,
+                                         long committedLedgerId, long committedEntryId) throws Exception {
+        this.committedLedgerId = committedLedgerId;
+        this.committedEntryId = committedEntryId;
+        tb.endTxnOnPartition(txnId, txnAction.getNumber());
+        TransactionMeta meta = tb.getTransactionMeta(txnId).get();
+        while (meta.status().equals(TxnStatus.OPEN)
+                || meta.status().equals(TxnStatus.COMMITTING)
+                || meta.status().equals(TxnStatus.ABORTING)) {
+            Thread.sleep(1000);
+        }
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 932270a..159b5c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.broker.transaction.buffer;
 
 import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
@@ -27,6 +30,8 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -35,6 +40,7 @@ import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
@@ -59,6 +65,25 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
                 ((PulsarClientImpl) pulsarClient).getCnxPool());
     }
 
+    /**
+     * Stubs each broker so that {@code getBrokerService()} returns a spied {@link BrokerService} whose
+     * topic map resolves every topic name to one mocked {@link Topic}; that topic's {@code endTxn}
+     * returns an already-completed future.
+     */
+    @Override
+    public void afterPulsarStart() throws Exception {
+        super.afterPulsarStart();
+        for (int idx = 0; idx < pulsarServices.length; idx++) {
+            // Topic stub: ending a transaction completes immediately.
+            final Topic topicStub = Mockito.mock(Topic.class);
+            Mockito.when(topicStub.endTxn(Mockito.any(), Mockito.anyInt()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+
+            // Topic map stub: any lookup yields the stubbed topic.
+            final CompletableFuture<Optional<Topic>> topicLookup =
+                    CompletableFuture.completedFuture(Optional.of(topicStub));
+            final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topicsStub =
+                    Mockito.mock(ConcurrentOpenHashMap.class);
+            Mockito.when(topicsStub.get(Mockito.anyString())).thenReturn(topicLookup);
+
+            // Hand the stubbed map out through a spied BrokerService returned by the broker.
+            final BrokerService spiedBrokerService = Mockito.spy(new BrokerService(pulsarServices[idx]));
+            Mockito.when(spiedBrokerService.getTopics()).thenReturn(topicsStub);
+            Mockito.when(pulsarServices[idx].getBrokerService()).thenReturn(spiedBrokerService);
+        }
+    }
+
     @Test
     public void testCommitOnTopic() throws ExecutionException, InterruptedException {
         List<CompletableFuture<TxnID>> futures = new ArrayList<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
index b751461..afeb44a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
@@ -18,20 +18,50 @@
  */
 package org.apache.pulsar.broker.transaction.coordinator;
 
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+
 import com.google.common.collect.Lists;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.State;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+
+
 public class TransactionCoordinatorClientTest extends TransactionMetaStoreTestBase {
 
     @BeforeClass
     public void init() throws Exception {
         super.setup();
+
+        // Swap each broker's transaction buffer client for a mock whose commit/abort calls
+        // (on topics and on subscriptions) return already-completed futures.
+        for (PulsarService broker : pulsarServices) {
+            final TransactionBufferClient bufferClientMock = Mockito.mock(TransactionBufferClientImpl.class);
+            Mockito.when(bufferClientMock.commitTxnOnTopic(anyString(), anyLong(), anyLong()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+            Mockito.when(bufferClientMock.abortTxnOnTopic(anyString(), anyLong(), anyLong()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+            Mockito.when(bufferClientMock.commitTxnOnSubscription(anyString(), anyString(), anyLong(), anyLong()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+            Mockito.when(bufferClientMock.abortTxnOnSubscription(anyString(), anyString(), anyLong(), anyLong()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+
+            // The mock is written into the service's "tbClient" field via reflection.
+            final TransactionMetadataStoreService storeService = broker.getTransactionMetadataStoreService();
+            final Field tbClientField = TransactionMetadataStoreService.class.getDeclaredField("tbClient");
+            tbClientField.setAccessible(true);
+            tbClientField.set(storeService, bufferClientMock);
+        }
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 79b383b..b8df885 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.BeforeClass;
@@ -70,7 +71,7 @@ public class TransactionMetaStoreTestBase {
             config.setLoadBalancerEnabled(false);
             configurations[i] = config;
 
-            pulsarServices[i] = new PulsarService(config);
+            pulsarServices[i] = Mockito.spy(new PulsarService(config));
             pulsarServices[i].start();
 
             pulsarAdmins[i] = PulsarAdmin.builder()
@@ -80,6 +81,8 @@ public class TransactionMetaStoreTestBase {
 
         Thread.sleep(100);
 
+        afterPulsarStart();
+
         pulsarClient = PulsarClient.builder().
             serviceUrl(pulsarServices[0].getBrokerServiceUrl())
             .build();
@@ -88,4 +91,8 @@ public class TransactionMetaStoreTestBase {
 
         Thread.sleep(3000);
     }
+
+    /**
+     * Hook invoked during setup, after the brokers have been started and before the Pulsar client
+     * is built. This default implementation only logs a marker line; subclasses may override it to
+     * adjust the started brokers.
+     *
+     * @throws Exception declared so that overriding implementations may fail the setup
+     */
+    public void afterPulsarStart() throws Exception {
+        log.info("[afterPulsarStart]");
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 865855c..0ec07c2 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -478,4 +478,12 @@ public interface ClientBuilder extends Cloneable {
      * @return
      */
     ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol);
+
+    /**
+     * Enable or disable transaction support for the client.
+     *
+     * <p>When enabled, the transaction coordinator client is started together with the Pulsar client.
+     *
+     * @param enableTransaction whether to enable the transaction feature
+     * @return the client builder instance
+     */
+    ClientBuilder enableTransaction(boolean enableTransaction);
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
index d49132b..b6ae760 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.api.transaction;
 
 import java.io.Serializable;
+import java.util.Objects;
+
 import lombok.Data;
 
 /**
@@ -47,4 +49,20 @@ public class TxnID implements Serializable {
     public String toString() {
         return "(" + mostSigBits + "," + leastSigBits + ")";
     }
+
+    /**
+     * Computes the hash code from {@code mostSigBits} and {@code leastSigBits}, the same two fields
+     * compared by {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(mostSigBits, leastSigBits);
+    }
+
+    /**
+     * Two transaction ids are equal when the other object is a {@code TxnID} with the same
+     * {@code mostSigBits} and {@code leastSigBits}.
+     *
+     * @param obj the object to compare with; may be {@code null}
+     * @return {@code true} if {@code obj} is a {@code TxnID} with identical bits, {@code false} otherwise
+     */
+    @Override
+    public boolean equals(Object obj) {
+        // Also rejects null, since instanceof is false for null.
+        if (!(obj instanceof TxnID)) {
+            return false;
+        }
+        final TxnID that = (TxnID) obj;
+        return Objects.equals(mostSigBits, that.mostSigBits)
+                && Objects.equals(leastSigBits, that.leastSigBits);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 6b820b9..c6d57e2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -305,4 +305,10 @@ public class ClientBuilderImpl implements ClientBuilder {
         conf.setProxyProtocol(proxyProtocol);
         return this;
     }
+
+    /**
+     * Records the transaction flag on the client configuration and returns this builder so that
+     * calls can be chained.
+     */
+    @Override
+    public ClientBuilder enableTransaction(boolean enableTransaction) {
+        conf.setEnableTransaction(enableTransaction);
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 2b60d48..975ac37 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -869,6 +869,7 @@ public class ClientCnx extends PulsarHandler {
 
     @Override
     protected void handleEndTxnOnPartitionResponse(PulsarApi.CommandEndTxnOnPartitionResponse command) {
+        log.info("handleEndTxnOnPartitionResponse");
         TransactionBufferHandler handler = checkAndGetTransactionBufferHandler();
         if (handler != null) {
             handler.handleEndTxnOnTopicResponse(command.getRequestId(), command);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index dd7b29b..73dcb4d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -161,7 +162,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
     }
 
     @Override
-    CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
+    CompletableFuture<MessageId> internalSendAsync(Message<?> message, Transaction txn) {
 
         switch (getState()) {
         case Ready:
@@ -180,7 +181,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
         int partition = routerPolicy.choosePartition(message, topicMetadata);
         checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
                 "Illegal partition index chosen by the message routing policy: " + partition);
-        return producers.get(partition).internalSendAsync(message);
+        return producers.get(partition).internalSendAsync(message, txn);
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 50f3fa1..b10dc2e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -73,7 +73,7 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
     }
 
     public CompletableFuture<MessageId> sendAsync(Message<?> message) {
-        return internalSendAsync(message);
+        return internalSendAsync(message, null);
     }
 
     @Override
@@ -100,12 +100,12 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
         return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
     }
 
-    abstract CompletableFuture<MessageId> internalSendAsync(Message<?> message);
+    abstract CompletableFuture<MessageId> internalSendAsync(Message<?> message, Transaction txn);
 
     public MessageId send(Message<?> message) throws PulsarClientException {
         try {
             // enqueue the message to the buffer
-            CompletableFuture<MessageId> sendFuture = internalSendAsync(message);
+            CompletableFuture<MessageId> sendFuture = internalSendAsync(message, null);
 
             if (!sendFuture.isDone()) {
                 // the send request wasn't completed yet (e.g. not failing at enqueuing), then attempt to triggerFlush it out
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 2609060..4cb7a22 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -64,9 +64,11 @@ import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
@@ -266,7 +268,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     @Override
-    CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
+    CompletableFuture<MessageId> internalSendAsync(Message<?> message, Transaction txn) {
 
         CompletableFuture<MessageId> future = new CompletableFuture<>();
 
@@ -341,6 +343,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 nextCallback = scb;
             }
         });
+        if (txn instanceof TransactionImpl) {
+            ((TransactionImpl) txn).registerProducedTopic(topic);
+        }
         return future;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index d51e8ae..6c4dced 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -64,6 +65,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.transaction.TransactionBuilder;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -72,6 +74,7 @@ import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
 import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -119,6 +122,9 @@ public class PulsarClientImpl implements PulsarClient {
 
     private final Clock clientClock;
 
+    @Getter
+    private TransactionCoordinatorClientImpl tcClient;
+
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
         this(conf, getEventLoopGroup(conf));
     }
@@ -147,6 +153,17 @@ public class PulsarClientImpl implements PulsarClient {
         timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
         producers = Maps.newIdentityHashMap();
         consumers = Maps.newIdentityHashMap();
+
+        if (conf.isEnableTransaction()) {
+            tcClient = new TransactionCoordinatorClientImpl(this);
+            try {
+                tcClient.start();
+            } catch (Throwable e) {
+                log.error("Start transactionCoordinatorClient error.", e);
+                throw new PulsarClientException(e);
+            }
+        }
+
         state.set(State.Open);
     }
 
@@ -823,7 +840,7 @@ public class PulsarClientImpl implements PulsarClient {
     // are completed.
     // @Override
     public TransactionBuilder newTransaction() {
-        return new TransactionBuilderImpl(this);
+        return new TransactionBuilderImpl(this, tcClient);
     }
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index fd4a1d3..37d85c8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -148,7 +148,8 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             return callback;
         }
         long requestId = client.newRequestId();
-        ByteBuf cmd = Commands.newAddPartitionToTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits());
+        ByteBuf cmd = Commands.newAddPartitionToTxn(
+                requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), partitions);
         OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback);
         pendingRequests.put(requestId, op);
         timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 613bf02..3eaa782 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -92,12 +92,8 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
     @Override
     public CompletableFuture<MessageId> sendAsync() {
         long sequenceId = beforeSend();
-        CompletableFuture<MessageId> sendFuture = producer.internalSendAsync(getMessage());
+        CompletableFuture<MessageId> sendFuture = producer.internalSendAsync(getMessage(), txn);
         if (txn != null) {
-            // it is okay that we register produced topic after sending the messages. because
-            // the transactional messages will not be visible for consumers until the transaction
-            // is committed.
-            txn.registerProducedTopic(producer.getTopic());
             // register the sendFuture as part of the transaction
             return txn.registerSendOp(sequenceId, sendFuture);
         } else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 7bedc01..2185037 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -91,7 +91,10 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     /** proxyServiceUrl and proxyProtocol must be mutually inclusive **/
     private String proxyServiceUrl;
     private ProxyProtocol proxyProtocol;
-    
+
+    // Transaction support: when enabled, the client starts a transaction coordinator client on creation.
+    private boolean enableTransaction = false;
+
     @JsonIgnore
     private Clock clock = Clock.systemDefaultZone();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
index 735f980..0b848fc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
@@ -18,22 +18,28 @@
  */
 package org.apache.pulsar.client.impl.transaction;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TransactionBuilder;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 /**
  * The default implementation of transaction builder to build transactions.
  */
+@Slf4j
 public class TransactionBuilderImpl implements TransactionBuilder {
 
     private final PulsarClientImpl client;
+    private final TransactionCoordinatorClientImpl transactionCoordinatorClient;
     private long txnTimeoutMs = 60000; // 1 minute
+    private static final long txnRequestTimeoutMs = 1000 * 30; // 30 seconds
 
-    public TransactionBuilderImpl(PulsarClientImpl client) {
+    public TransactionBuilderImpl(PulsarClientImpl client, TransactionCoordinatorClientImpl tcClient) {
         this.client = client;
+        this.transactionCoordinatorClient = tcClient;
     }
 
     @Override
@@ -44,12 +50,26 @@ public class TransactionBuilderImpl implements TransactionBuilder {
 
     @Override
     public CompletableFuture<Transaction> build() {
-        // TODO: talk to TC to begin a transaction
+        // talk to TC to begin a transaction
         //       the builder is responsible for locating the transaction coorindator (TC)
         //       and start the transaction to get the transaction id.
         //       After getting the transaction id, all the operations are handled by the
         //       `TransactionImpl`
-        return CompletableFuture.completedFuture(
-            new TransactionImpl(client, txnTimeoutMs, -1L, -1L));
+        CompletableFuture<Transaction> future = new CompletableFuture<>();
+        transactionCoordinatorClient
+                .newTransactionAsync(txnRequestTimeoutMs, TimeUnit.MILLISECONDS)
+                .whenComplete((txnID, throwable) -> {
+                    // Check for failure first so that success is only logged for a real txnID.
+                    if (throwable != null) {
+                        log.error("New transaction error.", throwable);
+                        future.completeExceptionally(throwable);
+                        return;
+                    }
+                    // Parameterized debug logging needs no isDebugEnabled() guard.
+                    log.debug("Success to new txn. txnID: {}", txnID);
+                    future.complete(new TransactionImpl(client, txnTimeoutMs,
+                            txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                });
+        return future;
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 062652c..368d805 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -23,10 +23,14 @@ import java.util.LinkedHashMap;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
 import lombok.Data;
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 
@@ -39,6 +43,7 @@ import org.apache.pulsar.common.util.FutureUtil;
  * failures. This decouples the transactional operations from non-transactional operations as
  * much as possible.
  */
+@Slf4j
 @Getter
 public class TransactionImpl implements Transaction {
 
@@ -63,6 +68,7 @@ public class TransactionImpl implements Transaction {
     private final Set<String> producedTopics;
     private final Set<TransactionalAckOp> ackOps;
     private final Set<String> ackedTopics;
+    private final TransactionCoordinatorClientImpl tcClient;
 
     TransactionImpl(PulsarClientImpl client,
                     long transactionTimeoutMs,
@@ -76,6 +82,7 @@ public class TransactionImpl implements Transaction {
         this.producedTopics = new HashSet<>();
         this.ackOps = new HashSet<>();
         this.ackedTopics = new HashSet<>();
+        this.tcClient = client.getTcClient();
     }
 
     public long nextSequenceId() {
@@ -86,6 +93,14 @@ public class TransactionImpl implements Transaction {
     public synchronized void registerProducedTopic(String topic) {
         if (producedTopics.add(topic)) {
-            // TODO: we need to issue the request to TC to register the produced topic
+            // Register the newly produced topic with the TC. The request is asynchronous,
+            // so log a failure here instead of letting it vanish with the discarded future.
+            TxnID txnID = new TxnID(txnIdMostBits, txnIdLeastBits);
+            tcClient.addPublishPartitionToTxnAsync(txnID, Lists.newArrayList(topic))
+                    .whenComplete((ignored, throwable) -> {
+                        if (throwable != null) {
+                            log.error("Failed to register produced topic {} to txn {}.", topic, txnID, throwable);
+                        }
+                    });
         }
     }
 
@@ -119,7 +127,21 @@ public class TransactionImpl implements Transaction {
 
     @Override
     public CompletableFuture<Void> commit() {
-        return FutureUtil.failedFuture(new UnsupportedOperationException("Not Implemented Yet"));
+        // Ask the TC to commit, then settle every pending transactional send future.
+        return tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((ignored, throwable) -> {
+            sendOps.values().forEach(txnSendOp -> {
+                txnSendOp.sendFuture.whenComplete((messageId, t) -> {
+                    // A failed commit or a failed send must surface as a failure; completing
+                    // with a null MessageId would hide the error from the caller.
+                    Throwable failure = throwable != null ? throwable : t;
+                    if (failure != null) {
+                        txnSendOp.transactionalSendFuture.completeExceptionally(failure);
+                    } else {
+                        txnSendOp.transactionalSendFuture.complete(messageId);
+                    }
+                });
+            });
+        });
     }
 
     @Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 2f822e8..4c3b885 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -1578,15 +1578,6 @@ public final class PulsarApi {
               ackSet_.add(input.readInt64());
               break;
             }
-            case 42: {
-              int length = input.readRawVarint32();
-              int limit = input.pushLimit(length);
-              while (input.getBytesUntilLimit() > 0) {
-                addAckSet(input.readInt64());
-              }
-              input.popLimit(limit);
-              break;
-            }
           }
         }
       }
@@ -3699,11 +3690,11 @@ public final class PulsarApi {
     boolean hasMarkerType();
     int getMarkerType();
     
-    // optional uint64 txnid_least_bits = 22 [default = 0];
+    // optional uint64 txnid_least_bits = 22;
     boolean hasTxnidLeastBits();
     long getTxnidLeastBits();
     
-    // optional uint64 txnid_most_bits = 23 [default = 0];
+    // optional uint64 txnid_most_bits = 23;
     boolean hasTxnidMostBits();
     long getTxnidMostBits();
     
@@ -4074,7 +4065,7 @@ public final class PulsarApi {
       return markerType_;
     }
     
-    // optional uint64 txnid_least_bits = 22 [default = 0];
+    // optional uint64 txnid_least_bits = 22;
     public static final int TXNID_LEAST_BITS_FIELD_NUMBER = 22;
     private long txnidLeastBits_;
     public boolean hasTxnidLeastBits() {
@@ -4084,7 +4075,7 @@ public final class PulsarApi {
       return txnidLeastBits_;
     }
     
-    // optional uint64 txnid_most_bits = 23 [default = 0];
+    // optional uint64 txnid_most_bits = 23;
     public static final int TXNID_MOST_BITS_FIELD_NUMBER = 23;
     private long txnidMostBits_;
     public boolean hasTxnidMostBits() {
@@ -5742,7 +5733,7 @@ public final class PulsarApi {
         return this;
       }
       
-      // optional uint64 txnid_least_bits = 22 [default = 0];
+      // optional uint64 txnid_least_bits = 22;
       private long txnidLeastBits_ ;
       public boolean hasTxnidLeastBits() {
         return ((bitField0_ & 0x00080000) == 0x00080000);
@@ -5763,7 +5754,7 @@ public final class PulsarApi {
         return this;
       }
       
-      // optional uint64 txnid_most_bits = 23 [default = 0];
+      // optional uint64 txnid_most_bits = 23;
       private long txnidMostBits_ ;
       public boolean hasTxnidMostBits() {
         return ((bitField0_ & 0x00100000) == 0x00100000);
@@ -18869,15 +18860,6 @@ public final class PulsarApi {
               ackSet_.add(input.readInt64());
               break;
             }
-            case 34: {
-              int length = input.readRawVarint32();
-              int limit = input.pushLimit(length);
-              while (input.getBytesUntilLimit() > 0) {
-                addAckSet(input.readInt64());
-              }
-              input.popLimit(limit);
-              break;
-            }
           }
         }
       }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
index 5ef2434..2aac22c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java
@@ -15,8 +15,9 @@ public final class PulsarMarkers {
     REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE(2, 11),
     REPLICATED_SUBSCRIPTION_SNAPSHOT(3, 12),
     REPLICATED_SUBSCRIPTION_UPDATE(4, 13),
-    TXN_COMMIT(5, 20),
-    TXN_ABORT(6, 21),
+    TXN_COMMITTING(5, 20),
+    TXN_COMMIT(6, 21),
+    TXN_ABORT(7, 22),
     ;
     
     public static final int UNKNOWN_MARKER_VALUE = 0;
@@ -24,8 +25,9 @@ public final class PulsarMarkers {
     public static final int REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE_VALUE = 11;
     public static final int REPLICATED_SUBSCRIPTION_SNAPSHOT_VALUE = 12;
     public static final int REPLICATED_SUBSCRIPTION_UPDATE_VALUE = 13;
-    public static final int TXN_COMMIT_VALUE = 20;
-    public static final int TXN_ABORT_VALUE = 21;
+    public static final int TXN_COMMITTING_VALUE = 20;
+    public static final int TXN_COMMIT_VALUE = 21;
+    public static final int TXN_ABORT_VALUE = 22;
     
     
     public final int getNumber() { return value; }
@@ -37,8 +39,9 @@ public final class PulsarMarkers {
         case 11: return REPLICATED_SUBSCRIPTION_SNAPSHOT_RESPONSE;
         case 12: return REPLICATED_SUBSCRIPTION_SNAPSHOT;
         case 13: return REPLICATED_SUBSCRIPTION_UPDATE;
-        case 20: return TXN_COMMIT;
-        case 21: return TXN_ABORT;
+        case 20: return TXN_COMMITTING;
+        case 21: return TXN_COMMIT;
+        case 22: return TXN_ABORT;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 16da307..b33ff6d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -467,12 +467,17 @@ public class Commands {
 
     public static ByteBufPair newSend(long producerId, long sequenceId, int numMessaegs, ChecksumType checksumType,
                                       MessageMetadata messageMetadata, ByteBuf payload) {
-        return newSend(producerId, sequenceId, numMessaegs, 0, 0, checksumType, messageMetadata, payload);
+        return newSend(producerId, sequenceId, numMessaegs,
+                messageMetadata.hasTxnidLeastBits() ? messageMetadata.getTxnidLeastBits() : -1,
+                messageMetadata.hasTxnidMostBits() ? messageMetadata.getTxnidMostBits() : -1,
+                checksumType, messageMetadata, payload);
     }
 
     public static ByteBufPair newSend(long producerId, long lowestSequenceId, long highestSequenceId, int numMessaegs,
               ChecksumType checksumType, MessageMetadata messageMetadata, ByteBuf payload) {
-        return newSend(producerId, lowestSequenceId, highestSequenceId, numMessaegs, 0, 0,
+        return newSend(producerId, lowestSequenceId, highestSequenceId, numMessaegs,
+                messageMetadata.hasTxnidLeastBits() ? messageMetadata.getTxnidLeastBits() : -1,
+                messageMetadata.hasTxnidMostBits() ? messageMetadata.getTxnidMostBits() : -1,
                 checksumType, messageMetadata, payload);
     }
 
@@ -485,10 +490,10 @@ public class Commands {
         if (numMessages > 1) {
             sendBuilder.setNumMessages(numMessages);
         }
-        if (txnIdLeastBits > 0) {
+        if (txnIdLeastBits >= 0) {
             sendBuilder.setTxnidLeastBits(txnIdLeastBits);
         }
-        if (txnIdMostBits > 0) {
+        if (txnIdMostBits >= 0) {
             sendBuilder.setTxnidMostBits(txnIdMostBits);
         }
         if (messageData.hasTotalChunkMsgSize() && messageData.getTotalChunkMsgSize() > 1) {
@@ -513,10 +518,10 @@ public class Commands {
         if (numMessages > 1) {
             sendBuilder.setNumMessages(numMessages);
         }
-        if (txnIdLeastBits > 0) {
+        if (txnIdLeastBits >= 0) {
             sendBuilder.setTxnidLeastBits(txnIdLeastBits);
         }
-        if (txnIdMostBits > 0) {
+        if (txnIdMostBits >= 0) {
             sendBuilder.setTxnidMostBits(txnIdMostBits);
         }
         if (messageData.hasTotalChunkMsgSize() && messageData.getTotalChunkMsgSize() > 1) {
@@ -1271,12 +1276,16 @@ public class Commands {
         return res;
     }
 
-    public static ByteBuf newAddPartitionToTxn(long requestId, long txnIdLeastBits, long txnIdMostBits) {
-        CommandAddPartitionToTxn commandAddPartitionToTxn = CommandAddPartitionToTxn.newBuilder()
-                                                                                    .setRequestId(requestId)
-                                                                                    .setTxnidLeastBits(txnIdLeastBits)
-                                                                                    .setTxnidMostBits(txnIdMostBits)
-                                                                                    .build();
+    public static ByteBuf newAddPartitionToTxn(long requestId, long txnIdLeastBits, long txnIdMostBits,
+                                               List<String> partitions) {
+        PulsarApi.CommandAddPartitionToTxn.Builder builder = CommandAddPartitionToTxn.newBuilder();
+        builder.setRequestId(requestId);
+        builder.setTxnidLeastBits(txnIdLeastBits);
+        builder.setTxnidMostBits(txnIdMostBits);
+        if (partitions != null) {
+            builder.addAllPartitions(partitions);
+        }
+        CommandAddPartitionToTxn commandAddPartitionToTxn = builder.build();
         ByteBuf res = serializeWithSize(
             BaseCommand.newBuilder().setType(Type.ADD_PARTITION_TO_TXN).setAddPartitionToTxn(commandAddPartitionToTxn));
         commandAddPartitionToTxn.recycle();
@@ -1470,9 +1479,12 @@ public class Commands {
         return res;
     }
 
-    public static ByteBuf newEndTxnOnSubscriptionResponse(long requestId, ServerError error, String errorMsg) {
+    public static ByteBuf newEndTxnOnSubscriptionResponse(long requestId, long txnIdLeastBits, long txnIdMostBits,
+                                                          ServerError error, String errorMsg) {
         CommandEndTxnOnSubscriptionResponse.Builder builder = CommandEndTxnOnSubscriptionResponse.newBuilder();
         builder.setRequestId(requestId);
+        builder.setTxnidMostBits(txnIdMostBits);
+        builder.setTxnidLeastBits(txnIdLeastBits);
         builder.setError(error);
         if (errorMsg != null) {
             builder.setMessage(errorMsg);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
index 34942da..3c5c525 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
@@ -259,6 +259,12 @@ public class Markers {
                && msgMetadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE;
     }
 
+    public static ByteBuf newTxnCommittingMarker(long sequenceId, long txnMostBits,
+                                                 long txnLeastBits, MessageIdData messageIdData) {
+        return newTxnMarker(MarkerType.TXN_COMMITTING, sequenceId,
+                txnMostBits, txnLeastBits, Optional.of(messageIdData));
+    }
+
     public static ByteBuf newTxnCommitMarker(long sequenceId, long txnMostBits,
                                              long txnLeastBits, MessageIdData messageIdData) {
         return newTxnMarker(MarkerType.TXN_COMMIT, sequenceId, txnMostBits, txnLeastBits, Optional.of(messageIdData));
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index c4acca2..63ff6ee 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -131,8 +131,8 @@ message MessageMetadata {
     optional int32 marker_type = 20;
 
     // transaction related message info
-    optional uint64 txnid_least_bits = 22 [default = 0];
-    optional uint64 txnid_most_bits = 23 [default = 0];
+    optional uint64 txnid_least_bits = 22;
+    optional uint64 txnid_most_bits = 23;
 
     /// Add highest sequence id to support batch message with external sequence id
     optional uint64 highest_sequence_id = 24 [default = 0];
diff --git a/pulsar-common/src/main/proto/PulsarMarkers.proto b/pulsar-common/src/main/proto/PulsarMarkers.proto
index 8e2ac21..c8c55db 100644
--- a/pulsar-common/src/main/proto/PulsarMarkers.proto
+++ b/pulsar-common/src/main/proto/PulsarMarkers.proto
@@ -32,8 +32,9 @@ enum MarkerType {
     REPLICATED_SUBSCRIPTION_UPDATE            = 13;
 
     // Next markers start at 20
-    TXN_COMMIT = 20;
-    TXN_ABORT = 21;
+    TXN_COMMITTING = 20;
+    TXN_COMMIT = 21;
+    TXN_ABORT = 22;
 }
 
 /// --- Replicated subscriptions ---