You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/04 06:07:40 UTC

[pulsar] branch master updated: [Transaction] Handle Acknowledge In The Transaction (#7856)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fdffe7  [Transaction] Handle Acknowledge In The Transaction (#7856)
1fdffe7 is described below

commit 1fdffe753bcc6ebe2c7888234cdd7ccd12766250
Author: ran <ga...@126.com>
AuthorDate: Fri Sep 4 14:07:19 2020 +0800

    [Transaction] Handle Acknowledge In The Transaction (#7856)
    
    Fix https://github.com/streamnative/pulsar/issues/1307
    
    Master Issue: #2664
    
    ### Motivation
    
    Handle message acknowledgment within a transaction.
    
    ### Modifications
    
    1. Register the subscription with the transaction metadata store.
    2. Make the ack command carry the transaction information.
    3. When committing or aborting a transaction, apply that action to every subscription in the transaction.
---
 .../broker/TransactionMetadataStoreService.java    | 25 +++++-
 .../org/apache/pulsar/broker/service/Consumer.java | 30 ++++++-
 .../apache/pulsar/broker/service/ServerCnx.java    | 57 +++++++++++--
 .../apache/pulsar/broker/service/Subscription.java |  2 +
 .../nonpersistent/NonPersistentSubscription.java   |  7 ++
 .../service/persistent/PersistentSubscription.java | 34 ++++++--
 .../TransactionMetadataStoreServiceTest.java       |  9 +-
 .../broker/transaction/TransactionProduceTest.java | 97 ++++++++++++++++++++++
 .../buffer/TransactionBufferClientTest.java        |  7 ++
 .../transaction/TransactionCoordinatorClient.java  | 21 +++++
 .../impl/AcknowledgmentsGroupingTracker.java       |  6 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  8 ++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  6 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 16 ++--
 ...NonPersistentAcknowledgmentGroupingTracker.java |  6 +-
 .../PersistentAcknowledgmentsGroupingTracker.java  | 44 ++++++----
 .../client/impl/TransactionMetaStoreHandler.java   | 39 ++++++++-
 .../TransactionCoordinatorClientImpl.java          | 26 ++++++
 .../client/impl/transaction/TransactionImpl.java   |  7 +-
 .../impl/AcknowledgementsGroupingTrackerTest.java  | 16 ++--
 .../apache/pulsar/common/protocol/Commands.java    |  6 +-
 .../coordinator/TransactionMetadataStore.java      |  2 +-
 .../coordinator/TransactionSubscription.java       | 61 ++++++++++++++
 .../pulsar/transaction/coordinator/TxnMeta.java    |  4 +-
 .../impl/InMemTransactionMetadataStore.java        |  3 +-
 .../transaction/coordinator/impl/TxnMetaImpl.java  | 11 +--
 .../TransactionMetadataStoreProviderTest.java      | 40 +++++----
 27 files changed, 500 insertions(+), 90 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index b1ef17b..fd7ebc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
 import org.apache.pulsar.transaction.impl.common.TxnStatus;
@@ -148,7 +149,7 @@ public class TransactionMetadataStoreService {
         return store.addProducedPartitionToTxn(txnId, partitions);
     }
 
-    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<String> partitions) {
+    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<TransactionSubscription> partitions) {
         TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
         TransactionMetadataStore store = stores.get(tcId);
         if (store == null) {
@@ -207,12 +208,27 @@ public class TransactionMetadataStoreService {
 
     private CompletableFuture<Void> endToTB(TxnID txnID, int txnAction) {
         CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-        List<CompletableFuture<TxnID>> commitFutureList = new ArrayList<>();
+        List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
         this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
             if (throwable != null) {
                 resultFuture.completeExceptionally(throwable);
                 return;
             }
+
+            txnMeta.ackedPartitions().forEach(tbSub -> {
+                CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
+                if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
+                    actionFuture = tbClient.commitTxnOnSubscription(
+                            tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
+                } else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
+                    actionFuture = tbClient.abortTxnOnSubscription(
+                            tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits());
+                } else {
+                    actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
+                }
+                completableFutureList.add(actionFuture);
+            });
+
             txnMeta.producedPartitions().forEach(partition -> {
                 CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
                 if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
@@ -222,10 +238,11 @@ public class TransactionMetadataStoreService {
                 } else {
                     actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
                 }
-                commitFutureList.add(actionFuture);
+                completableFutureList.add(actionFuture);
             });
+
             try {
-                FutureUtil.waitForAll(commitFutureList).whenComplete((ignored, waitThrowable) -> {
+                FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, waitThrowable) -> {
                     if (waitThrowable != null) {
                         resultFuture.completeExceptionally(waitThrowable);
                         return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 4a78943..1fb8e74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -46,6 +46,8 @@ import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.Long
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionEntryImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -59,6 +61,7 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -413,7 +416,12 @@ public class Consumer {
                     position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
                 }
             }
-            subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative, properties);
+            List<Position> positionsAcked = Collections.singletonList(position);
+            if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
+                transactionAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked, AckType.Cumulative);
+            } else {
+                subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
+            }
         } else {
             // Individual ack
             List<Position> positionsAcked = new ArrayList<>();
@@ -436,7 +444,25 @@ public class Consumer {
                             consumerId, position, ack.getValidationError());
                 }
             }
-            subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
+            if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
+                transactionAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked, AckType.Individual);
+            } else {
+                subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
+            }
+        }
+    }
+
+    private void transactionAcknowledge(long txnidMostBits, long txnidLeastBits,
+                                        List<Position> positionList, AckType ackType) {
+        if (subscription instanceof PersistentSubscription) {
+            TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
+            try {
+                ((PersistentSubscription) subscription).acknowledgeMessage(txnID, positionList, ackType);
+            } catch (TransactionConflictException e) {
+                log.error("Transaction acknowledge failed for txn " + txnID, e);
+            }
+        } else {
+            log.error("Transaction acknowledge only support the `PersistentSubscription`.");
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 071ef80..e8d644d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -36,6 +36,7 @@ import io.netty.channel.ChannelOption;
 import io.netty.handler.ssl.SslHandler;
 
 import java.net.SocketAddress;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -69,6 +70,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataExc
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -125,6 +127,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1740,7 +1743,48 @@ public class ServerCnx extends PulsarHandler {
 
     @Override
     protected void handleEndTxnOnSubscription(PulsarApi.CommandEndTxnOnSubscription command) {
-        ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(command.getRequestId(), command.getTxnidLeastBits(), command.getTxnidMostBits()));
+        final long requestId = command.getRequestId();
+        final long txnidMostBits = command.getTxnidMostBits();
+        final long txnidLeastBits = command.getTxnidLeastBits();
+        final String topic = command.getSubscription().getTopic();
+        final String subName = command.getSubscription().getSubscription();
+
+        service.getTopics().get(command.getSubscription().getTopic())
+            .thenAccept(optionalTopic -> {
+                if (!optionalTopic.isPresent()) {
+                    log.error("The topic {} is not exist in broker.", command.getSubscription().getTopic());
+                    ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                            requestId, txnidLeastBits, txnidMostBits,
+                            ServerError.UnknownError,
+                            "The topic " + topic + " is not exist in broker."));
+                    return;
+                }
+
+                Subscription subscription = optionalTopic.get().getSubscription(subName);
+                if (subscription == null) {
+                    log.error("Topic {} subscription {} is not exist.", optionalTopic.get().getName(), subName);
+                    ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                            requestId, txnidLeastBits, txnidMostBits,
+                            ServerError.UnknownError,
+                            "Topic " + optionalTopic.get().getName() + " subscription " + subName + " is not exist."));
+                    return;
+                }
+
+                CompletableFuture<Void> completableFuture =
+                        subscription.endTxn(txnidMostBits, txnidLeastBits, command.getTxnAction().getNumber());
+                completableFuture.whenComplete((ignored, throwable) -> {
+                    if (throwable != null) {
+                        log.error("Handle end txn on subscription failed for request {}", requestId);
+                        ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                                requestId, txnidLeastBits, txnidMostBits,
+                                ServerError.UnknownError,
+                                "Handle end txn on subscription failed."));
+                        return;
+                    }
+                    ctx.writeAndFlush(
+                            Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
+                });
+            });
     }
 
     private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
