You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2023/02/09 12:46:26 UTC

[pulsar] branch branch-2.11 updated: [fix][txn] Always send correct transaction id in end txn response (#19137)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new a835a845db2 [fix][txn] Always send correct transaction id in end txn response (#19137)
a835a845db2 is described below

commit a835a845db24309721f91b1db2bb6f59078a9346
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Tue Jan 10 07:14:47 2023 +0100

    [fix][txn] Always send correct transaction id in end txn response (#19137)
    
    (cherry picked from commit 90764e7a03034589764343202196508e7950584e)
---
 .../pulsar/broker/service/PulsarCommandSender.java |   2 +-
 .../broker/service/PulsarCommandSenderImpl.java    |   7 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  44 ++--
 .../pulsar/broker/service/ServerCnxTest.java       | 285 ++++++++++++++++++++-
 .../broker/service/utils/ClientChannelHelper.java  |  35 ++-
 .../client/impl/TransactionMetaStoreHandler.java   |  62 +++--
 .../apache/pulsar/common/protocol/Commands.java    |  16 +-
 7 files changed, 383 insertions(+), 68 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index dc5b97d846f..95c54718c36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -88,7 +88,7 @@ public interface PulsarCommandSender {
 
     void sendNewTxnResponse(long requestId, TxnID txnID, long tcID);
 
-    void sendNewTxnErrorResponse(long requestId, long txnID, ServerError error, String message);
+    void sendNewTxnErrorResponse(long requestId, long tcID, ServerError error, String message);
 
     void sendEndTxnResponse(long requestId, TxnID txnID, int txnAction);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index e61742668a9..8dd5a451b2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -332,8 +332,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
     }
 
     @Override
-    public void sendNewTxnErrorResponse(long requestId, long txnID, ServerError error, String message) {
-        BaseCommand command = Commands.newTxnResponse(requestId, txnID, error, message);
+    public void sendNewTxnErrorResponse(long requestId, long tcID, ServerError error, String message) {
+        BaseCommand command = Commands.newTxnResponse(requestId, tcID, error, message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
         cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
@@ -353,7 +353,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
 
     @Override
     public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError error, String message) {
-        BaseCommand command = Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), error, message);
+        BaseCommand command = Commands.newEndTxnResponse(requestId, txnID.getLeastSigBits(),
+                txnID.getMostSigBits(), error, message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
         cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7e6600eb197..cc7c67e8cd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2367,7 +2367,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     } else {
                         ex = handleTxnException(ex, BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
 
-                        ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
+                        ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+                                txnID.getLeastSigBits(),
+                                txnID.getMostSigBits(),
                                 BrokerServiceException.getClientErrorCode(ex),
                                 ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
@@ -2412,7 +2414,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final long requestId = command.getRequestId();
         final String topic = command.getTopic();
         final int txnAction = command.getTxnAction().getValue();
-        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
         final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
 
         if (log.isDebugEnabled()) {
@@ -2448,7 +2450,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
                                         ServerError.ServiceNotReady,
                                         "The topic " + topic + " does not exist in broker.",
-                                        txnID.getMostSigBits(), txnID.getLeastSigBits()));
+                                        txnID.getLeastSigBits(), txnID.getMostSigBits()));
                             } else {
                                 log.warn("handleEndTxnOnPartition fail ! The topic {} has not been created, "
                                                 + "txnId: [{}], txnAction: [{}]",
@@ -2457,13 +2459,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                         txnID.getLeastSigBits(), txnID.getMostSigBits()));
                             }
                         }).exceptionally(e -> {
-                    log.error("handleEndTxnOnPartition fail ! topic {}, "
-                                    + "txnId: [{}], txnAction: [{}]", topic, txnID,
-                            TxnAction.valueOf(txnAction), e.getCause());
-                    ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
-                            requestId, ServerError.ServiceNotReady,
-                            e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
-                    return null;
+                            log.error("handleEndTxnOnPartition fail ! topic {}, "
+                                            + "txnId: [{}], txnAction: [{}]", topic, txnID,
+                                    TxnAction.valueOf(txnAction), e.getCause());
+                            ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+                                    requestId, ServerError.ServiceNotReady,
+                                    e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                            return null;
                 });
             }
         }).exceptionally(e -> {
@@ -2517,7 +2519,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                         ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                                 requestId, txnidLeastBits, txnidMostBits,
                                 BrokerServiceException.getClientErrorCode(e),
-                                "Handle end txn on subscription failed."));
+                                "Handle end txn on subscription failed: " + e.getMessage()));
                         return;
                     }
                     ctx.writeAndFlush(
@@ -2530,7 +2532,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             if (b) {
                                 log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, "
                                                 + "subscription: {}, txnId: [{}], txnAction: [{}]", topic, subName,
-                                        new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
+                                        txnID, TxnAction.valueOf(txnAction));
                                 ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                                         requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
                                         ServerError.ServiceNotReady,
@@ -2543,13 +2545,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                         txnID.getLeastSigBits(), txnID.getMostSigBits()));
                             }
                         }).exceptionally(e -> {
-                    log.error("handleEndTxnOnSubscription fail ! topic {}, subscription: {}"
-                                    + "txnId: [{}], txnAction: [{}]", topic, subName,
-                            txnID, TxnAction.valueOf(txnAction), e.getCause());
-                    ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
-                            requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
-                            ServerError.ServiceNotReady, e.getMessage()));
-                    return null;
+                            log.error("handleEndTxnOnSubscription fail ! topic {}, subscription: {}"
+                                            + "txnId: [{}], txnAction: [{}]", topic, subName,
+                                    txnID, TxnAction.valueOf(txnAction), e.getCause());
+                            ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                                    requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
+                                    ServerError.ServiceNotReady, e.getMessage()));
+                            return null;
                 });
             }
         }).exceptionally(e -> {
@@ -2559,7 +2561,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                     requestId, txnidLeastBits, txnidMostBits,
                     ServerError.ServiceNotReady,
-                    "Handle end txn on subscription failed."));
+                    "Handle end txn on subscription failed: " + e.getMessage()));
             return null;
         });
     }
