You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2023/02/09 03:51:43 UTC

[pulsar] branch xiangying/cherry-pick/TC_auth created (now e1133b89a45)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a change to branch xiangying/cherry-pick/TC_auth
in repository https://gitbox.apache.org/repos/asf/pulsar.git


      at e1133b89a45 [improve][txn] Allow superusers to abort transactions (#19467)

This branch includes the following new commits:

     new e1133b89a45 [improve][txn] Allow superusers to abort transactions (#19467)

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[pulsar] 01/01: [improve][txn] Allow superusers to abort transactions (#19467)

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch xiangying/cherry-pick/TC_auth
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e1133b89a45c89d4b1f4870c5061c33f3dfcfd38
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Thu Feb 9 02:08:19 2023 +0100

    [improve][txn] Allow superusers to abort transactions (#19467)
    
    Super users must be always allowed to abort a transaction even if they're not the original owner.
    
    * Check that only owner or superusers are allowed to perform txn operations (end, add partition and add subscription)
    
    (cherry picked from commit 459a7a57c1b67cfe161cdc40c007a1c2e403b7cd)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   4 +
 .../broker/TransactionMetadataStoreService.java    |  22 +-
 .../pulsar/broker/admin/impl/TransactionsBase.java |   1 +
 .../apache/pulsar/broker/service/ServerCnx.java    | 138 +++++--
 .../pulsar/broker/service/ServerCnxTest.java       | 423 ++++++++++++++++++++-
 .../TransactionMetadataStoreServiceTest.java       |  36 +-
 .../broker/stats/TransactionMetricsTest.java       |   8 +-
 ...thenticatedTransactionProducerConsumerTest.java | 329 ++++++++++++++++
 .../broker/transaction/TransactionTestBase.java    |  21 +-
 .../client/impl/TransactionEndToEndTest.java       |   2 +-
 .../common/policies/data/TransactionMetadata.java  |   3 +
 .../coordinator/TransactionMetadataStore.java      |   3 +-
 .../pulsar/transaction/coordinator/TxnMeta.java    |   7 +
 .../impl/InMemTransactionMetadataStore.java        |  11 +-
 .../impl/MLTransactionMetadataStore.java           |  25 +-
 .../transaction/coordinator/impl/TxnMetaImpl.java  |   8 +-
 .../src/main/proto/PulsarTransactionMetadata.proto |   1 +
 .../MLTransactionMetadataStoreTest.java            |  31 +-
 .../TransactionMetadataStoreProviderTest.java      |  12 +-
 19 files changed, 992 insertions(+), 93 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5c239e73bf5..a36a7f5c70b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1262,6 +1262,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         });
     }
 
+    public boolean isRunning() {
+        return this.state == State.Started || this.state == State.Init;
+    }
+
     public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
             throws PulsarServerException {
         try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 902546958c5..d65c448d571 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -289,12 +289,13 @@ public class TransactionMetadataStoreService {
         }
     }
 
-    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
+    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills,
+                                                   String owner) {
         TransactionMetadataStore store = stores.get(tcId);
         if (store == null) {
             return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
         }
-        return store.newTransaction(timeoutInMills);
+        return store.newTransaction(timeoutInMills, owner);
     }
 
     public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
@@ -516,7 +517,22 @@ public class TransactionMetadataStoreService {
         return Collections.unmodifiableMap(stores);
     }
 