@@ -1769,8 +1813,11 @@ public class ServerCnx extends PulsarHandler {
             log.debug("Receive add published partition to txn request {} from {} with txnId {}",
                     command.getRequestId(), remoteAddress, txnID);
         }
-        List<String> subscriptionList = command.getSubscriptionList().stream()
-                .map(subscription -> subscription.getTopic() + "|" + subscription.getSubscription())
+        List<TransactionSubscription> subscriptionList = command.getSubscriptionList().stream()
+                .map(subscription -> TransactionSubscription.builder()
+                        .topic(subscription.getTopic())
+                        .subscription(subscription.getSubscription())
+                        .build())
                 .collect(Collectors.toList());
         service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID, subscriptionList)
                 .whenComplete(((v, ex) -> {
@@ -1779,7 +1826,7 @@ public class ServerCnx extends PulsarHandler {
                             log.debug("Send response success for add published partition to txn request {}",
                                     command.getRequestId());
                         }
-                        ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(command.getRequestId(),
+                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
                                 txnID.getLeastSigBits(), txnID.getMostSigBits()));
                         log.info("handle add partition to txn finish.");
                     } else {
@@ -1787,7 +1834,7 @@ public class ServerCnx extends PulsarHandler {
                             log.debug("Send response error for add published partition to txn request {}",
                                     command.getRequestId(), ex);
                         }
-                        ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(),
+                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
                                 txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
                                 ex.getMessage()));
                     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 85b5415..8f6b5fc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -101,6 +101,8 @@ public interface Subscription {
         // Default is no-op
     }
 