@@ -2616,7 +2618,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     } else {
                         ex = handleTxnException(ex, BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
 
-                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getLeastSigBits(),
                                 txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
                                 ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 0c916988662..a8dcad16029 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -22,6 +22,10 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.doAnswer;
@@ -75,13 +79,13 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -91,14 +95,20 @@ import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.AuthMethod;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.BaseCommand.Type;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
 import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.CommandConnected;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
@@ -114,6 +124,8 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -151,7 +163,6 @@ public class ServerCnxTest {
     private ClientChannelHelper clientChannelHelper;
     private PulsarService pulsar;
     private MetadataStoreExtended store;
-    private ConfigurationCacheService configCacheService;
     private NamespaceResources namespaceResources;
     protected NamespaceService namespaceService;
     private final int currentProtocolVersion = ProtocolVersion.values()[ProtocolVersion.values().length - 1]
@@ -2154,7 +2165,7 @@ public class ServerCnxTest {
                     return false;
                 }
             };
-            Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+            Mockito.when(consumers.putIfAbsent(anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
             // do test: delay complete after execute 'isDone()' many times
             // Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
             try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
@@ -2193,12 +2204,12 @@ public class ServerCnxTest {
         }
         // case3: exists existingConsumerFuture, already complete and exception
         CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class);
-        Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
+        Mockito.when(consumers.putIfAbsent(anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
         // make consumerFuture delay finish
         Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
         // when sync get return, future will return success value.
         Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException());
-        Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())).
+        Mockito.when(existingConsumerFuture.get(anyLong(), Mockito.any())).
                 thenThrow(new NullPointerException());
         Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
         Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException());
@@ -2421,4 +2432,268 @@ public class ServerCnxTest {
         assertEquals(((CommandPartitionedTopicMetadataResponse) response).getError(), ServerError.ServiceNotReady);
         channel.finish();
     }
+
+    @Test(timeOut = 30000)
+    public void sendAddPartitionToTxnResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+                List.of("tenant/ns/topic1"));
+        channel.writeInbound(clientCommand);
+        CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendAddPartitionToTxnResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(CompletableFuture.failedFuture(new RuntimeException("server error")));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+                List.of("tenant/ns/topic1"));
+        channel.writeInbound(clientCommand);
+        CommandAddPartitionToTxnResponse response = (CommandAddPartitionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendAddSubscriptionToTxnResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        final Subscription sub = new Subscription();
+        sub.setTopic("topic1");
+        sub.setSubscription("sub1");
+        ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L,
+                List.of(sub));
+        channel.writeInbound(clientCommand);
+        CommandAddSubscriptionToTxnResponse response = (CommandAddSubscriptionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendAddSubscriptionToTxnResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
+                .thenReturn(CompletableFuture.failedFuture(new RuntimeException("server error")));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        final Subscription sub = new Subscription();
+        sub.setTopic("topic1");
+        sub.setSubscription("sub1");
+        ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L,
+                List.of(sub));
+        channel.writeInbound(clientCommand);
+        CommandAddSubscriptionToTxnResponse response = (CommandAddSubscriptionToTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
+                TxnAction.COMMIT));
+        channel.writeInbound(clientCommand);
+        CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.failedFuture(new RuntimeException("server error")));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
+                TxnAction.COMMIT));
+        channel.writeInbound(clientCommand);
+        CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnPartitionResponse() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+        doReturn(CompletableFuture.completedFuture(null)).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong());
+        doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+        ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
+                successTopicName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnPartitionResponse response = (CommandEndTxnOnPartitionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnPartitionResponseFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+        doReturn(CompletableFuture.failedFuture(new RuntimeException("server error"))).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong());
+        doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+        ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
+                successTopicName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnPartitionResponse response = (CommandEndTxnOnPartitionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "server error");
+
+        channel.finish();
+    }
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnSubscription() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+        final org.apache.pulsar.broker.service.Subscription sub = mock(org.apache.pulsar.broker.service.Subscription.class);
+        doReturn(sub).when(topic).getSubscription(any());
+        doReturn(CompletableFuture.completedFuture(null))
+                .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
+        doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
+        ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
+                successTopicName, successSubName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnSubscriptionResponse response = (CommandEndTxnOnSubscriptionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertFalse(response.hasError());
+        assertFalse(response.hasMessage());
+
+        channel.finish();
+    }
+
+
+    @Test(timeOut = 30000)
+    public void sendEndTxnOnSubscriptionFailed() throws Exception {
+        final TransactionMetadataStoreService txnStore = mock(TransactionMetadataStoreService.class);
+        when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+        svcConfig.setTransactionCoordinatorEnabled(true);
+        resetChannel();
+        setChannelConnected();
+        Topic topic = mock(Topic.class);
+
+        final org.apache.pulsar.broker.service.Subscription sub = mock(org.apache.pulsar.broker.service.Subscription.class);
+        doReturn(sub).when(topic).getSubscription(any());
+        doReturn(CompletableFuture.failedFuture(new RuntimeException("server error")))
+                .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
+        doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
+        ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
+                successTopicName, successSubName, TxnAction.COMMIT, 1L);
+        channel.writeInbound(clientCommand);
+        CommandEndTxnOnSubscriptionResponse response = (CommandEndTxnOnSubscriptionResponse) getResponse();
+
+        assertEquals(response.getRequestId(), 89L);
+        assertEquals(response.getTxnidLeastBits(), 1L);
+        assertEquals(response.getTxnidMostBits(), 12L);
+        assertEquals(response.getError().getValue(), 0);
+        assertEquals(response.getMessage(), "Handle end txn on subscription failed: server error");
+
+        channel.finish();
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index afd071a858b..f33b0fb093c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -19,7 +19,11 @@
 package org.apache.pulsar.broker.service.utils;
 
 import java.util.Queue;