-    public synchronized void close () {
+    public CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, String checkOwner) {
+        return getTxnMeta(txnID)
+                .thenCompose(meta -> {
+                    // owner was null in the old versions or no auth enabled
+                    if (meta.getOwner() == null) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                    if (meta.getOwner().equals(checkOwner)) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                    return CompletableFuture.completedFuture(false);
+                });
+    }
+
+
+    public void close () {
         this.internalPinnedExecutor.shutdown();
         stores.forEach((tcId, metadataStore) -> {
             metadataStore.closeAsync().whenComplete((v, ex) -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 8eff6815404..6af1919ebbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -182,6 +182,7 @@ public abstract class TransactionsBase extends AdminResource {
         transactionMetadata.status = txnMeta.status().name();
         transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp();
         transactionMetadata.timeoutAt = txnMeta.getTimeoutAt();
+        transactionMetadata.owner = txnMeta.getOwner();
 
         List<CompletableFuture<TransactionInPendingAckStats>> ackedPartitionsFutures = new ArrayList<>();
         Map<String, Map<String, CompletableFuture<TransactionInPendingAckStats>>> ackFutures = new HashMap<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4d32d1d6d35..1d80c02f356 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -44,6 +44,7 @@ import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -2226,7 +2227,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
-        transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds())
+        final String owner = getPrincipal();
+        transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds(), owner)
             .whenComplete(((txnID, ex) -> {
                 if (ex == null) {
                     if (log.isDebugEnabled()) {
@@ -2261,9 +2263,15 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
-        service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
-                command.getPartitionsList())
-                .whenComplete(((v, ex) -> {
+        verifyTxnOwnership(txnID)
+                .thenCompose(isOwner -> {
+                    if (!isOwner) {
+                        return failedFutureTxnNotOwned(txnID);
+                    }
+                    return transactionMetadataStoreService
+                            .addProducedPartitionToTxn(txnID, command.getPartitionsList());
+                })
+                .whenComplete((v, ex) -> {
                     if (ex == null) {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response success for add published partition to txn request {}", requestId);
@@ -2278,7 +2286,25 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
-            }));
+                });
+    }
+
+    private CompletableFuture<Void> failedFutureTxnNotOwned(TxnID txnID) {
+        String msg = String.format(
+                "Client (%s) is neither the owner of the transaction %s nor a super user",
+                getPrincipal(), txnID
+        );
+        log.warn("[{}] {}", remoteAddress, msg);
+        return FutureUtil.failedFuture(new CoordinatorException.TransactionNotFoundException(msg));
+    }
+
+    private CompletableFuture<Void> failedFutureTxnTcNotAllowed(TxnID txnID) {
+        String msg = String.format(
+                "TC client (%s) is not a super user, and is not allowed to operate on transaction %s",
+                getPrincipal(), txnID
+        );
+        log.warn("[{}] {}", remoteAddress, msg);
+        return FutureUtil.failedFuture(new CoordinatorException.TransactionNotFoundException(msg));
     }
 
     @Override
@@ -2295,8 +2321,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
 
-        transactionMetadataStoreService
-                .endTransaction(txnID, txnAction, false)
+        verifyTxnOwnership(txnID)
+                .thenCompose(isOwner -> {
+                    if (!isOwner) {
+                        return failedFutureTxnNotOwned(txnID);
+                    }
+                    return transactionMetadataStoreService.endTransaction(txnID, txnAction, false);
+                })
                 .whenComplete((v, ex) -> {
                     if (ex == null) {
                         ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
@@ -2311,6 +2342,34 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 });
     }
 
+    private CompletableFuture<Boolean> verifyTxnOwnershipForTCToBrokerCommands() {
+        if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
+            return getBrokerService()
+                    .getAuthorizationService()
+                    .isSuperUser(getPrincipal(), getAuthenticationData());
+        } else {
+            return CompletableFuture.completedFuture(true);
+        }
+    }
+
+    private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
+        final String checkOwner = getPrincipal();
+        return service.pulsar().getTransactionMetadataStoreService()
+                .verifyTxnOwnership(txnID, checkOwner)
+                .thenCompose(isOwner -> {
+                    if (isOwner) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                    if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()) {
+                        return getBrokerService()
+                                .getAuthorizationService()
+                                .isSuperUser(checkOwner, getAuthenticationData());
+                    } else {
+                        return CompletableFuture.completedFuture(false);
+                    }
+                });
+    }
+
     @Override
     protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
         final long requestId = command.getRequestId();
@@ -2326,9 +2385,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString());
         topicFuture.thenAccept(optionalTopic -> {
             if (optionalTopic.isPresent()) {
-                optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark)
+                // we only accept super user becase this endpoint is reserved for tc to broker communication
+                verifyTxnOwnershipForTCToBrokerCommands()
+                        .thenCompose(isOwner -> {
+                            if (!isOwner) {
+                                return failedFutureTxnTcNotAllowed(txnID);
+                            }
+                            return optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark);
+                        })
                         .whenComplete((ignored, throwable) -> {
                             if (throwable != null) {
+                                throwable = FutureUtil.unwrapCompletionException(throwable);
                                 log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], "
                                         + "txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), throwable);
                                 ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
@@ -2340,7 +2407,6 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
                                     txnID.getLeastSigBits(), txnID.getMostSigBits()));
                         });
-
             } else {
                 getBrokerService().getManagedLedgerFactory()
                         .asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
@@ -2409,23 +2475,28 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
                     return;
                 }
-
-                CompletableFuture<Void> completableFuture =
-                        subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
-                completableFuture.whenComplete((ignored, e) -> {
-                    if (e != null) {
-                        log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}"
-                                        + "txnId: [{}], txnAction: [{}]", topic, subName,
-                                txnID, TxnAction.valueOf(txnAction), e.getCause());
-                        ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
-                                requestId, txnidLeastBits, txnidMostBits,
-                                BrokerServiceException.getClientErrorCode(e),
-                                "Handle end txn on subscription failed."));
-                        return;
-                    }
-                    ctx.writeAndFlush(
-                            Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
-                });
+                // we only accept super user becase this endpoint is reserved for tc to broker communication
+                verifyTxnOwnershipForTCToBrokerCommands()
+                        .thenCompose(isOwner -> {
+                            if (!isOwner) {
+                                return failedFutureTxnTcNotAllowed(txnID);
+                            }
+                            return subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
+                        }).whenComplete((ignored, e) -> {
+                            if (e != null) {
+                                e = FutureUtil.unwrapCompletionException(e);
+                                log.error("handleEndTxnOnSubscription fail ! topic: {}, subscription: {}"
+                                                + "txnId: [{}], txnAction: [{}]", topic, subName,
+                                        txnID, TxnAction.valueOf(txnAction), e.getCause());
+                                ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                                        requestId, txnidLeastBits, txnidMostBits,
+                                        BrokerServiceException.getClientErrorCode(e),
+                                        "Handle end txn on subscription failed: " + e.getMessage()));
+                                return;
+                            }
+                            ctx.writeAndFlush(
+                                    Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
+                        });
             } else {
                 getBrokerService().getManagedLedgerFactory()
                         .asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
@@ -2490,6 +2561,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
         final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
         final long requestId = command.getRequestId();
+        final List<org.apache.pulsar.common.api.proto.Subscription> subscriptionsList = command.getSubscriptionsList();
         if (log.isDebugEnabled()) {
             log.debug("Receive add published partition to txn request {} from {} with txnId {}",
                     requestId, remoteAddress, txnID);
@@ -2504,9 +2576,15 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
 
-        transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
-                MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList()))
-                .whenComplete(((v, ex) -> {
+        verifyTxnOwnership(txnID)
+                .thenCompose(isOwner -> {
+                    if (!isOwner) {
+                        return failedFutureTxnNotOwned(txnID);
+                    }
+                    return transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
+                            MLTransactionMetadataStore.subscriptionToTxnSubscription(subscriptionsList));
+                })
+                .whenComplete((v, ex) -> {
                     if (ex == null) {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response success for add published partition to txn request {}",
@@ -2522,7 +2600,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
-                }));
+                });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index ee1789b322f..c3a34dbadbb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -22,6 +22,9 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.doAnswer;
@@ -37,6 +40,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -50,6 +54,7 @@ import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -72,6 +77,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -88,16 +94,23 @@ import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.AuthMethod;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.BaseCommand.Type;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.CommandConnected;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
+import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
 import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