+    CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction);
+
     // Subscription utils
     static boolean isCumulativeAckMode(SubType subType) {
         return SubType.Exclusive.equals(subType) || SubType.Failover.equals(subType);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index fecfd09..8675703 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -474,6 +474,13 @@ public class NonPersistentSubscription implements Subscription {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        completableFuture.completeExceptionally(new Exception("Unsupported operation end txn for NonPersistentSubscription"));
+        return completableFuture;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(NonPersistentSubscription.class);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 5476b8d..7abd21d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
@@ -66,6 +67,7 @@ import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsS
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -1158,14 +1160,22 @@ public class PersistentSubscription implements Subscription {
         List<Position> positions = pendingAckMessagesMap != null ? this.pendingAckMessagesMap.remove(txnId).values() :
                                                                                              Collections.emptyList();
         // Materialize all single acks.
-        cursor.asyncDelete(positions, deleteCallback, positions);
-        if (pendingAckMessages != null) {
-            positions.forEach(position -> this.pendingAckMessages.remove(position));
+        if (positions != null) {
+            cursor.asyncDelete(positions, deleteCallback, positions);
+            if (pendingAckMessages != null) {
+                positions.forEach(position -> this.pendingAckMessages.remove(position));
+            }
+        } else {
+            deleteFuture.complete(null);
         }
 
         // Materialize cumulative ack.
-        cursor.asyncMarkDelete(this.pendingCumulativeAckMessage, (null == properties)?
-                Collections.emptyMap() : properties, markDeleteCallback, this.pendingCumulativeAckMessage);
+        if (this.pendingCumulativeAckMessage != null) {
+            cursor.asyncMarkDelete(this.pendingCumulativeAckMessage, (null == properties)?
+                    Collections.emptyMap() : properties, markDeleteCallback, this.pendingCumulativeAckMessage);
+        } else {
+            marketDeleteFuture.complete(null);
+        }
 
         // Reset txdID and position for cumulative ack.
         PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, null);
@@ -1238,6 +1248,20 @@ public class PersistentSubscription implements Subscription {
         }
     }
 
+    @Override
+    public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction) {
+        TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        if (PulsarApi.TxnAction.COMMIT.getNumber() == txnAction) {
+            completableFuture = commitTxn(txnID, Collections.emptyMap());
+        } else if (PulsarApi.TxnAction.ABORT.getNumber() == txnAction) {
+            completableFuture = abortTxn(txnID, null);
+        } else {
+            completableFuture.completeExceptionally(new Exception("Unsupported txnAction " + txnAction));
+        }
+        return completableFuture;
+    }
+
     @VisibleForTesting
     public ManagedCursor getCursor() {
         return cursor;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 12a664c..a99f918 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.impl.common.TxnStatus;
 import org.junit.Assert;
@@ -102,10 +103,10 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
         Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
         TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0)).get();