-
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
@@ -168,6 +172,35 @@ public class ClientChannelHelper {
         protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse response) {
             queue.offer(new CommandPartitionedTopicMetadataResponse().copyFrom(response));
         }
+
+        @Override
+        protected void handleAddPartitionToTxnResponse(CommandAddPartitionToTxnResponse response) {
+            // Enqueue a freshly allocated copy rather than the instance handed to this callback.
+            queue.offer(new CommandAddPartitionToTxnResponse().copyFrom(response));
+        }
+
+        @Override
+        protected void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse response) {
+            // Enqueue a freshly allocated copy rather than the instance handed to this callback.
+            queue.offer(new CommandAddSubscriptionToTxnResponse().copyFrom(response));
+        }
+
+        @Override
+        protected void handleEndTxnResponse(CommandEndTxnResponse response) {
+            queue.offer(new CommandEndTxnResponse().copyFrom(response)); // store a copy, not the argument itself
+        }
+
+        @Override
+        protected void handleEndTxnOnPartitionResponse(CommandEndTxnOnPartitionResponse response) {
+            // Enqueue a freshly allocated copy rather than the instance handed to this callback.
+            queue.offer(new CommandEndTxnOnPartitionResponse().copyFrom(response));
+        }
+
+        @Override
+        protected void handleEndTxnOnSubscriptionResponse(CommandEndTxnOnSubscriptionResponse response) {
+            // Enqueue a freshly allocated copy rather than the instance handed to this callback.
+            queue.offer(new CommandEndTxnOnSubscriptionResponse().copyFrom(response));
+        }
     };
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index baff5deb3f1..15533cfd2a9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -217,9 +217,9 @@ public class TransactionMetaStoreHandler extends HandlerState
     }
 
     void handleNewTxnResponse(CommandNewTxnResponse response) {
-        boolean hasError = response.hasError();
-        ServerError error;
-        String message;
+        final boolean hasError = response.hasError();
+        final ServerError error;
+        final String message;
         if (hasError) {
              error = response.getError();
              message = response.getMessage();
@@ -227,14 +227,13 @@ public class TransactionMetaStoreHandler extends HandlerState
             error = null;
             message = null;
         }
-        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
-        long requestId = response.getRequestId();
+        final TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+        final long requestId = response.getRequestId();
         internalPinnedExecutor.execute(() -> {
             OpForTxnIdCallBack op = (OpForTxnIdCallBack) pendingRequests.remove(requestId);
             if (op == null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got new txn response for timeout {} - {}", txnID.getMostSigBits(),
-                            txnID.getLeastSigBits());
+                    LOG.debug("Got new txn response for transaction {}", txnID);
                 }
                 return;
             }
@@ -301,9 +300,9 @@ public class TransactionMetaStoreHandler extends HandlerState
     }
 
     void handleAddPublishPartitionToTxnResponse(CommandAddPartitionToTxnResponse response) {
-        boolean hasError = response.hasError();
-        ServerError error;
-        String message;
+        final boolean hasError = response.hasError();
+        final ServerError error;
+        final String message;
         if (hasError) {
             error = response.getError();
             message = response.getMessage();
@@ -311,14 +310,13 @@ public class TransactionMetaStoreHandler extends HandlerState
             error = null;
             message = null;
         }
-        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
-        long requestId = response.getRequestId();
+        final TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+        final long requestId = response.getRequestId();
         internalPinnedExecutor.execute(() -> {
             OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
             if (op == null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got add publish partition to txn response for timeout {} - {}", txnID.getMostSigBits(),
-                            txnID.getLeastSigBits());
+                    LOG.debug("Got add publish partition to txn response for transaction {}", txnID);
                 }
                 return;
             }
@@ -352,8 +350,8 @@ public class TransactionMetaStoreHandler extends HandlerState
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
                     return;
                 }