@@ -107,6 +120,7 @@ import org.apache.pulsar.common.api.proto.CommandSuccess;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -119,6 +133,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
@@ -1962,8 +1977,8 @@ public class ServerCnxTest {
     public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{
         // Mock ServerCnx.field: consumers
         ConcurrentLongHashMap.Builder mapBuilder = Mockito.mock(ConcurrentLongHashMap.Builder.class);
-        Mockito.when(mapBuilder.expectedItems(Mockito.anyInt())).thenReturn(mapBuilder);
-        Mockito.when(mapBuilder.concurrencyLevel(Mockito.anyInt())).thenReturn(mapBuilder);
+        Mockito.when(mapBuilder.expectedItems(anyInt())).thenReturn(mapBuilder);
+        Mockito.when(mapBuilder.concurrencyLevel(anyInt())).thenReturn(mapBuilder);
         ConcurrentLongHashMap consumers = Mockito.mock(ConcurrentLongHashMap.class);
         Mockito.when(mapBuilder.build()).thenReturn(consumers);
         ArgumentCaptor<Long> ignoreArgumentCaptor = ArgumentCaptor.forClass(Long.class);
@@ -2019,7 +2034,7 @@ public class ServerCnxTest {
                     return false;
                 }
             };
-            Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+            Mockito.when(consumers.putIfAbsent(anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
             // do test: delay complete after execute 'isDone()' many times
             // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
             try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
@@ -2058,12 +2073,12 @@ public class ServerCnxTest {
         }
         // case3: exists existingConsumerFuture, already complete and exception
         CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class);
-        Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+        Mockito.when(consumers.putIfAbsent(anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
         // make consumerFuture delay finish
         Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
         // when sync get return, future will return success value.
         Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException());
-        Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())).
+        Mockito.when(existingConsumerFuture.get(anyLong(), Mockito.any())).
                 thenThrow(new NullPointerException());
         Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
         Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException());
@@ -2117,4 +2132,402 @@ public class ServerCnxTest {
         verify(authResponse, times(1)).hasClientVersion();
         verify(authResponse, times(0)).getClientVersion();
     }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleGetTopicsOfNamespace() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleGetTopicsOfNamespace(any());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleGetSchema() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleGetSchema(any());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleGetOrCreateSchema() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleGetOrCreateSchema(any());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleTcClientConnectRequest() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleTcClientConnectRequest(any());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleNewTxn() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleNewTxn(any());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleAddPartitionToTxn() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleAddPartitionToTxn(any());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleEndTxn() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleEndTxn(any());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleEndTxnOnPartition() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleEndTxnOnPartition(any());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleEndTxnOnSubscription() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleEndTxnOnSubscription(any());
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void shouldFailHandleAddSubscriptionToTxn() throws Exception {
+        ServerCnx serverCnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
+        Field stateUpdater = ServerCnx.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(serverCnx, ServerCnx.State.Failed);
+        serverCnx.handleAddSubscriptionToTxn(any());
+    }
+
+    @Test(timeOut = 30000)
+    public void handlePartitionMetadataRequestWithServiceNotReady() throws Exception {
+        resetChannel();
+        setChannelConnected();
+        doReturn(false).when(pulsar).isRunning();
+        assertTrue(channel.isActive());
+
+        ByteBuf clientCommand = Commands.newPartitionMetadataRequest(successTopicName, 1);
+        channel.writeInbound(clientCommand);
+        Object response = getResponse();
+        assertTrue(response instanceof CommandPartitionedTopicMetadataResponse);
+        assertEquals(((CommandPartitionedTopicMetadataResponse) response).getError(), ServerError.ServiceNotReady);
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendAddPartitionToTxnResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true));
+        when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+                Lists.newArrayList("tenant/ns/topic1"));
+        channel.writeInbound(clientCommand);
+        CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendAddPartitionToTxnResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true));
+        when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(FutureUtil.failedFuture(new RuntimeException("server error")));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+                Lists.newArrayList("tenant/ns/topic1"));
+        channel.writeInbound(clientCommand);
+        CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendAddSubscriptionToTxnResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true));
+        when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        final  org.apache.pulsar.common.api.proto.Subscription sub =
+                new org.apache.pulsar.common.api.proto.Subscription();
+        sub.setTopic("topic1");
+        sub.setSubscription("sub1");
+        ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L,
+                Lists.newArrayList(sub));
+        channel.writeInbound(clientCommand);
+        CommandAddSubscriptionToTxnResponse response = (CommandAddSubscriptionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendAddSubscriptionToTxnResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true));
+        when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(FutureUtil.failedFuture(new RuntimeException("server error")));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        final  org.apache.pulsar.common.api.proto.Subscription sub =
+                new org.apache.pulsar.common.api.proto.Subscription();
+        sub.setTopic("topic1");
+        sub.setSubscription("sub1");
+        ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L,
+                Lists.newArrayList(sub));
+        channel.writeInbound(clientCommand);
+        CommandAddSubscriptionToTxnResponse response = (CommandAddSubscriptionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true));
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
+                TxnAction.COMMIT));
+        channel.writeInbound(clientCommand);
+        CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true));
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(FutureUtil.failedFuture(new RuntimeException("server error")));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
+                TxnAction.COMMIT));
+        channel.writeInbound(clientCommand);
+        CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnPartitionResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true));
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+        doReturn(CompletableFuture.completedFuture(null)).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong());
+        doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+                .getTopicIfExists(any(String.class));
+        ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
+                successTopicName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnPartitionResponse response = (CommandEndTxnOnPartitionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnPartitionResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true));
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+        doReturn(FutureUtil.failedFuture(new RuntimeException("server error"))).when(topic)
+                .endTxn(any(TxnID.class), anyInt(), anyLong());
+        doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+                .getTopicIfExists(any(String.class));
+        ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
+                successTopicName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnPartitionResponse response = (CommandEndTxnOnPartitionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnSubscription() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true));
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+        final org.apache.pulsar.broker.service.Subscription sub =
+                mock(org.apache.pulsar.broker.service.Subscription.class);
+        doReturn(sub).when(topic).getSubscription(any());
+        doReturn(CompletableFuture.completedFuture(null))
+                .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
+        doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+                .getTopicIfExists(any(String.class));
+
+        ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
+                successTopicName, successSubName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnSubscriptionResponse response = (CommandEndTxnOnSubscriptionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnSubscriptionFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.getTxnMeta(any())).thenReturn(CompletableFuture.completedFuture(mock(TxnMeta.class)));
+        when(txnStore.verifyTxnOwnership(any(), any())).thenReturn(CompletableFuture.completedFuture(true));
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+
+        final org.apache.pulsar.broker.service.Subscription sub =
+                mock(org.apache.pulsar.broker.service.Subscription.class);
+        doReturn(sub).when(topic).getSubscription(any());
+        doReturn(FutureUtil.failedFuture(new RuntimeException("server error")))
+                .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
+        doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+                .getTopicIfExists(any(String.class));
+
+        ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
+                successTopicName, successSubName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnSubscriptionResponse response = (CommandEndTxnOnSubscriptionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "Handle end txn on subscription failed: server error");
+
+        channel.finish();
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index e50a39c5dfd..236e6557ada 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -105,9 +105,9 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
                 .getStores().get(TransactionCoordinatorID.get(1)));
         checkTransactionMetadataStoreReady((MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
                 .getStores().get(TransactionCoordinatorID.get(2)));
-        TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5).get();
-        TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 5).get();
-        TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 5).get();
+        TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5, null).get();
+        TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 5, null).get();
+        TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 5, null).get();
         Assert.assertEquals(txnID0.getMostSigBits(), 0);
         Assert.assertEquals(txnID1.getMostSigBits(), 1);
         Assert.assertEquals(txnID2.getMostSigBits(), 2);