-        List<String> partitions = new ArrayList<>();
-        partitions.add("ptn-0");
-        partitions.add("ptn-1");
-        partitions.add("ptn-2");
+        List<TransactionSubscription> partitions = new ArrayList<>();
+        partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
+        partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
+        partitions.add(TransactionSubscription.builder().topic("ptn-3").subscription("sub-1").build());
         transactionMetadataStoreService.addAckedPartitionToTxn(txnID, partitions);
         TxnMeta txn = transactionMetadataStoreService.getTxnMeta(txnID).get();
         assertEquals(txn.status(), TxnStatus.OPEN);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index edcfd17..6bdc3f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -32,12 +33,20 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBuffer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -50,6 +59,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -330,4 +340,91 @@ public class TransactionProduceTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void ackCommitTest() throws Exception {
+        final String subscriptionName = "ackCommitTest";
+        Transaction txn = ((PulsarClientImpl) pulsarClient)
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+        log.info("init transaction {}.", txn);
+
+        Producer<byte[]> incomingProducer = pulsarClient.newProducer()
+                .topic(TOPIC_OUTPUT)
+                .batchingMaxMessages(1)
+                .roundRobinRouterBatchingPartitionSwitchFrequency(1)
+                .create();
+        int incomingMessageCnt = 10;
+        for (int i = 0; i < incomingMessageCnt; i++) {
+            incomingProducer.newMessage().value("Hello Txn.".getBytes()).sendAsync();
+        }
+        log.info("prepare incoming messages finished.");
+
+        MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(TOPIC_OUTPUT)
+                .subscriptionName(subscriptionName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .enableBatchIndexAcknowledgment(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        for (int i = 0; i < incomingMessageCnt; i++) {
+            Message<byte[]> message = consumer.receive();
+            log.info("receive messageId: {}", message.getMessageId());
+            consumer.acknowledgeAsync(message.getMessageId(), txn);
+        }
+
+        Thread.sleep(1000);
+
+        // The pending messages count should be the incomingMessageCnt
+        Assert.assertEquals(getPendingAckCount(subscriptionName), incomingMessageCnt);
+
+        consumer.redeliverUnacknowledgedMessages();
+        for (int i = 0; i < incomingMessageCnt; i++) {
+            Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+            Assert.assertNull(message);
+        }
+
+        // The pending messages count should be the incomingMessageCnt
+        Assert.assertEquals(getPendingAckCount(subscriptionName), incomingMessageCnt);
+
+        txn.commit().get();
+
+        Thread.sleep(1000);
+
+        // After commit, the pending messages count should be 0
+        Assert.assertEquals(getPendingAckCount(subscriptionName), 0);
+
+        consumer.redeliverUnacknowledgedMessages();
+        for (int i = 0; i < incomingMessageCnt; i++) {
+            Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+            Assert.assertNull(message);
+        }
+
+        log.info("finish test ackCommitTest");
+    }
+
+    private int getPendingAckCount(String subscriptionName) throws Exception {
+        Class<PersistentSubscription> clazz = PersistentSubscription.class;
+        Field field = clazz.getDeclaredField("pendingAckMessages");
+        field.setAccessible(true);
+
+        int pendingAckCount = 0;
+        for (PulsarService pulsarService : getPulsarServiceList()) {
+            for (String key : pulsarService.getBrokerService().getTopics().keys()) {
+                if (key.startsWith("persistent://" + TOPIC_OUTPUT)) {
+                    PersistentSubscription subscription =
+                            (PersistentSubscription) pulsarService.getBrokerService()
+                                    .getTopics().get(key).get().get().getSubscription(subscriptionName);
+                    ConcurrentOpenHashSet<Position> set = (ConcurrentOpenHashSet<Position>) field.get(subscription);
+                    if (set != null) {
+                        pendingAckCount += set.size();
+                    }
+                }
+            }
+        }
+        log.info("pendingAckCount: {}", pendingAckCount);
+        return pendingAckCount;
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 159b5c1..8e666a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction.buffer;
 import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
@@ -31,6 +32,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,9 +71,14 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
     public void afterPulsarStart() throws Exception {
         super.afterPulsarStart();
         for (int i = 0; i < pulsarServices.length; i++) {
+            Subscription mockSubscription = Mockito.mock(Subscription.class);
+            Mockito.when(mockSubscription.endTxn(Mockito.anyLong(), Mockito.anyLong(), Mockito.anyInt()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+
             Topic mockTopic = Mockito.mock(Topic.class);
             Mockito.when(mockTopic.endTxn(Mockito.any(), Mockito.anyInt()))
                     .thenReturn(CompletableFuture.completedFuture(null));
+            Mockito.when(mockTopic.getSubscription(Mockito.any())).thenReturn(mockSubscription);
 
             ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topicMap =
                     Mockito.mock(ConcurrentOpenHashMap.class);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java
index 18454ce..90e0be6 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClient.java
@@ -126,6 +126,27 @@ public interface TransactionCoordinatorClient extends Closeable {
     CompletableFuture<Void> addPublishPartitionToTxnAsync(TxnID txnID, List<String> partitions);
 
     /**
+     * Add ack subscription to txn.
+     *
+     * @param txnID transaction id
+     * @param topic topic name
+     * @param subscription subscription name
+     * @throws TransactionCoordinatorClientException if adding the subscription to the transaction fails
+     */
+    void addSubscriptionToTxn(TxnID txnID, String topic, String subscription)
+            throws TransactionCoordinatorClientException;
+
+    /**
+     * Add ack subscription to txn asynchronously.
+     *
+     * @param txnID transaction id
+     * @param topic topic name
+     * @param subscription subscription name
+     * @return a future that represents the result of the operation
+     */
+    CompletableFuture<Void> addSubscriptionToTxnAsync(TxnID txnID, String topic, String subscription);
+
+    /**
      * Commit txn.
      * @param txnID txn id to commit.
      */
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
index a624e15..bf821c5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
@@ -30,11 +30,13 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {
 
     boolean isDuplicate(MessageId messageId);
 
-    void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
+    void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
+                           long txnidMostBits, long txnidLeastBits);
 
     void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType ackType, Map<String, Long> properties);
 
-    void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map<String, Long> properties);
+    void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
+                                     Map<String, Long> properties, long txnidMostBits, long txnidLeastBits);
 
     void flush();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 975ac37..b32dc99 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -868,6 +868,14 @@ public class ClientCnx extends PulsarHandler {
     }
 
     @Override
+    protected void handleAddSubscriptionToTxnResponse(PulsarApi.CommandAddSubscriptionToTxnResponse command) {
+        TransactionMetaStoreHandler handler = checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits());
+        if (handler != null) {
+            handler.handleAddSubscriptionToTxnResponse(command);
+        }
+    }
+
+    @Override
     protected void handleEndTxnOnPartitionResponse(PulsarApi.CommandEndTxnOnPartitionResponse command) {
         log.info("handleEndTxnOnPartitionResponse");
         TransactionBufferHandler handler = checkAndGetTransactionBufferHandler();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 57fc65b..d762c2b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -428,7 +428,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                                                            TransactionImpl txn) {
         CompletableFuture<Void> ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
         if (txn != null) {
-            txn.registerAckedTopic(getTopic());
+            txn.registerAckedTopic(getTopic(), subscription);
             return txn.registerAckOp(ackFuture);
         } else {
             return ackFuture;
@@ -439,11 +439,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                                                            Map<String,Long> properties,
                                                            TransactionImpl txn) {
         CompletableFuture<Void> ackFuture = doAcknowledge(messageId, ackType, properties, txn);
-        if (txn != null) {
+        if (txn != null && (this instanceof ConsumerImpl)) {
             // it is okay that we register acked topic after sending the acknowledgements. because
             // the transactional ack will not be visiable for consumers until the transaction is
             // committed
-            txn.registerAckedTopic(getTopic());
+            txn.registerAckedTopic(getTopic(), subscription);
             // register the ackFuture as part of the transaction
             return txn.registerAckOp(ackFuture);
         } else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index cfaaa89..a327c4d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -537,7 +537,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         if (messageId instanceof BatchMessageIdImpl) {
-            if (markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties)) {
+            if (markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties) && txnImpl == null) {
                 // all messages in batch have been acked so broker can be acked via sendAcknowledge()
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] acknowledging message - {}, acktype {}", subscription, consumerName, messageId,
@@ -546,8 +546,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             } else {
                 if (conf.isBatchIndexAckEnabled()) {
                     BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-                    acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchSize(), ackType, properties);
+                    acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId,
+                            batchMessageId.getBatchIndex(), batchMessageId.getBatchSize(), ackType, properties,
+                            txnImpl == null ? -1 : txnImpl.getTxnIdMostBits(),
+                            txnImpl == null ? -1 : txnImpl.getTxnIdLeastBits());
                 }
                 // other messages in batch are still pending ack.
                 return CompletableFuture.completedFuture(null);
@@ -578,7 +580,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 messageIdImpl = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId()
                         , batchMessageId.getPartitionIndex());
                 acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(),
-                        batchMessageId.getBatchSize(), ackType, properties);
+                        batchMessageId.getBatchSize(), ackType, properties,
+                        txn == null ? -1 : txn.getTxnIdMostBits(),
+                        txn == null ? -1 : txn.getTxnIdLeastBits());
                 stats.incrementNumAcksSent(batchMessageId.getBatchSize());
             } else {
                 messageIdImpl = (MessageIdImpl) messageId;
@@ -751,7 +755,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId));
         }
 
