You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/12 01:19:20 UTC

[pulsar] branch master updated: [monitor][txn] Add metrics for transaction (#15140)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b7b2e37f017 [monitor][txn] Add metrics for transaction (#15140)
b7b2e37f017 is described below

commit b7b2e37f017ea044f7a1f7cbfc36779ed965655d
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Fri Aug 12 09:19:09 2022 +0800

    [monitor][txn] Add metrics for transaction (#15140)
---
 .../broker/service/persistent/PersistentTopic.java |   4 +
 .../stats/prometheus/AggregatedNamespaceStats.java |   8 ++
 .../stats/prometheus/NamespaceStatsAggregator.java |   6 +
 .../pulsar/broker/stats/prometheus/TopicStats.java |  15 +++
 .../transaction/buffer/TransactionBuffer.java      |   8 ++
 .../buffer/TransactionBufferClientStats.java       |  70 +++++++++++
 .../buffer/impl/InMemTransactionBuffer.java        |  24 ++++
 .../buffer/impl/TopicTransactionBuffer.java        |  21 ++++
 .../buffer/impl/TransactionBufferClientImpl.java   |  55 ++++++++-
 .../impl/TransactionBufferClientStatsImpl.java     | 130 +++++++++++++++++++++
 .../buffer/impl/TransactionBufferDisable.java      |  15 +++
 .../pendingack/PendingAckHandleStats.java          |  34 ++++++
 .../pendingack/impl/PendingAckHandleImpl.java      |  15 ++-
 .../pendingack/impl/PendingAckHandleStatsImpl.java | 117 +++++++++++++++++++
 .../pulsar/broker/transaction/TransactionTest.java |  41 +++++++
 .../buffer/TransactionBufferClientTest.java        |  55 +++++++++
 .../pendingack/PendingAckPersistentTest.java       |  78 ++++++++++++-
 .../common/policies/data/stats/TopicStatsImpl.java |   7 ++
 18 files changed, 694 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6673bedce23..f166121a88d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1873,6 +1873,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
         stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
         stats.publishRateLimitedTimes = publishRateLimitedTimes;
+        TransactionBuffer txnBuffer = getTransactionBuffer();
+        stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount();
+        stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
+        stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
 
         subscriptions.forEach((name, subscription) -> {
             SubscriptionStatsImpl subStats =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 761094ac0e6..e9811b8079b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -43,6 +43,10 @@ public class AggregatedNamespaceStats {
     public long msgBacklog;
     public long msgDelayed;
 
+    public long ongoingTxnCount;
+    public long abortedTxnCount;
+    public long committedTxnCount;
+
     long backlogQuotaLimit;
     long backlogQuotaLimitTime;
 
@@ -79,6 +83,10 @@ public class AggregatedNamespaceStats {
         msgOutCounter += stats.msgOutCounter;
         delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
 
+        this.ongoingTxnCount += stats.ongoingTxnCount;
+        this.abortedTxnCount += stats.abortedTxnCount;
+        this.committedTxnCount += stats.committedTxnCount;
+
         managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
         managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
         managedLedgerStats.backlogSize += stats.managedLedgerStats.backlogSize;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index f444ad0542e..0a885ef7356 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -179,6 +179,9 @@ public class NamespaceStatsAggregator {
         stats.averageMsgSize = tStatus.averageMsgSize;
         stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
         stats.delayedTrackerMemoryUsage = tStatus.delayedMessageIndexSizeInBytes;
+        stats.abortedTxnCount = tStatus.abortedTxnCount;
+        stats.ongoingTxnCount = tStatus.ongoingTxnCount;
+        stats.committedTxnCount = tStatus.committedTxnCount;
 
         stats.producersCount = 0;
         topic.getProducers().values().forEach(producer -> {
@@ -325,6 +328,9 @@ public class NamespaceStatsAggregator {
         metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
         metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
         metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
+        metric(stream, cluster, namespace, "pulsar_txn_tb_active_total", stats.ongoingTxnCount);
+        metric(stream, cluster, namespace, "pulsar_txn_tb_aborted_total", stats.abortedTxnCount);
+        metric(stream, cluster, namespace, "pulsar_txn_tb_committed_total", stats.committedTxnCount);
         metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
 
         metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 99838ccfae9..9e857271eab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -42,6 +42,10 @@ class TopicStats {
     long bytesOutCounter;
     double averageMsgSize;
 
+    long ongoingTxnCount;
+    long abortedTxnCount;
+    long committedTxnCount;
+
     public long msgBacklog;
     long publishRateLimitedTimes;
 
@@ -82,6 +86,10 @@ class TopicStats {
         bytesOutCounter = 0;
         msgOutCounter = 0;
 
+        ongoingTxnCount = 0;
+        abortedTxnCount = 0;
+        committedTxnCount = 0;
+
         managedLedgerStats.reset();
         msgBacklog = 0;
         publishRateLimitedTimes = 0L;
@@ -129,6 +137,13 @@ class TopicStats {
         metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
                 splitTopicAndPartitionIndexLabel);
 
+        metric(stream, cluster, namespace, topic, "pulsar_txn_tb_active_total", stats.ongoingTxnCount,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_txn_tb_aborted_total", stats.abortedTxnCount,
+                splitTopicAndPartitionIndexLabel);
+        metric(stream, cluster, namespace, topic, "pulsar_txn_tb_committed_total", stats.committedTxnCount,
+                splitTopicAndPartitionIndexLabel);
+
         metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
                 splitTopicAndPartitionIndexLabel);
         metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index d9ab04dbe9c..ab1270ef0e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -176,4 +176,12 @@ public interface TransactionBuffer {
      * @return a future which has completely if isTxn = false. Or a future return by takeSnapshot.
      */
     CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn);
+
+
+
+    /**
+     * @return the number of transactions that are still ongoing on this buffer
+     */
+    long getOngoingTxnCount();
+
+    /**
+     * @return the number of aborted transactions reported by this buffer
+     */
+    long getAbortedTxnCount();
+
+    /**
+     * @return the number of committed transactions reported by this buffer
+     */
+    long getCommittedTxnCount();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
new file mode 100644
index 00000000000..5c91b4e13a2
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientStatsImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+
+/**
+ * Sink for the metrics produced by the transaction buffer client.
+ */
+public interface TransactionBufferClientStats {
+
+    /**
+     * Implementation whose methods all do nothing; returned by
+     * {@link #create(boolean, TransactionBufferHandler, boolean)} when the transaction coordinator is disabled.
+     */
+    TransactionBufferClientStats NOOP = new TransactionBufferClientStats() {
+        @Override
+        public void recordAbortFailed(String topic) {
+            // nothing to record
+        }
+
+        @Override
+        public void recordCommitFailed(String topic) {
+            // nothing to record
+        }
+
+        @Override
+        public void recordAbortLatency(String topic, long nanos) {
+            // nothing to record
+        }
+
+        @Override
+        public void recordCommitLatency(String topic, long nanos) {
+            // nothing to record
+        }
+
+        @Override
+        public void close() {
+            // nothing to release
+        }
+    };
+
+    /**
+     * Records one failed abort request.
+     *
+     * @param topic the topic the request was sent for
+     */
+    void recordAbortFailed(String topic);
+
+    /**
+     * Records one failed commit request.
+     *
+     * @param topic the topic the request was sent for
+     */
+    void recordCommitFailed(String topic);
+
+    /**
+     * Records how long an abort request took.
+     *
+     * @param topic the topic the request was sent for
+     * @param nanos elapsed time in nanoseconds
+     */
+    void recordAbortLatency(String topic, long nanos);
+
+    /**
+     * Records how long a commit request took.
+     *
+     * @param topic the topic the request was sent for
+     * @param nanos elapsed time in nanoseconds
+     */
+    void recordCommitLatency(String topic, long nanos);
+
+    /**
+     * Releases whatever the implementation holds.
+     */
+    void close();
+
+    /**
+     * Picks the stats implementation to use.
+     *
+     * @param exposeTopicMetrics   forwarded to {@link TransactionBufferClientStatsImpl#getInstance}
+     * @param handler              forwarded to {@link TransactionBufferClientStatsImpl#getInstance}
+     * @param enableTxnCoordinator when {@code false}, {@link #NOOP} is returned
+     * @return the shared {@link TransactionBufferClientStatsImpl} instance, or {@link #NOOP}
+     */
+    static TransactionBufferClientStats create(boolean exposeTopicMetrics, TransactionBufferHandler handler,
+                                               boolean enableTxnCoordinator) {
+        if (!enableTxnCoordinator) {
+            return NOOP;
+        }
+        return TransactionBufferClientStatsImpl.getInstance(exposeTopicMetrics, handler);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 8a818700f50..c4a9fc2b774 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -388,4 +388,28 @@ class InMemTransactionBuffer implements TransactionBuffer {
     public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
         return CompletableFuture.completedFuture(null);
     }
+
+    /**
+     * Counts the buffered transactions whose status is OPEN, COMMITTING or ABORTING.
+     *
+     * @return the number of matching entries in {@code buffers}
+     */
+    @Override
+    public long getOngoingTxnCount() {
+        return this.buffers.values().stream()
+                .filter(txnBuffer -> {
+                    // Read the status once so all three comparisons see the same value. Enum constants
+                    // are compared by identity, which also tolerates a null status.
+                    TxnStatus status = txnBuffer.status;
+                    return status == TxnStatus.OPEN
+                            || status == TxnStatus.COMMITTING
+                            || status == TxnStatus.ABORTING;
+                })
+                .count();
+    }
+
+    /**
+     * Counts the buffered transactions whose status is ABORTED.
+     *
+     * @return the number of matching entries in {@code buffers}
+     */
+    @Override
+    public long getAbortedTxnCount() {
+        // Identity comparison is the idiomatic enum check and does not throw on a null status.
+        return this.buffers.values().stream()
+                .filter(txnBuffer -> txnBuffer.status == TxnStatus.ABORTED)
+                .count();
+    }
+
+    /**
+     * Counts the buffered transactions whose status is COMMITTED.
+     *
+     * @return the number of matching entries in {@code buffers}
+     */
+    @Override
+    public long getCommittedTxnCount() {
+        // Identity comparison is the idiomatic enum check and does not throw on a null status.
+        return this.buffers.values().stream()
+                .filter(txnBuffer -> txnBuffer.status == TxnStatus.COMMITTED)
+                .count();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 48aad16340c..ad778137001 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -89,6 +90,10 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
     // when add abort or change max read position, the count will +1. Take snapshot will set 0 into it.
     private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong();
 
+    private final LongAdder txnCommittedCounter = new LongAdder();
+
+    private final LongAdder txnAbortedCounter = new LongAdder();
+
     private final Timer timer;
 
     private final int takeSnapshotIntervalNumber;
@@ -258,6 +263,20 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         }
     }
 
+    /**
+     * @return the current size of {@code ongoingTxns}
+     */
+    @Override
+    public long getOngoingTxnCount() {
+        return ongoingTxns.size();
+    }
+
+    /**
+     * @return the running total accumulated in {@code txnAbortedCounter}
+     */
+    @Override
+    public long getAbortedTxnCount() {
+        return txnAbortedCounter.sum();
+    }
+
+    /**
+     * @return the running total accumulated in {@code txnCommittedCounter}
+     */
+    @Override
+    public long getCommittedTxnCount() {
+        return txnCommittedCounter.sum();
+    }
 
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
@@ -322,6 +341,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                             clearAbortedTransactions();
                             takeSnapshotByChangeTimes();
                         }
+                        txnCommittedCounter.increment();
                         completableFuture.complete(null);
                     }
 
@@ -368,6 +388,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                             clearAbortedTransactions();
                             takeSnapshotByChangeTimes();
                         }
+                        txnAbortedCounter.increment();
                         completableFuture.complete(null);
                         handleLowWaterMark(txnID, lowWaterMark);
                     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
index c531f9f1871..bc7c1db2e04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -23,6 +23,8 @@ import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
@@ -35,47 +37,88 @@ import org.apache.pulsar.common.api.proto.TxnAction;
 public class TransactionBufferClientImpl implements TransactionBufferClient {
 
     private final TransactionBufferHandler tbHandler;
+    private final TransactionBufferClientStats stats;
 
+    /**
+     * Stores the handler and obtains the stats recorder via {@link TransactionBufferClientStats#create}.
+     *
+     * @param tbHandler               handler used to send the end-transaction requests
+     * @param exposeTopicLevelMetrics passed through to the stats factory
+     * @param enableTxnCoordinator    passed through to the stats factory
+     */
-    private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) {
+    private TransactionBufferClientImpl(TransactionBufferHandler tbHandler, boolean exposeTopicLevelMetrics,
+                                        boolean enableTxnCoordinator) {
         this.tbHandler = tbHandler;
+        this.stats = TransactionBufferClientStats.create(exposeTopicLevelMetrics, tbHandler, enableTxnCoordinator);
     }
 
     public static TransactionBufferClient create(PulsarService pulsarService, HashedWheelTimer timer,
         int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException {
         TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarService, timer,
                 maxConcurrentRequests, operationTimeoutInMills);
-        return new TransactionBufferClientImpl(handler);
+
+        ServiceConfiguration config = pulsarService.getConfig();
+        boolean exposeTopicLevelMetrics = config.isExposeTopicLevelMetricsInPrometheus();
+        boolean enableTxnCoordinator = config.isTransactionCoordinatorEnabled();
+        return new TransactionBufferClientImpl(handler, exposeTopicLevelMetrics, enableTxnCoordinator);
     }
 
     @Override
     public CompletableFuture<TxnID> commitTxnOnTopic(String topic, long txnIdMostBits,
                                                      long txnIdLeastBits, long lowWaterMark) {
-        return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.COMMIT, lowWaterMark);
+        final long startNanos = System.nanoTime();
+        return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.COMMIT, lowWaterMark)
+                .whenComplete((ignored, failure) -> {
+                    // Success yields a latency sample; failure only bumps the failure counter.
+                    if (failure == null) {
+                        stats.recordCommitLatency(topic, System.nanoTime() - startNanos);
+                    } else {
+                        stats.recordCommitFailed(topic);
+                    }
+                });
     }
 
     @Override
     public CompletableFuture<TxnID> abortTxnOnTopic(String topic, long txnIdMostBits,
                                                     long txnIdLeastBits, long lowWaterMark) {
-        return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.ABORT, lowWaterMark);
+        final long startNanos = System.nanoTime();
+        return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.ABORT, lowWaterMark)
+                .whenComplete((ignored, failure) -> {
+                    // Success yields a latency sample; failure only bumps the failure counter.
+                    if (failure == null) {
+                        stats.recordAbortLatency(topic, System.nanoTime() - startNanos);
+                    } else {
+                        stats.recordAbortFailed(topic);
+                    }
+                });
     }
 
     @Override
     public CompletableFuture<TxnID> commitTxnOnSubscription(String topic, String subscription, long txnIdMostBits,
                                                             long txnIdLeastBits, long lowWaterMark) {
+        final long startNanos = System.nanoTime();
         return tbHandler.endTxnOnSubscription(topic, subscription, txnIdMostBits, txnIdLeastBits,
-                TxnAction.COMMIT, lowWaterMark);
+                TxnAction.COMMIT, lowWaterMark)
+                .whenComplete((ignored, failure) -> {
+                    // Success yields a latency sample; failure only bumps the failure counter.
+                    if (failure == null) {
+                        stats.recordCommitLatency(topic, System.nanoTime() - startNanos);
+                    } else {
+                        stats.recordCommitFailed(topic);
+                    }
+                });
     }
 
     @Override
     public CompletableFuture<TxnID> abortTxnOnSubscription(String topic, String subscription,
                                                            long txnIdMostBits, long txnIdLeastBits, long lowWaterMark) {
+        final long startNanos = System.nanoTime();
         return tbHandler.endTxnOnSubscription(topic, subscription, txnIdMostBits, txnIdLeastBits,
-                TxnAction.ABORT, lowWaterMark);
+                TxnAction.ABORT, lowWaterMark)
+                .whenComplete((ignored, failure) -> {
+                    // Success yields a latency sample; failure only bumps the failure counter.
+                    if (failure == null) {
+                        stats.recordAbortLatency(topic, System.nanoTime() - startNanos);
+                    } else {
+                        stats.recordAbortFailed(topic);
+                    }
+                });
     }
 
     @Override
     public void close() {
         tbHandler.close();
+        this.stats.close();
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
new file mode 100644
index 00000000000..73c2078e186
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * {@link TransactionBufferClientStats} implementation that publishes its metrics through the
+ * Prometheus default {@link CollectorRegistry}.
+ *
+ * <p>At most one instance is live at a time: {@link #getInstance} creates it lazily, and
+ * {@link #close()} unregisters its collectors and clears the shared reference so that a later
+ * {@code getInstance} call builds a fresh one.
+ */
+public final class TransactionBufferClientStatsImpl implements TransactionBufferClientStats {
+    private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1};
+
+    // Shared instance; every read and write happens while holding the class monitor.
+    private static TransactionBufferClientStats instance;
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private final Counter abortFailed;
+    private final Counter commitFailed;
+    private final Summary abortLatency;
+    private final Summary commitLatency;
+    private final Gauge pendingRequests;
+
+    private final boolean exposeTopicLevelMetrics;
+
+    /**
+     * Builds and registers all collectors.
+     *
+     * @param exposeTopicLevelMetrics when {@code true} the counters and summaries carry a {@code topic}
+     *                                label in addition to {@code namespace}
+     * @param handler                 source of the pending-request gauge value; a {@code null} handler
+     *                                makes the gauge report 0
+     */
+    private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics,
+                                             TransactionBufferHandler handler) {
+        this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
+        String[] labelNames = exposeTopicLevelMetrics
+                ? new String[]{"namespace", "topic"} : new String[]{"namespace"};
+
+        this.abortFailed = Counter.build("pulsar_txn_tb_client_abort_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.commitFailed = Counter.build("pulsar_txn_tb_client_commit_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.abortLatency =
+                this.buildSummary("pulsar_txn_tb_client_abort_latency", "-", labelNames);
+        this.commitLatency =
+                this.buildSummary("pulsar_txn_tb_client_commit_latency", "-", labelNames);
+
+        // The gauge has no labels; its single child reads the live value from the handler on each scrape.
+        this.pendingRequests = Gauge.build("pulsar_txn_tb_client_pending_requests", "-")
+                .register()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return null == handler ? 0 : handler.getPendingRequestsCount();
+                    }
+                });
+    }
+
+    /**
+     * Creates and registers a summary that tracks every quantile in {@link #QUANTILES} with a 0.01 error.
+     */
+    private Summary buildSummary(String name, String help, String[] labelNames) {
+        Summary.Builder builder = Summary.build(name, help)
+                .labelNames(labelNames);
+        for (double quantile : QUANTILES) {
+            builder.quantile(quantile, 0.01D);
+        }
+        return builder.register();
+    }
+
+    /**
+     * Returns the shared instance, creating it on first use.
+     *
+     * <p>The arguments are only used when a new instance has to be created; they are ignored while
+     * an instance already exists.
+     *
+     * @param exposeTopicLevelMetrics whether metrics are labelled per topic
+     * @param handler                 handler backing the pending-request gauge
+     * @return the shared stats instance
+     */
+    public static synchronized TransactionBufferClientStats getInstance(boolean exposeTopicLevelMetrics,
+                                                                        TransactionBufferHandler handler) {
+        if (null == instance) {
+            instance = new TransactionBufferClientStatsImpl(exposeTopicLevelMetrics, handler);
+        }
+
+        return instance;
+    }
+
+    @Override
+    public void recordAbortFailed(String topic) {
+        this.abortFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordCommitFailed(String topic) {
+        this.commitFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordAbortLatency(String topic, long nanos) {
+        // The value is observed as given, i.e. in nanoseconds.
+        this.abortLatency.labels(labelValues(topic)).observe(nanos);
+    }
+
+    @Override
+    public void recordCommitLatency(String topic, long nanos) {
+        // The value is observed as given, i.e. in nanoseconds.
+        this.commitLatency.labels(labelValues(topic)).observe(nanos);
+    }
+
+    /**
+     * Derives the label values for {@code topic}, matching the label names chosen in the constructor.
+     * If the topic name cannot be parsed, every label falls back to {@code "unknown"} so that
+     * recording a metric never fails because of a malformed name.
+     */
+    private String[] labelValues(String topic) {
+        try {
+            TopicName topicName = TopicName.get(topic);
+            return exposeTopicLevelMetrics
+                    ? new String[]{topicName.getNamespace(), topic} : new String[]{topicName.getNamespace()};
+        } catch (Throwable t) {
+            return exposeTopicLevelMetrics ? new String[]{"unknown", "unknown"} : new String[]{"unknown"};
+        }
+    }
+
+    /**
+     * Unregisters all collectors and drops the shared instance. Only the first call has an effect.
+     */
+    @Override
+    public void close() {
+        if (!this.closed.compareAndSet(false, true)) {
+            return;
+        }
+        // Use the same monitor as getInstance(...): the write to the static field becomes visible to
+        // it, and no new instance can try to register collectors under the same names while the old
+        // ones are still being unregistered.
+        synchronized (TransactionBufferClientStatsImpl.class) {
+            if (instance == this) {
+                instance = null;
+            }
+            CollectorRegistry.defaultRegistry.unregister(this.abortFailed);
+            CollectorRegistry.defaultRegistry.unregister(this.commitFailed);
+            CollectorRegistry.defaultRegistry.unregister(this.abortLatency);
+            CollectorRegistry.defaultRegistry.unregister(this.commitLatency);
+            CollectorRegistry.defaultRegistry.unregister(this.pendingRequests);
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 4290b475654..d700195416c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -107,4 +107,19 @@ public class TransactionBufferDisable implements TransactionBuffer {
     public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
         return CompletableFuture.completedFuture(null);
     }
+
+    /**
+     * @return always 0
+     */
+    @Override
+    public long getOngoingTxnCount() {
+        return 0L;
+    }
+
+    /**
+     * @return always 0
+     */
+    @Override
+    public long getAbortedTxnCount() {
+        return 0L;
+    }
+
+    /**
+     * @return always 0
+     */
+    @Override
+    public long getCommittedTxnCount() {
+        return 0L;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
new file mode 100644
index 00000000000..94102f36590
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack;
+
+import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleStatsImpl;
+
+/**
+ * Sink for the metrics of a pending-ack handle.
+ */
+public interface PendingAckHandleStats {
+
+    /**
+     * Records the outcome of a commit on the pending-ack handle.
+     *
+     * @param success whether the commit completed without an error
+     * @param nanos   time the commit took, in nanoseconds
+     */
+    void recordCommitTxn(boolean success, long nanos);
+
+    /**
+     * Records the outcome of an abort.
+     *
+     * @param success presumably whether the abort completed without an error, mirroring
+     *                {@link #recordCommitTxn} - NOTE(review): confirm against the caller
+     */
+    void recordAbortTxn(boolean success);
+
+    /**
+     * Closes this stats object. NOTE(review): what is released depends on the implementation.
+     */
+    void close();
+
+    /**
+     * Creates a new {@link PendingAckHandleStatsImpl} from the given arguments.
+     *
+     * @param topic                   topic name handed to the implementation
+     * @param subName                 subscription name handed to the implementation
+     * @param exposeTopicLevelMetrics flag handed to the implementation
+     * @return a new stats instance
+     */
+    static PendingAckHandleStats create(String topic, String subName, boolean exposeTopicLevelMetrics) {
+        return new PendingAckHandleStatsImpl(topic, subName, exposeTopicLevelMetrics);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index b3aec6c67f5..283bc038d76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -43,11 +43,13 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -127,6 +129,8 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
     @Getter
     private final ExecutorService internalPinnedExecutor;
 
+    private final PendingAckHandleStats handleStats;
+
     public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
 
 
@@ -142,6 +146,10 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                 .getTransactionExecutorProvider()
                 .getExecutor(this);
 
+        ServiceConfiguration config = persistentSubscription.getTopic().getBrokerService().pulsar().getConfig();
+        boolean exposeTopicLevelMetrics = config.isExposeTopicLevelMetricsInPrometheus();
+        this.handleStats = PendingAckHandleStats.create(topicName, subName, exposeTopicLevelMetrics);
+
         this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
         pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
@@ -483,6 +491,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
 
     @Override
     public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark) {
+        long start = System.nanoTime();
         CompletableFuture<Void> commitFuture = new CompletableFuture<>();
         internalPinnedExecutor.execute(() -> {
             if (!checkIfReady()) {
@@ -507,7 +516,9 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
             }
             internalCommitTxn(txnID, properties, lowWaterMark, commitFuture);
         });
-        return commitFuture;
+        return commitFuture.whenComplete((__, t) ->
+                this.handleStats.recordCommitTxn(t == null, System.nanoTime() - start)
+        );
     }
 
     private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long lowWaterMark,
@@ -575,7 +586,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         } else {
             abortFuture.complete(null);
         }
-        return abortFuture;
+        return abortFuture.whenComplete((__, t) -> this.handleStats.recordAbortTxn(t == null));
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
new file mode 100644
index 00000000000..f882cef956a
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Summary;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * {@link PendingAckHandleStats} implementation backed by Prometheus collectors.
+ *
+ * <p>The collectors are static and are built by the first call to {@link #initialize(boolean)}; the
+ * {@code exposeTopicLevelMetrics} value of that first call fixes the label names for all instances.
+ */
+public class PendingAckHandleStatsImpl implements PendingAckHandleStats {
+    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
+    private static Counter commitTxnCounter;
+    private static Counter abortTxnCounter;
+    private static Summary commitTxnLatency;
+    // Value captured by the first initialize() call; later constructor arguments do not change it.
+    private static boolean exposeTopicLevelMetrics0;
+
+    // Label values built once in the constructor and reused by every record call.
+    private final String[] labelSucceed;
+    private final String[] labelFailed;
+    private final String[] commitLatencyLabel;
+
+    /**
+     * Builds the label values for one topic/subscription pair, initializing the shared collectors if needed.
+     *
+     * @param topic                   topic name; a blank value is replaced by {@code "unknown"}, and a name
+     *                                rejected by {@link TopicName#get(String)} yields namespace {@code "unknown"}
+     * @param subscription            subscription name, used as a label value when topic-level metrics are exposed
+     * @param exposeTopicLevelMetrics only takes effect on the call that performs the one-time initialization
+     */
+    public PendingAckHandleStatsImpl(String topic, String subscription, boolean exposeTopicLevelMetrics) {
+        initialize(exposeTopicLevelMetrics);
+
+        String namespace;
+        if (StringUtils.isBlank(topic)) {
+            namespace = topic = "unknown";
+        } else {
+            try {
+                namespace = TopicName.get(topic).getNamespace();
+            } catch (IllegalArgumentException ex) {
+                // Unparseable topic name: keep the raw topic label and fall back to an unknown namespace.
+                namespace = "unknown";
+            }
+        }
+
+        // Label values must line up with the label names chosen in initialize().
+        labelSucceed = exposeTopicLevelMetrics0
+                ? new String[]{namespace, topic, subscription, "succeed"} : new String[]{namespace, "succeed"};
+        labelFailed = exposeTopicLevelMetrics0
+                ? new String[]{namespace, topic, subscription, "failed"} : new String[]{namespace, "failed"};
+        commitLatencyLabel = exposeTopicLevelMetrics0
+                ? new String[]{namespace, topic, subscription} : new String[]{namespace};
+    }
+
+    /**
+     * Increments the commit counter for the given outcome; on success also observes the latency.
+     *
+     * @param success whether the commit succeeded
+     * @param nanos   commit duration in nanoseconds; observed in microseconds, and only for successful commits
+     */
+    @Override
+    public void recordCommitTxn(boolean success, long nanos) {
+        String[] labels;
+        if (success) {
+            labels = labelSucceed;
+            commitTxnLatency.labels(commitLatencyLabel).observe(TimeUnit.NANOSECONDS.toMicros(nanos));
+        } else {
+            labels = labelFailed;
+        }
+        commitTxnCounter.labels(labels).inc();
+    }
+
+    /**
+     * Increments the abort counter for the given outcome.
+     *
+     * @param success whether the abort succeeded
+     */
+    @Override
+    public void recordAbortTxn(boolean success) {
+        abortTxnCounter.labels(success ? labelSucceed : labelFailed).inc();
+    }
+
+    /**
+     * Removes this instance's labelled children from the shared collectors.
+     *
+     * <p>Nothing is removed when only namespace-level labels are in use, because those label values
+     * are not specific to this topic and subscription.
+     */
+    @Override
+    public void close() {
+        if (exposeTopicLevelMetrics0) {
+            commitTxnCounter.remove(this.labelSucceed);
+            commitTxnCounter.remove(this.labelFailed);
+            // Both outcomes must be removed; the succeed child was previously left behind.
+            abortTxnCounter.remove(this.labelSucceed);
+            abortTxnCounter.remove(this.labelFailed);
+            commitTxnLatency.remove(this.commitLatencyLabel);
+        }
+    }
+
+    /**
+     * Builds and registers the shared collectors on the first call; later calls are no-ops.
+     *
+     * @param exposeTopicLevelMetrics when {@code true} the collectors carry topic and subscription labels
+     *                                in addition to the namespace
+     */
+    static void initialize(boolean exposeTopicLevelMetrics) {
+        if (INITIALIZED.compareAndSet(false, true)) {
+            exposeTopicLevelMetrics0 = exposeTopicLevelMetrics;
+
+            String[] labelNames = exposeTopicLevelMetrics
+                    ? new String[]{"namespace", "topic", "subscription", "status"}
+                    : new String[]{"namespace", "status"};
+
+            commitTxnCounter = Counter
+                    .build("pulsar_txn_tp_committed_count", "-")
+                    .labelNames(labelNames)
+                    .register();
+
+            abortTxnCounter = Counter
+                    .build("pulsar_txn_tp_aborted_count", "-")
+                    .labelNames(labelNames)
+                    .register();
+
+            // The latency summary has no status label: it is only observed for successful commits.
+            commitTxnLatency = Summary.build("pulsar_txn_tp_commit_latency", "-")
+                    .quantile(0.5, 0.01)
+                    .quantile(0.9, 0.01)
+                    .quantile(0.99, 0.01)
+                    .quantile(0.999, 0.01)
+                    .labelNames(exposeTopicLevelMetrics
+                            ? new String[]{"namespace", "topic", "subscription"} : new String[]{"namespace"})
+                    .register();
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 7a3eabdb318..c377043df8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -41,6 +41,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -122,6 +123,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -162,6 +164,45 @@ public class TransactionTest extends TransactionTestBase {
         super.internalCleanup();
     }
 
+
+    /**
+     * Publishes one message under a committed, an aborted and a still-open transaction, then checks
+     * that the topic stats report a count of one for each of the three states.
+     */
+    @Test
+    public void testTopicTransactionMetrics() throws Exception {
+        final String topicName = "persistent://tnx/ns1/test_transaction_topic";
+
+        @Cleanup
+        Producer<byte[]> txnProducer = this.pulsarClient.newProducer()
+                .topic(topicName)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        // First transaction: publish, then commit.
+        Transaction committedTxn = pulsarClient.newTransaction()
+                .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        byte[] committedPayload = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+        txnProducer.newMessage(committedTxn).value(committedPayload).send();
+        committedTxn.commit().get();
+
+        // Second transaction: publish, then abort.
+        Transaction abortedTxn = pulsarClient.newTransaction()
+                .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        byte[] abortedPayload = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+        txnProducer.newMessage(abortedTxn).value(abortedPayload).send();
+        abortedTxn.abort().get();
+
+        // Third transaction: publish and leave it open so it counts as ongoing.
+        Transaction openTxn = pulsarClient.newTransaction()
+                .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        byte[] openPayload = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+        txnProducer.newMessage(openTxn).value(openPayload).send();
+
+        PulsarService broker = pulsarServiceList.get(0);
+        Optional<Topic> maybeTopic = broker.getBrokerService().getTopic(topicName, false).get();
+        assertTrue(maybeTopic.isPresent());
+        TopicStatsImpl topicStats = ((PersistentTopic) maybeTopic.get()).getStats(false, false, false);
+
+        assertEquals(topicStats.committedTxnCount, 1);
+        assertEquals(topicStats.abortedTxnCount, 1);
+        assertEquals(topicStats.ongoingTxnCount, 1);
+    }
+
     @Test
     public void testCreateTransactionSystemTopic() throws Exception {
         String subName = "test";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 66460778dc2..2e93b0ad707 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -19,19 +19,25 @@
 package org.apache.pulsar.broker.transaction.buffer;
 
 import static org.mockito.ArgumentMatchers.anyString;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.Cleanup;
+import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
@@ -148,6 +154,55 @@ public class TransactionBufferClientTest extends TransactionTestBase {
         }
     }
 
+
+    /**
+     * Issues one commit request and one abort request per partition through the transaction buffer
+     * client, then checks the client metrics found in the generated Prometheus output.
+     */
+    @Test
+    public void testTransactionBufferMetrics() throws Exception {
+        // Commit phase: one request per partition.
+        for (int partition = 0; partition < partitions; partition++) {
+            String partitionTopic = partitionedTopicName.getPartition(partition).toString();
+            tbClient.commitTxnOnSubscription(partitionTopic, "test", 1L, partition, -1L).get();
+        }
+
+        // Abort phase: one request per partition.
+        for (int partition = 0; partition < partitions; partition++) {
+            String partitionTopic = partitionedTopicName.getPartition(partition).toString();
+            tbClient.abortTxnOnSubscription(partitionTopic, "test", 1L, partition, -1L).get();
+        }
+
+        @Cleanup
+        ByteArrayOutputStream metricsBytes = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, false, false, metricsBytes);
+        Multimap<String, PrometheusMetricsTest.Metric> parsed =
+                PrometheusMetricsTest.parseMetrics(metricsBytes.toString());
+
+        Collection<PrometheusMetricsTest.Metric> abortFailures =
+                parsed.get("pulsar_txn_tb_client_abort_failed_total");
+        Collection<PrometheusMetricsTest.Metric> commitFailures =
+                parsed.get("pulsar_txn_tb_client_commit_failed_total");
+        Collection<PrometheusMetricsTest.Metric> abortLatencyCounts =
+                parsed.get("pulsar_txn_tb_client_abort_latency_count");
+        Collection<PrometheusMetricsTest.Metric> commitLatencyCounts =
+                parsed.get("pulsar_txn_tb_client_commit_latency_count");
+        Collection<PrometheusMetricsTest.Metric> pendingRequests =
+                parsed.get("pulsar_txn_tb_client_pending_requests");
+
+        // No request is expected to have failed.
+        assertEquals(abortFailures.stream().mapToDouble(sample -> sample.value).sum(), 0);
+        assertEquals(commitFailures.stream().mapToDouble(sample -> sample.value).sum(), 0);
+
+        // Every partition must show exactly one abort and one commit latency observation.
+        for (int partition = 0; partition < partitions; partition++) {
+            String partitionTopic = partitionedTopicName.getPartition(partition).toString();
+
+            Optional<PrometheusMetricsTest.Metric> abortSample = abortLatencyCounts.stream()
+                    .filter(sample -> sample.tags.get("topic").equals(partitionTopic))
+                    .findFirst();
+            assertTrue(abortSample.isPresent());
+            assertEquals(abortSample.get().value, 1D);
+
+            Optional<PrometheusMetricsTest.Metric> commitSample = commitLatencyCounts.stream()
+                    .filter(sample -> sample.tags.get("topic").equals(partitionTopic))
+                    .findFirst();
+            assertTrue(commitSample.isPresent());
+            assertEquals(commitSample.get().value, 1D);
+        }
+
+        assertEquals(pendingRequests.size(), 1);
+    }
+
     @Test
     public void testTransactionBufferClientTimeout() throws Exception {
         PulsarService pulsarService = pulsarServiceList.get(0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index e75c9534c59..ff9e16fac7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -21,16 +21,19 @@ package org.apache.pulsar.broker.transaction.pendingack;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
+import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Multimap;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -38,6 +41,8 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
@@ -200,6 +205,77 @@ public class PendingAckPersistentTest extends TransactionTestBase {
                         .compareTo((PositionImpl) managedCursor.getManagedLedger().getLastConfirmedEntry()) == -1);
     }
 
+    /**
+     * Acknowledges messages cumulatively inside transactions, aborting and committing them
+     * alternately, then checks the pending-ack metrics in the generated Prometheus output.
+     */
+    @Test
+    public void testPendingAckMetrics() throws Exception {
+        final int messageCount = 100;
+        String subName = "testMetric" + UUID.randomUUID();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(PENDING_ACK_REPLAY_TOPIC)
+                .create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(PENDING_ACK_REPLAY_TOPIC)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        for (int a = 0; a < messageCount; a++) {
+            producer.send(UUID.randomUUID().toString());
+        }
+
+        for (int a = 0; a < messageCount; a++) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (null == message) {
+                break;
+            }
+
+            Transaction txn = pulsarClient.newTransaction()
+                    .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+            consumer.acknowledgeCumulativeAsync(message.getMessageId(), txn).get();
+            // Even iterations abort, odd iterations commit, so both counters get exercised.
+            if (a % 2 == 0) {
+                txn.abort().get();
+            } else {
+                txn.commit().get();
+            }
+        }
+
+        @Cleanup
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> abortedCount = metricsMap.get("pulsar_txn_tp_aborted_count_total");
+        Collection<PrometheusMetricsTest.Metric> committedCount = metricsMap.get("pulsar_txn_tp_committed_count_total");
+        Collection<PrometheusMetricsTest.Metric> commitLatency = metricsMap.get("pulsar_txn_tp_commit_latency");
+        Assert.assertTrue(commitLatency.size() > 0);
+
+        int count = 0;
+        for (PrometheusMetricsTest.Metric metric : commitLatency) {
+            if (metric.tags.get("topic").endsWith(PENDING_ACK_REPLAY_TOPIC) && metric.value > 0) {
+                count++;
+            }
+        }
+        Assert.assertTrue(count > 0);
+
+        // Track whether a matching sample was seen so the checks below cannot pass vacuously.
+        boolean abortedSampleSeen = false;
+        for (PrometheusMetricsTest.Metric metric : abortedCount) {
+            if (metric.tags.get("subscription").equals(subName) && metric.tags.get("status").equals("succeed")) {
+                assertTrue(metric.tags.get("topic").endsWith(PENDING_ACK_REPLAY_TOPIC));
+                assertTrue(metric.value > 0);
+                abortedSampleSeen = true;
+            }
+        }
+        assertTrue(abortedSampleSeen, "no succeeded abort sample found for subscription " + subName);
+
+        boolean committedSampleSeen = false;
+        for (PrometheusMetricsTest.Metric metric : committedCount) {
+            if (metric.tags.get("subscription").equals(subName) && metric.tags.get("status").equals("succeed")) {
+                assertTrue(metric.tags.get("topic").endsWith(PENDING_ACK_REPLAY_TOPIC));
+                assertTrue(metric.value > 0);
+                committedSampleSeen = true;
+            }
+        }
+        assertTrue(committedSampleSeen, "no succeeded commit sample found for subscription " + subName);
+    }
+
     @Test
     public void cumulativePendingAckReplayTest() throws Exception {
         int messageCount = 1000;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index f6cdbba5036..3c5ce5f14d6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -101,6 +101,10 @@ public class TopicStatsImpl implements TopicStats {
     /** record last failed offloaded timestamp. If no failed offload, the value should be 0 */
     public long lastOffloadFailureTimeStamp;
 
+    public long ongoingTxnCount;
+    public long abortedTxnCount;
+    public long committedTxnCount;
+
     /** List of connected publishers on this topic w/ their stats. */
     @Getter(AccessLevel.NONE)
     @Setter(AccessLevel.NONE)
@@ -231,6 +235,9 @@ public class TopicStatsImpl implements TopicStats {
         this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
         this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
         this.delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes;
+        this.ongoingTxnCount = stats.ongoingTxnCount;
+        this.abortedTxnCount = stats.abortedTxnCount;
+        this.committedTxnCount = stats.committedTxnCount;
 
         stats.getPublishers().forEach(s -> {
            if (s.isSupportsPartialProducer() && s.getProducerName() != null) {