@@ -129,7 +129,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
                         .getStores().get(TransactionCoordinatorID.get(0));
 
         checkTransactionMetadataStoreReady(transactionMetadataStore);
-        TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000).get();
+        TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null).get();
         List<String> partitions = new ArrayList<>();
         partitions.add("ptn-0");
         partitions.add("ptn-1");
@@ -152,7 +152,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
                 (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
                         .getStores().get(TransactionCoordinatorID.get(0));
         checkTransactionMetadataStoreReady(transactionMetadataStore);
-        TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000).get();
+        TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null).get();
         List<TransactionSubscription> partitions = new ArrayList<>();
         partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
         partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
@@ -181,7 +181,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
         int i = -1;
         while (++i < 1000) {
             try {
-                transactionMetadataStore.newTransaction(2000).get();
+                newTransactionWithTimeoutOf(2000);
             } catch (Exception e) {
                 //no operation
             }
@@ -193,6 +193,14 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
                 .until(() -> txnMap.size() == 0);
     }
 
+    private TxnID newTransactionWithTimeoutOf(long timeout)
+            throws InterruptedException, ExecutionException {
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        return transactionMetadataStore.newTransaction(timeout, null).get();
+    }
+
     @Test
     public void testTimeoutTrackerExpired() throws Exception {
         pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
@@ -207,7 +215,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
         ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
                 (ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
 
-        transactionMetadataStore.newTransaction(2000).get();
+        newTransactionWithTimeoutOf(2000);
 
         assertEquals(txnMap.size(), 1);
 
@@ -215,7 +223,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
                 Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
         Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0);
 
-        transactionMetadataStore.newTransaction(2000).get();
+        newTransactionWithTimeoutOf(2000);
         assertEquals(txnMap.size(), 1);
 
         txnMap.forEach((txnID, txnMetaListPair) ->
@@ -242,7 +250,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
             int i = -1;
             while (++i < 100) {
                 try {
-                    transactionMetadataStore.newTransaction(1000);
+                    newTransactionWithTimeoutOf(1000);
                 } catch (Exception e) {
                     //no operation
                 }
@@ -253,7 +261,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
             int i = -1;
             while (++i < 100) {
                 try {
-                    transactionMetadataStore.newTransaction(2000);
+                    newTransactionWithTimeoutOf(2000);
                 } catch (Exception e) {
                     //no operation
                 }
@@ -264,7 +272,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
             int i = -1;
             while (++i < 100) {
                 try {
-                    transactionMetadataStore.newTransaction(3000);
+                    newTransactionWithTimeoutOf(3000);
                 } catch (Exception e) {
                     //no operation
                 }
@@ -275,7 +283,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
             int i = -1;
             while (++i < 100) {
                 try {
-                    transactionMetadataStore.newTransaction(4000);
+                    newTransactionWithTimeoutOf(4000);
                 } catch (Exception e) {
                     //no operation
                 }
@@ -305,7 +313,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
                         .getStores().get(TransactionCoordinatorID.get(0));
         checkTransactionMetadataStoreReady(transactionMetadataStore);
 
-        transactionMetadataStore.newTransaction(timeout);
+        newTransactionWithTimeoutOf(2000);
 
         pulsar.getTransactionMetadataStoreService()
                 .removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
@@ -346,7 +354,7 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
 
         checkTransactionMetadataStoreReady(transactionMetadataStore);
 
-        TxnID txnID = transactionMetadataStore.newTransaction(timeOut - 2000).get();
+        TxnID txnID = newTransactionWithTimeoutOf(timeOut - 2000);
         TxnMeta txnMeta = transactionMetadataStore.getTxnMeta(txnID).get();
         txnMeta.updateTxnStatus(txnStatus, TxnStatus.OPEN);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 0c9f877150a..2eafd8a5a7f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -99,11 +99,11 @@ public class TransactionMetricsTest extends BrokerTestBase {
         Awaitility.await().until(() ->
                 pulsar.getTransactionMetadataStoreService().getStores().size() == 2);
         pulsar.getTransactionMetadataStoreService().getStores()
-                .get(transactionCoordinatorIDOne).newTransaction(timeout).get();
+                .get(transactionCoordinatorIDOne).newTransaction(timeout, null).get();
         pulsar.getTransactionMetadataStoreService().getStores()
-                .get(transactionCoordinatorIDTwo).newTransaction(timeout).get();
+                .get(transactionCoordinatorIDTwo).newTransaction(timeout, null).get();
         pulsar.getTransactionMetadataStoreService().getStores()
-                .get(transactionCoordinatorIDTwo).newTransaction(timeout).get();
+                .get(transactionCoordinatorIDTwo).newTransaction(timeout, null).get();
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
         String metricsStr = statsOut.toString();
@@ -191,7 +191,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         metric.forEach(item -> assertEquals(item.value, txnCount / 2));
 
         TxnID txnID = pulsar.getTransactionMetadataStoreService().getStores()
-                .get(transactionCoordinatorIDOne).newTransaction(1000).get();
+                .get(transactionCoordinatorIDOne).newTransaction(1000, null).get();
 
         Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() -> {
             try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java
new file mode 100644
index 00000000000..000080ff454
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.utils.Exceptions;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TxnMeta;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test for consuming transaction messages.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class AuthenticatedTransactionProducerConsumerTest extends TransactionTestBase {
+
+    private static final String TOPIC = NAMESPACE1 + "/txn-auth";
+
+    private final String ADMIN_TOKEN;
+    private final String TOKEN_PUBLIC_KEY;
+    private final KeyPair kp;
+
+    AuthenticatedTransactionProducerConsumerTest() throws NoSuchAlgorithmException {
+        KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
+        kp = kpg.generateKeyPair();
+
+        byte[] encodedPublicKey = kp.getPublic().getEncoded();
+        TOKEN_PUBLIC_KEY = "data:;base64," + Base64.getEncoder().encodeToString(encodedPublicKey);
+        ADMIN_TOKEN = generateToken(kp, "admin");
+    }
+
+
+    private String generateToken(KeyPair kp, String subject) {
+        PrivateKey pkey = kp.getPrivate();
+        long expMillis = System.currentTimeMillis() + Duration.ofHours(1).toMillis();
+        Date exp = new Date(expMillis);
+
+        return Jwts.builder()
+                .setSubject(subject)
+                .setExpiration(exp)
+                .signWith(pkey, SignatureAlgorithm.forSigningKey(pkey))
+                .compact();
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        // Set provider domain name
+        Properties properties = new Properties();
+        properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY);
+
+        conf.setProperties(properties);
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+        setBrokerCount(1);
+        internalSetup();
+        setUpBase(1, 1, TOPIC, 1);
+
+        grantTxnLookupToRole("client");
+        admin.namespaces().grantPermissionOnNamespace(NAMESPACE1, "client",
+                EnumSet.allOf(AuthAction.class));
+        grantTxnLookupToRole("client2");
+    }
+
+    @SneakyThrows
+    private void grantTxnLookupToRole(String role) {
+        admin.namespaces().grantPermissionOnNamespace(
+                NamespaceName.SYSTEM_NAMESPACE.toString(),
+                role,
+                Sets.newHashSet(AuthAction.consume));
+    }
+
+    @Override
+    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
+        return clientBuilder
+                .enableTransaction(true)
+                .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
+                .build();
+    }
+
+    @Override
+    protected PulsarAdmin createNewPulsarAdmin(PulsarAdminBuilder builder) throws PulsarClientException {
+        return builder
+                .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
+                .build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "actors")
+    public Object[][] actors() {
+        return new Object[][]{
+                {"client", true},
+                {"client", false},
+                {"client2", true},
+                {"client2", false},
+                {"admin", true},
+                {"admin", false}
+        };
+    }
+
+    @Test(dataProvider = "actors")
+    public void testEndTxn(String actor, boolean afterUnload) throws Exception {
+        @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, "client")))
+                .enableTransaction(true)
+                .build();
+
+        @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, actor)))
+                .enableTransaction(true)
+                .build();
+        Transaction transaction = pulsarClientOwner.newTransaction()
+                .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
+
+        @Cleanup final Consumer<String> consumer = pulsarClientOwner
+                .newConsumer(Schema.STRING)
+                .subscriptionName("test")
+                .topic(TOPIC)
+                .subscribe();
+
+
+        @Cleanup final Producer<String> producer = pulsarClientOwner
+                .newProducer(Schema.STRING)
+                .sendTimeout(60, TimeUnit.SECONDS)
+                .topic(TOPIC)
+                .create();
+
+        producer.newMessage().value("beforetxn").send();
+        consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), transaction);
+        producer.newMessage(transaction).value("message").send();
+        if (afterUnload) {
+            pulsarServiceList.get(0)
+                    .getTransactionMetadataStoreService()
+                    .removeTransactionMetadataStore(
+                            TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits()));
+        }
+
+        final Throwable ex = syncGetException((
+                (PulsarClientImpl) pulsarClientOther).getTcClient().commitAsync(transaction.getTxnID())
+        );
+        if (actor.equals("client") || actor.equals("admin")) {
+            Assert.assertNull(ex);
+            Assert.assertEquals(consumer.receive(5, TimeUnit.SECONDS).getValue(), "message");
+        } else {
+            Assert.assertNotNull(ex);
+            Assert.assertTrue(ex instanceof TransactionCoordinatorClientException, ex.getClass().getName());
+            Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS));
+            transaction.commit().get();
+            Assert.assertEquals(consumer.receive(5, TimeUnit.SECONDS).getValue(), "message");
+        }
+    }
+
+    @Test(dataProvider = "actors")
+    public void testAddPartitionToTxn(String actor, boolean afterUnload) throws Exception {
+        @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, "client")))
+                .enableTransaction(true)
+                .build();
+
+        @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, actor)))
+                .enableTransaction(true)
+                .build();
+        Transaction transaction = pulsarClientOwner.newTransaction()
+                .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
+
+        if (afterUnload) {
+            pulsarServiceList.get(0)
+                    .getTransactionMetadataStoreService()
+                    .removeTransactionMetadataStore(
+                            TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits()));
+        }
+
+        final Throwable ex = syncGetException(((PulsarClientImpl) pulsarClientOther)
+                .getTcClient().addPublishPartitionToTxnAsync(transaction.getTxnID(), Lists.newArrayList(TOPIC)));
+
+        final TxnMeta txnMeta = pulsarServiceList.get(0).getTransactionMetadataStoreService()
+                .getTxnMeta(transaction.getTxnID()).get();
+        if (actor.equals("client") || actor.equals("admin")) {
+            Assert.assertNull(ex);
+            Assert.assertEquals(txnMeta.producedPartitions(), Lists.newArrayList(TOPIC));
+        } else {
+            Assert.assertNotNull(ex);
+            Assert.assertTrue(ex instanceof TransactionCoordinatorClientException);
+            Assert.assertTrue(txnMeta.producedPartitions().isEmpty());
+        }
+    }
+
+    @Test(dataProvider = "actors")
+    public void testAddSubscriptionToTxn(String actor, boolean afterUnload) throws Exception {
+        @Cleanup final PulsarClient pulsarClientOwner = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, "client")))
+                .enableTransaction(true)
+                .build();
+
+        @Cleanup final PulsarClient pulsarClientOther = PulsarClient.builder()
+                .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .authentication(AuthenticationFactory.token(generateToken(kp, actor)))
+                .enableTransaction(true)
+                .build();
+        Transaction transaction = pulsarClientOwner.newTransaction()
+                .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
+
+        if (afterUnload) {
+            pulsarServiceList.get(0)
+                    .getTransactionMetadataStoreService()
+                    .removeTransactionMetadataStore(
+                            TransactionCoordinatorID.get(transaction.getTxnID().getMostSigBits()));
+        }
+
+
+        final Throwable ex = syncGetException(((PulsarClientImpl) pulsarClientOther)
+                .getTcClient().addSubscriptionToTxnAsync(transaction.getTxnID(), TOPIC, "sub"));
+
+        final TxnMeta txnMeta = pulsarServiceList.get(0).getTransactionMetadataStoreService()
+                .getTxnMeta(transaction.getTxnID()).get();
+        if (actor.equals("client") || actor.equals("admin")) {
+            Assert.assertNull(ex);
+            Assert.assertEquals(txnMeta.ackedPartitions().size(), 1);
+        } else {
+            Assert.assertNotNull(ex);
+            Assert.assertTrue(ex instanceof TransactionCoordinatorClientException);
+            Assert.assertTrue(txnMeta.ackedPartitions().isEmpty());
+        }
+    }
+
+    @Test
+    public void testNoAuth() throws Exception {
+        try {
+            @Cleanup final PulsarClient pulsarClient = PulsarClient.builder()
+                    .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                    .enableTransaction(true)
+                    .build();
+            Assert.fail("should have failed");
+        } catch (Exception t) {
+            Assert.assertTrue(Exceptions.areExceptionsPresentInChain(t,
+                    PulsarClientException.AuthenticationException.class));
+        }
+    }
+
+    private static Throwable syncGetException(CompletableFuture<?> future) {
+        try {
+            future.get();
+        } catch (InterruptedException e) {
+            return e;
+        } catch (ExecutionException e) {
+            return FutureUtil.unwrapCompletionException(e);
+        }
+        return null;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 7510f06c3bc..25c555f09b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -48,7 +48,10 @@ import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
 import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -93,7 +96,9 @@ public abstract class TransactionTestBase extends TestRetrySupport {
         if (admin != null) {
             admin.close();
         }
-        admin = spy(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress()).build());
+        admin = spy(
+                createNewPulsarAdmin(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress()))
+        );
 
         if (pulsarClient != null) {
             pulsarClient.shutdown();
@@ -111,6 +116,15 @@ public abstract class TransactionTestBase extends TestRetrySupport {
         mockBookKeeper = createMockBookKeeper(bkExecutor);
         startBroker();
     }
+
+    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
+        return clientBuilder.build();
+    }
+
+    protected PulsarAdmin createNewPulsarAdmin(PulsarAdminBuilder builder) throws PulsarClientException {
+        return builder.build();
+    }
+
     protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int numPartitions) throws Exception{
         setBrokerCount(numBroker);
         internalSetup();
@@ -137,11 +151,10 @@ public abstract class TransactionTestBase extends TestRetrySupport {
         if (pulsarClient != null) {
             pulsarClient.shutdown();
         }
-        pulsarClient = PulsarClient.builder()
+        pulsarClient = createNewPulsarClient(PulsarClient.builder()
                 .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
                 .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .build();
+                .enableTransaction(true));
     }
 
     protected void startBroker() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 6b0f122d741..e3bbe1ad97b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -861,7 +861,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
     @Test
     public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{
         TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService()
-                .newTransaction(new TransactionCoordinatorID(0), 1).get();
+                .newTransaction(new TransactionCoordinatorID(0), 1, null).get();
         Awaitility.await().until(() -> {
             try {
                getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
index 4f987014c74..824a0818763 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java
@@ -41,4 +41,7 @@ public class TransactionMetadata {
 
     /** The ackedPartitions of this transaction. */
     public Map<String, Map<String, TransactionInPendingAckStats>> ackedPartitions;
+
+    /** The owner of this transaction. */
+    public String owner;
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
index bbffe80120f..dd592955404 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -56,11 +56,12 @@ public interface TransactionMetadataStore {
      * Create a new transaction in the transaction metadata store.
      *
      * @param timeoutInMills the timeout duration of the transaction in mills
+*      @param owner the role which is the owner of the transaction
      * @return a future represents the result of creating a new transaction.
      *         it returns {@link TxnID} as the identifier for identifying the
      *         transaction.
      */
-    CompletableFuture<TxnID> newTransaction(long timeoutInMills);
+    CompletableFuture<TxnID> newTransaction(long timeoutInMills, String owner);
 
     /**
      * Add the produced partitions to transaction identified by <tt>txnid</tt>.
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
index 4d6d83dfb0f..3d680514930 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
@@ -107,4 +107,11 @@ public interface TxnMeta {
      * @return transaction timeout at.
      */
     long getTimeoutAt();
+
+    /**
+     * Return the transaction's owner.
+     *
+     * @return transaction's owner.
+     */
+    String getOwner();
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index ba90c06b27d..11378697590 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -24,8 +24,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
@@ -73,12 +75,17 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
     }
 
     @Override
-    public CompletableFuture<TxnID> newTransaction(long timeoutInMills) {
+    public CompletableFuture<TxnID> newTransaction(long timeoutInMills, String owner) {
+        if (owner != null) {
+            if (StringUtils.isBlank(owner)) {
+                return FutureUtil.failedFuture(new IllegalArgumentException("Owner can't be blank"));
+            }
+        }
         TxnID txnID = new TxnID(
             tcID.getId(),
             localID.getAndIncrement()
         );
-        TxnMetaImpl txn = new TxnMetaImpl(txnID, System.currentTimeMillis(), timeoutInMills);
+        TxnMetaImpl txn = new TxnMetaImpl(txnID, System.currentTimeMillis(), timeoutInMills, owner);
         transactions.put(txnID, txn);
         return CompletableFuture.completedFuture(txnID);
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 273a01850cb..ea7c38444bc 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
@@ -142,8 +143,11 @@ public class MLTransactionMetadataStore
                                     positions.add(position);
                                     long openTimestamp = transactionMetadataEntry.getStartTime();
                                     long timeoutAt = transactionMetadataEntry.getTimeoutMs();
-                                    txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
-                                            openTimestamp, timeoutAt), positions));
+                                    final String owner = transactionMetadataEntry.hasOwner()
+                                            ? transactionMetadataEntry.getOwner() : null;
+                                    final TxnMetaImpl left = new TxnMetaImpl(txnID,
+                                            openTimestamp, timeoutAt, owner);
+                                    txnMetaMap.put(transactionId, MutablePair.of(left, positions));
                                     recoverTracker.handleOpenStatusTransaction(txnSequenceId,
                                             timeoutAt + openTimestamp);
                                 }
@@ -217,7 +221,7 @@ public class MLTransactionMetadataStore
     }
 
     @Override
-    public CompletableFuture<TxnID> newTransaction(long timeOut) {
+    public CompletableFuture<TxnID> newTransaction(long timeOut, String owner) {
         CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
         FutureUtil.safeRunAsync(() -> {
             if (!checkIfReady()) {
@@ -238,13 +242,20 @@ public class MLTransactionMetadataStore
                     .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                     .setLastModificationTime(currentTimeMillis)
                     .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+            if (owner != null) {
+                if (StringUtils.isBlank(owner)) {
+                    completableFuture.completeExceptionally(new IllegalArgumentException("Owner can't be blank"));
+                    return;
+                }
+                transactionMetadataEntry.setOwner(owner);
+            }
             transactionLog.append(transactionMetadataEntry)
                     .whenComplete((position, throwable) -> {
                         if (throwable != null) {
                             completableFuture.completeExceptionally(throwable);
                         } else {
                             appendLogCount.increment();
-                            TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
+                            TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut, owner);
                             List<Position> positions = new ArrayList<>();
                             positions.add(position);
                             Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
@@ -288,9 +299,9 @@ public class MLTransactionMetadataStore
                                 promise.complete(null);
                             } catch (InvalidTxnStatusException e) {
                                 transactionLog.deletePosition(Collections.singletonList(position));
-                                log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                                        + " add produced partition error with TxnStatus : "
-                                        + txnMetaListPair.getLeft().status().name(), e);
+                                log.error("TxnID {} add produced partition error"
+                                                + " with TxnStatus: {}", txnMetaListPair.getLeft().id().toString()
+                                        , txnMetaListPair.getLeft().status().name(), e);
                                 promise.completeExceptionally(e);
                             }
                         });
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
index a3d62b1e05d..0361ee1828f 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -42,11 +42,13 @@ class TxnMetaImpl implements TxnMeta {
     private volatile TxnStatus txnStatus = TxnStatus.OPEN;
     private final long openTimestamp;
     private final long timeoutAt;
+    private final String owner;
 
-    TxnMetaImpl(TxnID txnID, long openTimestamp, long timeoutAt) {
+    TxnMetaImpl(TxnID txnID, long openTimestamp, long timeoutAt, String owner) {
         this.txnID = txnID;
         this.openTimestamp = openTimestamp;
         this.timeoutAt = timeoutAt;
+        this.owner = owner;
     }
 
     @Override
@@ -161,4 +163,8 @@ class TxnMetaImpl implements TxnMeta {
         return this.timeoutAt;
     }
 
+    @Override
+    public String getOwner() {
+        return this.owner;
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
index 828fc724795..0a60c73af4c 100644
--- a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
+++ b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto
@@ -51,4 +51,5 @@ message TransactionMetadataEntry {
   optional uint64 start_time      = 9;
   optional uint64 last_modification_time = 10;
   optional uint64 max_local_txn_id = 11;
+  optional string owner          = 12;
 }
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index aafe54e6069..26b0c1f7dc9 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -39,14 +39,12 @@ import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -85,7 +83,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 break;
             }
             if (transactionMetadataStore.checkIfReady()) {
-                TxnID txnID = transactionMetadataStore.newTransaction(5000).get();
+                TxnID txnID = transactionMetadataStore.newTransaction(5000, null).get();
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID).get(), TxnStatus.OPEN);
 
                 List<String> partitions = new ArrayList<>();
@@ -154,7 +152,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
-        TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
+        TxnID txnID = transactionMetadataStore.newTransaction(20000, null).get();
         transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN, false).get();
         if (isUseManagedLedgerProperties) {
             transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false).get();
@@ -183,7 +181,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
-        txnID = transactionMetadataStore.newTransaction(100000).get();
+        txnID = transactionMetadataStore.newTransaction(100000, null).get();
         assertEquals(txnID.getLeastSigBits(), 1);
     }
 
@@ -214,8 +212,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 break;
             }
             if (transactionMetadataStore.checkIfReady()) {
-                TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
-                TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+                TxnID txnID1 = transactionMetadataStore.newTransaction(1000, "user1").get();
+                TxnID txnID2 = transactionMetadataStore.newTransaction(1000, "user2").get();
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN);
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN);
 
@@ -267,6 +265,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                         assertEquals(txnMeta2.ackedPartitions().size(), subscriptions.size());
                         Assert.assertTrue(subscriptions.containsAll(txnMeta1.ackedPartitions()));
                         Assert.assertTrue(subscriptions.containsAll(txnMeta2.ackedPartitions()));
+
+                        assertEquals(txnMeta1.getOwner(), "user1");
+                        assertEquals(txnMeta2.getOwner(), "user2");
                         assertEquals(txnMeta1.status(), TxnStatus.COMMITTING);
                         assertEquals(txnMeta2.status(), TxnStatus.COMMITTING);
                         transactionMetadataStoreTest
@@ -286,7 +287,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                         } catch (ExecutionException e) {
                             Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
                         }
-                        TxnID txnID = transactionMetadataStoreTest.newTransaction(1000).get();
+                        TxnID txnID = transactionMetadataStoreTest.newTransaction(1000, null).get();
                         assertEquals(txnID.getLeastSigBits(), 2L);
                         break;
                     } else {
@@ -327,8 +328,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 break;
             }
             if (transactionMetadataStore.checkIfReady()) {
-                TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
-                TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+                TxnID txnID1 = transactionMetadataStore.newTransaction(1000, null).get();
+                TxnID txnID2 = transactionMetadataStore.newTransaction(1000, null).get();
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID1).get(), TxnStatus.OPEN);
                 assertEquals(transactionMetadataStore.getTxnStatus(txnID2).get(), TxnStatus.OPEN);
 