-        acknowledgmentsGroupingTracker.addAcknowledgment(msgId, ackType, properties);
+        acknowledgmentsGroupingTracker.addAcknowledgment(msgId, ackType, properties,
+                txnImpl == null ? -1 : txnImpl.getTxnIdMostBits(),
+                txnImpl == null ? -1 : txnImpl.getTxnIdLeastBits());
 
         // Consumer acknowledgment operation immediately succeeds. In any case, if we're not able to send ack to broker,
         // the messages will be re-delivered
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
index d70c7ce..425204d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
@@ -42,7 +42,8 @@ public class NonPersistentAcknowledgmentGroupingTracker implements Acknowledgmen
     }
 
     @Override
-    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties) {
+    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
+                                  long txnidMostBits, long txnidLeastBits) {
         // no-op
     }
 
@@ -52,7 +53,8 @@ public class NonPersistentAcknowledgmentGroupingTracker implements Acknowledgmen
     }
 
     @Override
-    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int BatchSize, AckType ackType, Map<String, Long> properties) {
+    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int BatchSize, AckType ackType,
+                                            Map<String, Long> properties, long txnidMostSets, long txnidLeastSets) {
         // no-op
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index fd61c42..b226d40 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -134,11 +134,14 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     }
 
     @Override
-    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
+    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
+                                  long txnidMostBits, long txnidLeastBits) {
+        if (txnidMostBits != -1 && txnidLeastBits != -1) {
+            doImmediateAck(msgId, ackType, properties, txnidMostBits, txnidLeastBits);
+        } else if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
             // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
             // uncommon condition since it's only used for the compaction subscription.
-            doImmediateAck(msgId, ackType, properties);
+            doImmediateAck(msgId, ackType, properties, -1, -1);
         } else if (ackType == AckType.Cumulative) {
             doCumulativeAck(msgId, null);
         } else {
@@ -157,9 +160,12 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     }
 
     @Override
-    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map<String, Long> properties) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
-            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties);
+    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
+                                            Map<String, Long> properties, long txnidMostBits, long txnidLeastBits) {
+        if (txnidMostBits != -1 && txnidLeastBits != -1) {
+            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties, txnidMostBits, txnidLeastBits);
+        } else if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
+            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties, -1, -1);
         } else if (ackType == AckType.Cumulative) {
             BitSetRecyclable bitSet = BitSetRecyclable.create();
             bitSet.set(0, batchSize);
@@ -209,18 +215,21 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         }
     }
 
-    private boolean doImmediateAck(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties) {
+    private boolean doImmediateAck(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
+                                   long txnidMostBits, long txnidLeastBits) {
         ClientCnx cnx = consumer.getClientCnx();
 
         if (cnx == null) {
             return false;
         }
 
-        newAckCommand(consumer.consumerId, msgId, null, ackType, null, properties, cnx, true /* flush */);
+        newAckCommand(consumer.consumerId, msgId, null, ackType, null,
+                properties, cnx, true /* flush */, txnidMostBits, txnidLeastBits);
         return true;
     }
 
-    private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType, Map<String, Long> properties) {
+    private boolean doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
+                                             Map<String, Long> properties, long txnidMostBits, long txnidLeastBits) {
         ClientCnx cnx = consumer.getClientCnx();
 
         if (cnx == null) {
@@ -239,7 +248,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             bitSet.clear(batchIndex);
         }
 
-        final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties);
+        final ByteBuf cmd = Commands.newAck(consumer.consumerId, msgId.ledgerId, msgId.entryId, bitSet, ackType,
+                null, properties, txnidLeastBits, txnidMostBits);
         bitSet.recycle();
         cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
         return true;
@@ -261,8 +271,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            newAckCommand(consumer.consumerId, lastCumulativeAck, lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx,
-                    false /* flush */);
+            newAckCommand(consumer.consumerId, lastCumulativeAck, lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx, false /* flush */, -1, -1);
             shouldFlush=true;
             cumulativeAckFlushRequired = false;
         }
@@ -301,8 +310,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                         break;
                     }
 
-                    newAckCommand(consumer.consumerId, msgId, null, AckType.Individual, null, Collections.emptyMap(),
-                            cnx, false);
+                    newAckCommand(consumer.consumerId, msgId, null, AckType.Individual, null, Collections.emptyMap(), cnx, false, -1, -1);
                     shouldFlush = true;
                 }
             }
@@ -349,11 +357,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     }
 
     private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx, boolean flush) {
+            AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx,
+                               boolean flush, long txnidMostBits, long txnidLeastBits) {
 
         MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null) {
-            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion()) && ackType != AckType.Cumulative) {
+        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
+            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
+                    && ackType != AckType.Cumulative) {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 37d85c8..a95c937 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -180,6 +180,43 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         onResponse(op);
     }
 