-                LOG.error("{} for request {} error {} with txnID {}.", BaseCommand.Type.ADD_PARTITION_TO_TXN.name(),
-                        requestId, error, txnID);
+                LOG.error("{} for request {}, transaction {}, error: {}",
+                        BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId, txnID, error);
 
             }
 
@@ -385,9 +383,9 @@ public class TransactionMetaStoreHandler extends HandlerState
     }
 
     public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse response) {
-        boolean hasError = response.hasError();
-        ServerError error;
-        String message;
+        final boolean hasError = response.hasError();
+        final ServerError error;
+        final String message;
         if (hasError) {
             error = response.getError();
             message = response.getMessage();
@@ -395,7 +393,8 @@ public class TransactionMetaStoreHandler extends HandlerState
             error = null;
             message = null;
         }
-        long requestId = response.getRequestId();
+        final long requestId = response.getRequestId();
+        final TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
         internalPinnedExecutor.execute(() -> {
             OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
             if (op == null) {
@@ -411,8 +410,8 @@ public class TransactionMetaStoreHandler extends HandlerState
                 }
                 op.callback.complete(null);
             } else {
-                LOG.error("Add subscription to txn failed for request {} error {}.",
-                        requestId, error);
+                LOG.error("Add subscription to txn failed for request {}, transaction {}, error: {}",
+                        requestId, txnID, error);
                 if (checkIfNeedRetryByError(error, message, op)) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Get a response for {} request {} error TransactionCoordinatorNotFound and try it"
@@ -466,9 +465,9 @@ public class TransactionMetaStoreHandler extends HandlerState
     }
 
     void handleEndTxnResponse(CommandEndTxnResponse response) {
-        boolean hasError = response.hasError();
-        ServerError error;
-        String message;
+        final boolean hasError = response.hasError();
+        final ServerError error;
+        final String message;
         if (hasError) {
             error = response.getError();
             message = response.getMessage();
@@ -476,21 +475,20 @@ public class TransactionMetaStoreHandler extends HandlerState
             error = null;
             message = null;
         }
-        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
-        long requestId = response.getRequestId();
+        final TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+        final long requestId = response.getRequestId();
         internalPinnedExecutor.execute(() -> {
             OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
             if (op == null) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got end txn response for timeout {} - {}", txnID.getMostSigBits(),
-                            txnID.getLeastSigBits());
+                    LOG.debug("Got end txn response for transaction but no requests pending for txn {}", txnID);
                 }
                 return;
             }
 
             if (!hasError) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got end txn response success for request {}", requestId);
+                    LOG.debug("Got end txn response success for request {}, txn {}", requestId, txnID);
                 }
                 op.callback.complete(null);
             } else {
@@ -517,8 +515,8 @@ public class TransactionMetaStoreHandler extends HandlerState
                             , op.backoff.next(), TimeUnit.MILLISECONDS);
                     return;
                 }