@@ -391,9 +392,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
 
         // txnID1 have not deleted from cursor, we can recover from transaction log
-        TxnID txnID1 = transactionMetadataStore.newTransaction(1000).get();
+        TxnID txnID1 = transactionMetadataStore.newTransaction(1000, null).get();
         // txnID2 have deleted from cursor.
-        TxnID txnID2 = transactionMetadataStore.newTransaction(1000).get();
+        TxnID txnID2 = transactionMetadataStore.newTransaction(1000, null).get();
 
         transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTING, TxnStatus.OPEN, false).get();
         transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING, false).get();
@@ -429,7 +430,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
-        transactionMetadataStore.newTransaction(5000).get();
+        transactionMetadataStore.newTransaction(5000, null).get();
         Field field = MLTransactionLogImpl.class.getDeclaredField("managedLedger");
         field.setAccessible(true);
         ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog);
@@ -438,12 +439,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         AtomicReferenceFieldUpdater state = (AtomicReferenceFieldUpdater) field.get(managedLedger);
         state.set(managedLedger, WriteFailed);
         try {
-            transactionMetadataStore.newTransaction(5000).get();
+            transactionMetadataStore.newTransaction(5000, null).get();
             fail();
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException);
         }
-        transactionMetadataStore.newTransaction(5000).get();
+        transactionMetadataStore.newTransaction(5000, null).get();
 
     }
 
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index a54caaf16c1..3f046240c11 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -79,14 +79,14 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testGetTxnStatusSuccess() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
     }
 
     @Test
     public void testUpdateTxnStatusSuccess() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -100,7 +100,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testUpdateTxnStatusNotExpectedStatus() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -119,7 +119,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testUpdateTxnStatusCannotTransition() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -138,7 +138,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testAddProducedPartition() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
@@ -192,7 +192,7 @@ public class TransactionMetadataStoreProviderTest {
 
     @Test
     public void testAddAckedPartition() throws Exception {
-        TxnID txnID = this.store.newTransaction(0L).get();
+        TxnID txnID = this.store.newTransaction(0L, null).get();
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);