+    public CompletableFuture<Void> addSubscriptionToTxn(TxnID txnID, List<PulsarApi.Subscription> subscriptionList) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Add subscription {} to txn {}.", subscriptionList, txnID);
+        }
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        long requestId = client.newRequestId();
+        ByteBuf cmd = Commands.newAddSubscriptionToTxn(
+                requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList);
+        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, completableFuture);
+        pendingRequests.put(requestId, op);
+        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+        cmd.retain();
+        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        return completableFuture;
+    }
+
+    public void handleAddSubscriptionToTxnResponse(PulsarApi.CommandAddSubscriptionToTxnResponse response) {
+        OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId());
+        if (op == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Add subscription to txn timeout for request {}.", response.getRequestId());
+            }
+            return;
+        }
+        if (!response.hasError()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Add subscription to txn success for request {}.", response.getRequestId());
+            }
+            op.callback.complete(null);
+        } else {
+            LOG.error("Add subscription to txn failed for request {} error {}.",
+                    response.getRequestId(), response.getError());
+            op.callback.completeExceptionally(getExceptionByServerError(response.getError(), response.getMessage()));
+        }
+        onResponse(op);
+    }
+
     public CompletableFuture<Void> commitAsync(TxnID txnID) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Commit txn {}", txnID);