-                LOG.error("Got {} response for request {} error {}", BaseCommand.Type.END_TXN.name(),
-                        requestId, error);
+                LOG.error("Got {} response for request {}, transaction {}, error: {}",
+                        BaseCommand.Type.END_TXN.name(), requestId, txnID, error);
 
             }
             onResponse(op);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 61ce70a9a0a..99b229d500e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1344,12 +1344,16 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
-    public static ByteBuf newAddPartitionToTxnResponse(long requestId, long txnIdMostBits, ServerError error,
-           String errorMsg) {
+    public static ByteBuf newAddPartitionToTxnResponse(long requestId,
+                                                       long txnIdLeastBits,
+                                                       long txnIdMostBits,
+                                                       ServerError error,
+                                                       String errorMsg) {
         BaseCommand cmd = localCmd(Type.ADD_PARTITION_TO_TXN_RESPONSE);
         CommandAddPartitionToTxnResponse response = cmd.setAddPartitionToTxnResponse()
                 .setRequestId(requestId)
                 .setError(error)
+                .setTxnidLeastBits(txnIdLeastBits)
                 .setTxnidMostBits(txnIdMostBits);
 
         if (errorMsg != null) {
@@ -1378,12 +1382,13 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
-    public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long txnIdMostBits, ServerError error,
-          String errorMsg) {
+    public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long txnIdLeastBits,
+                                                          long txnIdMostBits, ServerError error, String errorMsg) {
         BaseCommand cmd = localCmd(Type.ADD_SUBSCRIPTION_TO_TXN_RESPONSE);
         CommandAddSubscriptionToTxnResponse response = cmd.setAddSubscriptionToTxnResponse()
                 .setRequestId(requestId)
                 .setTxnidMostBits(txnIdMostBits)
+                .setTxnidLeastBits(txnIdLeastBits)
                 .setError(error);
         if (errorMsg != null) {
             response.setMessage(errorMsg);
@@ -1409,11 +1414,12 @@ public class Commands {
         return cmd;
     }
 
-    public static BaseCommand newEndTxnResponse(long requestId, long txnIdMostBits,
+    public static BaseCommand newEndTxnResponse(long requestId, long txnIdLeastBits, long txnIdMostBits,
                                                 ServerError error, String errorMsg) {
         BaseCommand cmd = localCmd(Type.END_TXN_RESPONSE);
         CommandEndTxnResponse response = cmd.setEndTxnResponse()
                 .setRequestId(requestId)
+                .setTxnidLeastBits(txnIdLeastBits)
                 .setTxnidMostBits(txnIdMostBits)
                 .setError(error);
         if (errorMsg != null) {