@@ -388,7 +425,7 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             RequestTime lastPolled = timeoutQueue.poll();
             if (lastPolled != null) {
                 OpBase<?> op = pendingRequests.remove(lastPolled.requestId);
-                if (!op.callback.isDone()) {
+                if (op != null && !op.callback.isDone()) {
                     op.callback.completeExceptionally(new PulsarClientException.TimeoutException(
                             "Could not get response from transaction meta store within given timeout."));
                     if (LOG.isDebugEnabled()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 7ead17d..9d79bb2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
 import org.apache.pulsar.client.util.MathUtils;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -33,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -177,6 +179,30 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC
     }
 
     @Override
+    public void addSubscriptionToTxn(TxnID txnID, String topic, String subscription)
+            throws TransactionCoordinatorClientException {
+        try {
+            addSubscriptionToTxnAsync(txnID, topic, subscription).get();
+        } catch (Exception e) {
+            throw TransactionCoordinatorClientException.unwrap(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> addSubscriptionToTxnAsync(TxnID txnID, String topic, String subscription) {
+        TransactionMetaStoreHandler handler = handlerMap.get(txnID.getMostSigBits());
+        if (handler == null) {
+            return FutureUtil.failedFuture(
+                    new TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(txnID.getMostSigBits()));
+        }
+        PulsarApi.Subscription sub = PulsarApi.Subscription.newBuilder()
+                .setTopic(topic)
+                .setSubscription(subscription)
+                .build();
+        return handler.addSubscriptionToTxn(txnID, Collections.singletonList(sub));
+    }
+
+    @Override
     public void commit(TxnID txnID) throws TransactionCoordinatorClientException {
         try {
             commitAsync(txnID).get();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 38530b7..7c04357 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -92,7 +92,7 @@ public class TransactionImpl implements Transaction {
     // register the topics that will be modified by this transaction
     public synchronized void registerProducedTopic(String topic) {
         if (producedTopics.add(topic)) {
-            // TODO: we need to issue the request to TC to register the produced topic
+            // issue the request to the TC to register the produced topic
             tcClient.addPublishPartitionToTxnAsync(new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic));
         }
     }
@@ -109,9 +109,10 @@ public class TransactionImpl implements Transaction {
     }
 
     // register the topics that will be modified by this transaction
-    public synchronized void registerAckedTopic(String topic) {
+    public synchronized void registerAckedTopic(String topic, String subscription) {
         if (ackedTopics.add(topic)) {
-            // TODO: we need to issue the request to TC to register the acked topic
+            // issue the request to the TC to register the acked topic and its subscription
+            tcClient.addSubscriptionToTxnAsync(new TxnID(txnIdMostBits, txnIdLeastBits), topic, subscription);
         }
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 1eb5b42..151f2ae 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -74,12 +74,12 @@ public class AcknowledgementsGroupingTrackerTest {
 
         assertFalse(tracker.isDuplicate(msg1));
 
-        tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap());
+        tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap(), -1, -1);
         assertTrue(tracker.isDuplicate(msg1));
 
         assertFalse(tracker.isDuplicate(msg2));
 
-        tracker.addAcknowledgment(msg5, AckType.Cumulative, Collections.emptyMap());
+        tracker.addAcknowledgment(msg5, AckType.Cumulative, Collections.emptyMap(), -1, -1);
         assertTrue(tracker.isDuplicate(msg1));
         assertTrue(tracker.isDuplicate(msg2));
         assertTrue(tracker.isDuplicate(msg3));
@@ -99,7 +99,7 @@ public class AcknowledgementsGroupingTrackerTest {
         assertTrue(tracker.isDuplicate(msg5));
         assertFalse(tracker.isDuplicate(msg6));
 
-        tracker.addAcknowledgment(msg6, AckType.Individual, Collections.emptyMap());
+        tracker.addAcknowledgment(msg6, AckType.Individual, Collections.emptyMap(), -1, -1);
         assertTrue(tracker.isDuplicate(msg6));
 
         when(consumer.getClientCnx()).thenReturn(cnx);
@@ -188,7 +188,7 @@ public class AcknowledgementsGroupingTrackerTest {
 
         when(consumer.getClientCnx()).thenReturn(null);
 
-        tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap());
+        tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap(), -1, -1);
         assertFalse(tracker.isDuplicate(msg1));
 
         when(consumer.getClientCnx()).thenReturn(cnx);
@@ -196,7 +196,7 @@ public class AcknowledgementsGroupingTrackerTest {
         tracker.flush();
         assertFalse(tracker.isDuplicate(msg1));
 
-        tracker.addAcknowledgment(msg2, AckType.Individual, Collections.emptyMap());
+        tracker.addAcknowledgment(msg2, AckType.Individual, Collections.emptyMap(), -1, -1);
         // Since we were connected, the ack went out immediately
         assertFalse(tracker.isDuplicate(msg2));
         tracker.close();
@@ -248,12 +248,12 @@ public class AcknowledgementsGroupingTrackerTest {
 
         assertFalse(tracker.isDuplicate(msg1));
 
-        tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap());
+        tracker.addAcknowledgment(msg1, AckType.Individual, Collections.emptyMap(), -1, -1);
         assertTrue(tracker.isDuplicate(msg1));
 
         assertFalse(tracker.isDuplicate(msg2));
 
-        tracker.addAcknowledgment(msg5, AckType.Cumulative, Collections.emptyMap());
+        tracker.addAcknowledgment(msg5, AckType.Cumulative, Collections.emptyMap(), -1, -1);
         assertTrue(tracker.isDuplicate(msg1));
         assertTrue(tracker.isDuplicate(msg2));
         assertTrue(tracker.isDuplicate(msg3));
@@ -273,7 +273,7 @@ public class AcknowledgementsGroupingTrackerTest {
         assertTrue(tracker.isDuplicate(msg5));
         assertFalse(tracker.isDuplicate(msg6));
 
-        tracker.addAcknowledgment(msg6, AckType.Individual, Collections.emptyMap());
+        tracker.addAcknowledgment(msg6, AckType.Individual, Collections.emptyMap(), -1, -1);
         assertTrue(tracker.isDuplicate(msg6));
 
         when(consumer.getClientCnx()).thenReturn(cnx);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index b33ff6d..7838067 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -932,7 +932,7 @@ public class Commands {
 
     public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
                                  ValidationError validationError, Map<String, Long> properties) {
-        return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, properties, 0, 0);
+        return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, properties, -1, -1);
     }
 
     public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType,
@@ -952,10 +952,10 @@ public class Commands {
         if (validationError != null) {
             ackBuilder.setValidationError(validationError);
         }
-        if (txnIdMostBits > 0) {
+        if (txnIdMostBits >= 0) {
             ackBuilder.setTxnidMostBits(txnIdMostBits);
         }
-        if (txnIdLeastBits > 0) {
+        if (txnIdLeastBits >= 0) {
             ackBuilder.setTxnidLeastBits(txnIdLeastBits);
         }
         for (Map.Entry<String, Long> e : properties.entrySet()) {
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
index 357aa9b..8d4b69f 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -79,7 +79,7 @@ public interface TransactionMetadataStore {
      * @return a future represents the result of the operation
      */
     CompletableFuture<Void> addAckedPartitionToTxn(
-        TxnID txnid, List<String> partitions);
+        TxnID txnid, List<TransactionSubscription> partitions);
 
     /**
      * Update the transaction from <tt>expectedStatus</tt> to <tt>newStatus</tt>.
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionSubscription.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionSubscription.java
new file mode 100644
index 0000000..bcda38e
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionSubscription.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import com.google.common.base.Objects;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A class for representing acked topic subscription info.
+ *
+ * <p>Holds a topic name and a subscription name. Equality, hashing and ordering are all
+ * derived from the (topic, subscription) pair.
+ */
+@Data
+@Builder
+public class TransactionSubscription implements Comparable<TransactionSubscription> {
+
+    private String topic;
+    private String subscription;
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(topic, subscription);
+    }
+
+    /**
+     * Compares by topic and subscription. Null fields are compared null-safely so that this
+     * method accepts every instance that {@link #hashCode()} accepts.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TransactionSubscription that = (TransactionSubscription) o;
+        return Objects.equal(topic, that.topic)
+                && Objects.equal(subscription, that.subscription);
+    }
+
+    /**
+     * Orders by topic first, then by subscription.
+     */
+    @Override
+    public int compareTo(TransactionSubscription o) {
+        int topicCompare = topic.compareTo(o.topic);
+        if (topicCompare == 0) {
+            return subscription.compareTo(o.subscription);
+        }
+        return topicCompare;
+    }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
index 39d3bc7..cbc8dc9 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java
@@ -59,7 +59,7 @@ public interface TxnMeta {
      * @return the list of partitions that this transaction acknowledges to.
      *         the returned list is sorted by partition name.
      */
-    List<String> ackedPartitions();
+    List<TransactionSubscription> ackedPartitions();
 
     /**
      * Add the list of produced partitions to the transaction.
@@ -78,7 +78,7 @@ public interface TxnMeta {
      * @throws InvalidTxnStatusException if the transaction is not in
      *         {@link TxnStatus#OPEN}
      */
-    TxnMeta addAckedPartitions(List<String> partitions)
+    TxnMeta addAckedPartitions(List<TransactionSubscription> partitions)
         throws InvalidTxnStatusException;
 
     /**
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index e72cbf5..15bcd41 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
@@ -86,7 +87,7 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
     }
 
     @Override
-    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnid, List<String> partitions) {
+    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnid, List<TransactionSubscription> partitions) {
         return getTxnMeta(txnid).thenCompose(txn -> {
             try {
                 txn.addAckedPartitions(partitions);
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
index ec9c021..0b1ae74 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
 import org.apache.pulsar.transaction.impl.common.TxnStatus;
@@ -37,7 +38,7 @@ class TxnMetaImpl implements TxnMeta {
 
     private final TxnID txnID;
     private final Set<String> producedPartitions = new HashSet<>();
-    private final Set<String> ackedPartitions = new HashSet<>();
+    private final Set<TransactionSubscription> ackedPartitions = new HashSet<>();
     private TxnStatus txnStatus;
 
     TxnMetaImpl(TxnID txnID) {
@@ -72,8 +73,8 @@ class TxnMetaImpl implements TxnMeta {
     }
 
     @Override
-    public List<String> ackedPartitions() {
-        List<String> returnedPartitions;
+    public List<TransactionSubscription> ackedPartitions() {
+        List<TransactionSubscription> returnedPartitions;
         synchronized (this) {
             returnedPartitions = new ArrayList<>(ackedPartitions.size());
             returnedPartitions.addAll(ackedPartitions);
@@ -118,9 +119,9 @@ class TxnMetaImpl implements TxnMeta {
      * @throws InvalidTxnStatusException
      */
     @Override
-    public synchronized TxnMetaImpl addAckedPartitions(List<String> partitions) throws InvalidTxnStatusException {
+    public synchronized TxnMetaImpl addAckedPartitions(List<TransactionSubscription> partitions)
+            throws InvalidTxnStatusException {
         checkTxnStatus(TxnStatus.OPEN);
-
         this.ackedPartitions.addAll(partitions);
         return this;
     }
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index c5ae97c..ff4bd31 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -195,10 +195,14 @@ public class TransactionMetadataStoreProviderTest {
         TxnStatus txnStatus = this.store.getTxnStatus(txnID).get();
         assertEquals(txnStatus, TxnStatus.OPEN);
 
-        List<String> partitions = new ArrayList<>();
-        partitions.add("ptn-0");
-        partitions.add("ptn-1");
-        partitions.add("ptn-2");
+        String topicPartition1 = "persistent://public/default/txn-ack-partition-0";
+        String topicPartition2 = "persistent://public/default/txn-ack-partition-1";
+        String topicPartition3 = "persistent://public/default/txn-ack-partition-2";
+        List<TransactionSubscription> partitions = new ArrayList<>();
+        partitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-0").build());
+        partitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-1").build());
+        partitions.add(TransactionSubscription.builder().topic(topicPartition2).subscription("sub-2").build());
+        partitions.add(TransactionSubscription.builder().topic(topicPartition3).subscription("sub-4").build());
 
         // add the list of partitions to the transaction
         this.store.addAckedPartitionToTxn(txnID, partitions).get();
@@ -208,29 +212,31 @@ public class TransactionMetadataStoreProviderTest {
         assertEquals(txn.ackedPartitions(), partitions);
 
         // add another list of partition. duplicated partitions should be removed
-        List<String> newPartitions = new ArrayList<>();
-        newPartitions.add("ptn-2");
-        newPartitions.add("ptn-3");
-        newPartitions.add("ptn-4");
+        List<TransactionSubscription> newPartitions = new ArrayList<>();
+        newPartitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-0").build());
+        newPartitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-1").build());
+        newPartitions.add(TransactionSubscription.builder().topic(topicPartition2).subscription("sub-5").build());
+        newPartitions.add(TransactionSubscription.builder().topic(topicPartition3).subscription("sub-6").build());
         this.store.addAckedPartitionToTxn(txnID, newPartitions);
 
         txn = this.store.getTxnMeta(txnID).get();
         assertEquals(txn.status(), TxnStatus.OPEN);
-        List<String> finalPartitions = new ArrayList<>();
-        finalPartitions.add("ptn-0");
-        finalPartitions.add("ptn-1");
-        finalPartitions.add("ptn-2");
-        finalPartitions.add("ptn-3");
-        finalPartitions.add("ptn-4");
+        List<TransactionSubscription> finalPartitions = new ArrayList<>();
+        finalPartitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-0").build());
+        finalPartitions.add(TransactionSubscription.builder().topic(topicPartition1).subscription("sub-1").build());
+        finalPartitions.add(TransactionSubscription.builder().topic(topicPartition2).subscription("sub-2").build());
+        finalPartitions.add(TransactionSubscription.builder().topic(topicPartition2).subscription("sub-5").build());
+        finalPartitions.add(TransactionSubscription.builder().topic(topicPartition3).subscription("sub-4").build());
+        finalPartitions.add(TransactionSubscription.builder().topic(topicPartition3).subscription("sub-6").build());
         assertEquals(txn.ackedPartitions(), finalPartitions);
 
         // change the transaction to `COMMITTING`
         this.store.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN).get();
 
         // add partitions should fail if it is already committing.
-        List<String> newPartitions2 = new ArrayList<>();
-        newPartitions2.add("ptn-5");
-        newPartitions2.add("ptn-6");
+        List<TransactionSubscription> newPartitions2 = new ArrayList<>();
+        newPartitions2.add(TransactionSubscription.builder().topic(topicPartition2).subscription("sub-7").build());
+        newPartitions2.add(TransactionSubscription.builder().topic(topicPartition3).subscription("sub-8").build());
         try {
             this.store.addAckedPartitionToTxn(txnID, newPartitions2).get();
             fail("Should fail to add acked partitions if the transaction is not in OPEN status");