You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/12 01:19:20 UTC
[pulsar] branch master updated: [monitor][txn] Add metrics for transaction (#15140)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b7b2e37f017 [monitor][txn] Add metrics for transaction (#15140)
b7b2e37f017 is described below
commit b7b2e37f017ea044f7a1f7cbfc36779ed965655d
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Fri Aug 12 09:19:09 2022 +0800
[monitor][txn] Add metrics for transaction (#15140)
---
.../broker/service/persistent/PersistentTopic.java | 4 +
.../stats/prometheus/AggregatedNamespaceStats.java | 8 ++
.../stats/prometheus/NamespaceStatsAggregator.java | 6 +
.../pulsar/broker/stats/prometheus/TopicStats.java | 15 +++
.../transaction/buffer/TransactionBuffer.java | 8 ++
.../buffer/TransactionBufferClientStats.java | 70 +++++++++++
.../buffer/impl/InMemTransactionBuffer.java | 24 ++++
.../buffer/impl/TopicTransactionBuffer.java | 21 ++++
.../buffer/impl/TransactionBufferClientImpl.java | 55 ++++++++-
.../impl/TransactionBufferClientStatsImpl.java | 130 +++++++++++++++++++++
.../buffer/impl/TransactionBufferDisable.java | 15 +++
.../pendingack/PendingAckHandleStats.java | 34 ++++++
.../pendingack/impl/PendingAckHandleImpl.java | 15 ++-
.../pendingack/impl/PendingAckHandleStatsImpl.java | 117 +++++++++++++++++++
.../pulsar/broker/transaction/TransactionTest.java | 41 +++++++
.../buffer/TransactionBufferClientTest.java | 55 +++++++++
.../pendingack/PendingAckPersistentTest.java | 78 ++++++++++++-
.../common/policies/data/stats/TopicStatsImpl.java | 7 ++
18 files changed, 694 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6673bedce23..f166121a88d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1873,6 +1873,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
stats.publishRateLimitedTimes = publishRateLimitedTimes;
+ TransactionBuffer txnBuffer = getTransactionBuffer();
+ stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount();
+ stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
+ stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
subscriptions.forEach((name, subscription) -> {
SubscriptionStatsImpl subStats =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 761094ac0e6..e9811b8079b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -43,6 +43,10 @@ public class AggregatedNamespaceStats {
public long msgBacklog;
public long msgDelayed;
+ public long ongoingTxnCount;
+ public long abortedTxnCount;
+ public long committedTxnCount;
+
long backlogQuotaLimit;
long backlogQuotaLimitTime;
@@ -79,6 +83,10 @@ public class AggregatedNamespaceStats {
msgOutCounter += stats.msgOutCounter;
delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
+ this.ongoingTxnCount += stats.ongoingTxnCount;
+ this.abortedTxnCount += stats.abortedTxnCount;
+ this.committedTxnCount += stats.committedTxnCount;
+
managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
managedLedgerStats.backlogSize += stats.managedLedgerStats.backlogSize;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index f444ad0542e..0a885ef7356 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -179,6 +179,9 @@ public class NamespaceStatsAggregator {
stats.averageMsgSize = tStatus.averageMsgSize;
stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
stats.delayedTrackerMemoryUsage = tStatus.delayedMessageIndexSizeInBytes;
+ stats.abortedTxnCount = tStatus.abortedTxnCount;
+ stats.ongoingTxnCount = tStatus.ongoingTxnCount;
+ stats.committedTxnCount = tStatus.committedTxnCount;
stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
@@ -325,6 +328,9 @@ public class NamespaceStatsAggregator {
metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
+ metric(stream, cluster, namespace, "pulsar_txn_tb_active_total", stats.ongoingTxnCount);
+ metric(stream, cluster, namespace, "pulsar_txn_tb_aborted_total", stats.abortedTxnCount);
+ metric(stream, cluster, namespace, "pulsar_txn_tb_committed_total", stats.committedTxnCount);
metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 99838ccfae9..9e857271eab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -42,6 +42,10 @@ class TopicStats {
long bytesOutCounter;
double averageMsgSize;
+ long ongoingTxnCount;
+ long abortedTxnCount;
+ long committedTxnCount;
+
public long msgBacklog;
long publishRateLimitedTimes;
@@ -82,6 +86,10 @@ class TopicStats {
bytesOutCounter = 0;
msgOutCounter = 0;
+ ongoingTxnCount = 0;
+ abortedTxnCount = 0;
+ committedTxnCount = 0;
+
managedLedgerStats.reset();
msgBacklog = 0;
publishRateLimitedTimes = 0L;
@@ -129,6 +137,13 @@ class TopicStats {
metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_txn_tb_active_total", stats.ongoingTxnCount,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_txn_tb_aborted_total", stats.abortedTxnCount,
+ splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, "pulsar_txn_tb_committed_total", stats.committedTxnCount,
+ splitTopicAndPartitionIndexLabel);
+
metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index d9ab04dbe9c..ab1270ef0e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -176,4 +176,12 @@ public interface TransactionBuffer {
* @return a future which has completely if isTxn = false. Or a future return by takeSnapshot.
*/
CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn);
+
+
+
+ /**
+ * Get the number of transactions that are currently ongoing in this transaction buffer.
+ *
+ * @return the ongoing transaction count
+ */
+ long getOngoingTxnCount();
+
+ /**
+ * Get the number of transactions that this transaction buffer has recorded as aborted.
+ *
+ * @return the aborted transaction count
+ */
+ long getAbortedTxnCount();
+
+ /**
+ * Get the number of transactions that this transaction buffer has recorded as committed.
+ *
+ * @return the committed transaction count
+ */
+ long getCommittedTxnCount();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
new file mode 100644
index 00000000000..5c91b4e13a2
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientStatsImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+
+/**
+ * Records metrics about the commit/abort requests issued through the transaction buffer client.
+ */
+public interface TransactionBufferClientStats {
+
+    /**
+     * Recorder that ignores every call. {@link #create} hands it out when the transaction
+     * coordinator is not enabled.
+     */
+    TransactionBufferClientStats NOOP = new TransactionBufferClientStats() {
+        @Override
+        public void recordAbortFailed(String topic) {
+            // intentionally empty
+        }
+
+        @Override
+        public void recordCommitFailed(String topic) {
+            // intentionally empty
+        }
+
+        @Override
+        public void recordAbortLatency(String topic, long nanos) {
+            // intentionally empty
+        }
+
+        @Override
+        public void recordCommitLatency(String topic, long nanos) {
+            // intentionally empty
+        }
+
+        @Override
+        public void close() {
+            // intentionally empty
+        }
+    };
+
+    /**
+     * Picks the stats recorder to use.
+     *
+     * @param exposeTopicMetrics   forwarded to the real implementation
+     * @param handler              forwarded to the real implementation
+     * @param enableTxnCoordinator when {@code false}, {@link #NOOP} is returned
+     * @return the shared {@link TransactionBufferClientStatsImpl} instance, or {@link #NOOP}
+     */
+    static TransactionBufferClientStats create(boolean exposeTopicMetrics, TransactionBufferHandler handler,
+                                               boolean enableTxnCoordinator) {
+        if (!enableTxnCoordinator) {
+            return NOOP;
+        }
+        return TransactionBufferClientStatsImpl.getInstance(exposeTopicMetrics, handler);
+    }
+
+    /**
+     * Records one failed abort request for the given topic.
+     */
+    void recordAbortFailed(String topic);
+
+    /**
+     * Records one failed commit request for the given topic.
+     */
+    void recordCommitFailed(String topic);
+
+    /**
+     * Records the duration of an abort request for the given topic.
+     *
+     * @param nanos elapsed time in nanoseconds
+     */
+    void recordAbortLatency(String topic, long nanos);
+
+    /**
+     * Records the duration of a commit request for the given topic.
+     *
+     * @param nanos elapsed time in nanoseconds
+     */
+    void recordCommitLatency(String topic, long nanos);
+
+    /**
+     * Closes this recorder.
+     */
+    void close();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 8a818700f50..c4a9fc2b774 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -388,4 +388,28 @@ class InMemTransactionBuffer implements TransactionBuffer {
public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
return CompletableFuture.completedFuture(null);
}
+
+    /**
+     * Counts the buffered transactions whose status is OPEN, COMMITTING or ABORTING.
+     *
+     * @return the number of matching entries in {@code buffers}
+     */
+    @Override
+    public long getOngoingTxnCount() {
+        return buffers.values().stream()
+                .map(txnBuffer -> txnBuffer.status)
+                .filter(status -> status.equals(TxnStatus.OPEN)
+                        || status.equals(TxnStatus.COMMITTING)
+                        || status.equals(TxnStatus.ABORTING))
+                .count();
+    }
+
+    /**
+     * Counts the buffered transactions whose status is ABORTED.
+     *
+     * @return the number of matching entries in {@code buffers}
+     */
+    @Override
+    public long getAbortedTxnCount() {
+        return buffers.values().stream()
+                .map(txnBuffer -> txnBuffer.status)
+                .filter(status -> status.equals(TxnStatus.ABORTED))
+                .count();
+    }
+
+    /**
+     * Counts the buffered transactions whose status is COMMITTED.
+     *
+     * @return the number of matching entries in {@code buffers}
+     */
+    @Override
+    public long getCommittedTxnCount() {
+        return buffers.values().stream()
+                .map(txnBuffer -> txnBuffer.status)
+                .filter(status -> status.equals(TxnStatus.COMMITTED))
+                .count();
+    }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 48aad16340c..ad778137001 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -89,6 +90,10 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
// when add abort or change max read position, the count will +1. Take snapshot will set 0 into it.
private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong();
+ private final LongAdder txnCommittedCounter = new LongAdder();
+
+ private final LongAdder txnAbortedCounter = new LongAdder();
+
private final Timer timer;
private final int takeSnapshotIntervalNumber;
@@ -258,6 +263,20 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
}
}
+    /**
+     * Reports how many transactions this buffer currently tracks as ongoing.
+     *
+     * @return the current size of {@code ongoingTxns}
+     */
+    @Override
+    public long getOngoingTxnCount() {
+        final long ongoing = ongoingTxns.size();
+        return ongoing;
+    }
+
+    /**
+     * Reports the running total held by {@code txnAbortedCounter}.
+     *
+     * @return the counter's current sum; with a {@code LongAdder} this is not an atomic snapshot
+     *         while updates are in flight
+     */
+    @Override
+    public long getAbortedTxnCount() {
+        final long aborted = txnAbortedCounter.sum();
+        return aborted;
+    }
+
+    /**
+     * Reports the running total held by {@code txnCommittedCounter}.
+     *
+     * @return the counter's current sum; with a {@code LongAdder} this is not an atomic snapshot
+     *         while updates are in flight
+     */
+    @Override
+    public long getCommittedTxnCount() {
+        final long committed = txnCommittedCounter.sum();
+        return committed;
+    }
@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
@@ -322,6 +341,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
clearAbortedTransactions();
takeSnapshotByChangeTimes();
}
+ txnCommittedCounter.increment();
completableFuture.complete(null);
}
@@ -368,6 +388,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
clearAbortedTransactions();
takeSnapshotByChangeTimes();
}
+ txnAbortedCounter.increment();
completableFuture.complete(null);
handleLowWaterMark(txnID, lowWaterMark);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
index c531f9f1871..bc7c1db2e04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -23,6 +23,8 @@ import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
@@ -35,47 +37,88 @@ import org.apache.pulsar.common.api.proto.TxnAction;
public class TransactionBufferClientImpl implements TransactionBufferClient {
private final TransactionBufferHandler tbHandler;
+ private final TransactionBufferClientStats stats;
- private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) {
+ private TransactionBufferClientImpl(TransactionBufferHandler tbHandler, boolean exposeTopicLevelMetrics,
+ boolean enableTxnCoordinator) {
this.tbHandler = tbHandler;
+ this.stats = TransactionBufferClientStats.create(exposeTopicLevelMetrics, tbHandler, enableTxnCoordinator);
}
public static TransactionBufferClient create(PulsarService pulsarService, HashedWheelTimer timer,
int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException {
TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarService, timer,
maxConcurrentRequests, operationTimeoutInMills);
- return new TransactionBufferClientImpl(handler);
+
+ ServiceConfiguration config = pulsarService.getConfig();
+ boolean exposeTopicLevelMetrics = config.isExposeTopicLevelMetricsInPrometheus();
+ boolean enableTxnCoordinator = config.isTransactionCoordinatorEnabled();
+ return new TransactionBufferClientImpl(handler, exposeTopicLevelMetrics, enableTxnCoordinator);
}
@Override
public CompletableFuture<TxnID> commitTxnOnTopic(String topic, long txnIdMostBits,
long txnIdLeastBits, long lowWaterMark) {
- return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.COMMIT, lowWaterMark);
+ long start = System.nanoTime();
+ return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.COMMIT, lowWaterMark)
+ .whenComplete((__, t) -> {
+ if (null != t) {
+ this.stats.recordCommitFailed(topic);
+ } else {
+ this.stats.recordCommitLatency(topic, System.nanoTime() - start);
+ }
+ });
}
@Override
public CompletableFuture<TxnID> abortTxnOnTopic(String topic, long txnIdMostBits,
long txnIdLeastBits, long lowWaterMark) {
- return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.ABORT, lowWaterMark);
+ long start = System.nanoTime();
+ return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.ABORT, lowWaterMark)
+ .whenComplete((__, t) -> {
+ if (null != t) {
+ this.stats.recordAbortFailed(topic);
+ } else {
+ this.stats.recordAbortLatency(topic, System.nanoTime() - start);
+ }
+ });
}
@Override
public CompletableFuture<TxnID> commitTxnOnSubscription(String topic, String subscription, long txnIdMostBits,
long txnIdLeastBits, long lowWaterMark) {
+ long start = System.nanoTime();
return tbHandler.endTxnOnSubscription(topic, subscription, txnIdMostBits, txnIdLeastBits,
- TxnAction.COMMIT, lowWaterMark);
+ TxnAction.COMMIT, lowWaterMark)
+ .whenComplete((__, t) -> {
+ if (null != t) {
+ this.stats.recordCommitFailed(topic);
+ } else {
+ this.stats.recordCommitLatency(topic, System.nanoTime() - start);
+ }
+ });
}
@Override
public CompletableFuture<TxnID> abortTxnOnSubscription(String topic, String subscription,
long txnIdMostBits, long txnIdLeastBits, long lowWaterMark) {
+ long start = System.nanoTime();
return tbHandler.endTxnOnSubscription(topic, subscription, txnIdMostBits, txnIdLeastBits,
- TxnAction.ABORT, lowWaterMark);
+ TxnAction.ABORT, lowWaterMark)
+ .whenComplete((__, t) -> {
+ if (null != t) {
+ this.stats.recordAbortFailed(topic);
+
+ } else {
+ this.stats.recordAbortLatency(topic, System.nanoTime() - start);
+ }
+ });
}
@Override
public void close() {
tbHandler.close();
+ this.stats.close();
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
new file mode 100644
index 00000000000..73c2078e186
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * {@link TransactionBufferClientStats} backed by Prometheus collectors registered in the default
+ * {@link CollectorRegistry}.
+ *
+ * <p>The class is a process-wide singleton: {@link #getInstance} creates the instance on first use and
+ * returns that same instance afterwards, ignoring the arguments of later calls. {@link #close()}
+ * unregisters the collectors and resets the singleton so that a later {@link #getInstance} call can
+ * build a fresh one.
+ */
+public final class TransactionBufferClientStatsImpl implements TransactionBufferClientStats {
+    private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1};
+
+    // Guarded by the class monitor (see getInstance() and close()).
+    private static TransactionBufferClientStats instance;
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private final Counter abortFailed;
+    private final Counter commitFailed;
+    private final Summary abortLatency;
+    private final Summary commitLatency;
+    private final Gauge pendingRequests;
+
+    private final boolean exposeTopicLevelMetrics;
+
+    /**
+     * Builds and registers all collectors.
+     *
+     * @param exposeTopicLevelMetrics when {@code true} the metrics carry {@code namespace} and
+     *                                {@code topic} labels, otherwise only {@code namespace}
+     * @param handler                 source of the pending-requests gauge value; may be {@code null},
+     *                                in which case the gauge reports 0
+     */
+    private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics,
+                                             TransactionBufferHandler handler) {
+        this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
+        String[] labelNames = exposeTopicLevelMetrics
+                ? new String[]{"namespace", "topic"} : new String[]{"namespace"};
+
+        this.abortFailed = Counter.build("pulsar_txn_tb_client_abort_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.commitFailed = Counter.build("pulsar_txn_tb_client_commit_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.abortLatency =
+                this.buildSummary("pulsar_txn_tb_client_abort_latency", "-", labelNames);
+        this.commitLatency =
+                this.buildSummary("pulsar_txn_tb_client_commit_latency", "-", labelNames);
+
+        // The gauge has no labels; its single child reads the live value from the handler on scrape.
+        this.pendingRequests = Gauge.build("pulsar_txn_tb_client_pending_requests", "-")
+                .register()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return null == handler ? 0 : handler.getPendingRequestsCount();
+                    }
+                });
+    }
+
+    /**
+     * Creates and registers a summary exposing every quantile in {@link #QUANTILES}
+     * with a 0.01 tolerated error.
+     */
+    private Summary buildSummary(String name, String help, String[] labelNames) {
+        Summary.Builder builder = Summary.build(name, help)
+                .labelNames(labelNames);
+        for (double quantile : QUANTILES) {
+            builder.quantile(quantile, 0.01D);
+        }
+        return builder.register();
+    }
+
+    /**
+     * Returns the shared instance, creating it on first use.
+     *
+     * <p>Both arguments are only used when the instance is created; later calls return the
+     * existing instance unchanged.
+     *
+     * @param exposeTopicLevelMetrics whether to add a {@code topic} label to the metrics
+     * @param handler                 handler whose pending request count backs the gauge
+     * @return the shared stats instance
+     */
+    public static synchronized TransactionBufferClientStats getInstance(boolean exposeTopicLevelMetrics,
+                                                                        TransactionBufferHandler handler) {
+        if (null == instance) {
+            instance = new TransactionBufferClientStatsImpl(exposeTopicLevelMetrics, handler);
+        }
+
+        return instance;
+    }
+
+    @Override
+    public void recordAbortFailed(String topic) {
+        this.abortFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordCommitFailed(String topic) {
+        this.commitFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordAbortLatency(String topic, long nanos) {
+        this.abortLatency.labels(labelValues(topic)).observe(nanos);
+    }
+
+    @Override
+    public void recordCommitLatency(String topic, long nanos) {
+        this.commitLatency.labels(labelValues(topic)).observe(nanos);
+    }
+
+    /**
+     * Derives the label values for a topic, matching the label names chosen in the constructor.
+     * If the topic name cannot be parsed, every label is reported as {@code "unknown"}.
+     */
+    private String[] labelValues(String topic) {
+        try {
+            TopicName topicName = TopicName.get(topic);
+            return exposeTopicLevelMetrics
+                    ? new String[]{topicName.getNamespace(), topic} : new String[]{topicName.getNamespace()};
+        } catch (Throwable t) {
+            return exposeTopicLevelMetrics ? new String[]{"unknown", "unknown"} : new String[]{"unknown"};
+        }
+    }
+
+    /**
+     * Unregisters all collectors and resets the singleton. Only the first call has an effect.
+     *
+     * <p>The work runs under the same class monitor as {@link #getInstance}, and the collectors are
+     * unregistered before the singleton is cleared. Otherwise a concurrent {@link #getInstance}
+     * could create a new instance and try to register the same metric names while the old
+     * collectors are still present in the registry.
+     */
+    @Override
+    public void close() {
+        if (this.closed.compareAndSet(false, true)) {
+            synchronized (TransactionBufferClientStatsImpl.class) {
+                CollectorRegistry.defaultRegistry.unregister(this.abortFailed);
+                CollectorRegistry.defaultRegistry.unregister(this.commitFailed);
+                CollectorRegistry.defaultRegistry.unregister(this.abortLatency);
+                CollectorRegistry.defaultRegistry.unregister(this.commitLatency);
+                CollectorRegistry.defaultRegistry.unregister(this.pendingRequests);
+                // Never clear a newer instance that replaced this one.
+                if (instance == this) {
+                    instance = null;
+                }
+            }
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 4290b475654..d700195416c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -107,4 +107,19 @@ public class TransactionBufferDisable implements TransactionBuffer {
public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
return CompletableFuture.completedFuture(null);
}
+
+    /**
+     * This implementation always reports zero ongoing transactions.
+     *
+     * @return always 0
+     */
+    @Override
+    public long getOngoingTxnCount() {
+        return 0L;
+    }
+
+    /**
+     * This implementation always reports zero aborted transactions.
+     *
+     * @return always 0
+     */
+    @Override
+    public long getAbortedTxnCount() {
+        return 0L;
+    }
+
+    /**
+     * This implementation always reports zero committed transactions.
+     *
+     * @return always 0
+     */
+    @Override
+    public long getCommittedTxnCount() {
+        return 0L;
+    }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
new file mode 100644
index 00000000000..94102f36590
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack;
+
+import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleStatsImpl;
+
+/**
+ * Records the outcome of transaction commits and aborts handled by a pending-ack handle.
+ */
+public interface PendingAckHandleStats {
+
+    /**
+     * Builds the stats recorder for one topic/subscription pair.
+     *
+     * @param topic                   topic name
+     * @param subName                 subscription name
+     * @param exposeTopicLevelMetrics forwarded to {@link PendingAckHandleStatsImpl}
+     * @return a new {@link PendingAckHandleStatsImpl}
+     */
+    static PendingAckHandleStats create(String topic, String subName, boolean exposeTopicLevelMetrics) {
+        final PendingAckHandleStats stats = new PendingAckHandleStatsImpl(topic, subName, exposeTopicLevelMetrics);
+        return stats;
+    }
+
+    /**
+     * Records a finished commit.
+     *
+     * @param success whether the commit completed without an error
+     * @param nanos   elapsed time of the commit in nanoseconds
+     */
+    void recordCommitTxn(boolean success, long nanos);
+
+    /**
+     * Records a finished abort.
+     *
+     * @param success whether the abort completed without an error
+     */
+    void recordAbortTxn(boolean success);
+
+    /**
+     * Closes this recorder.
+     */
+    void close();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index b3aec6c67f5..283bc038d76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -43,11 +43,13 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.client.api.transaction.TxnID;
@@ -127,6 +129,8 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
@Getter
private final ExecutorService internalPinnedExecutor;
+ private final PendingAckHandleStats handleStats;
+
public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
@@ -142,6 +146,10 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
.getTransactionExecutorProvider()
.getExecutor(this);
+ ServiceConfiguration config = persistentSubscription.getTopic().getBrokerService().pulsar().getConfig();
+ boolean exposeTopicLevelMetrics = config.isExposeTopicLevelMetricsInPrometheus();
+ this.handleStats = PendingAckHandleStats.create(topicName, subName, exposeTopicLevelMetrics);
+
this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
@@ -483,6 +491,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
@Override
public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark) {
+ long start = System.nanoTime();
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
if (!checkIfReady()) {
@@ -507,7 +516,9 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
internalCommitTxn(txnID, properties, lowWaterMark, commitFuture);
});
- return commitFuture;
+ return commitFuture.whenComplete((__, t) ->
+ this.handleStats.recordCommitTxn(t == null, System.nanoTime() - start)
+ );
}
private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long lowWaterMark,
@@ -575,7 +586,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
} else {
abortFuture.complete(null);
}
- return abortFuture;
+ return abortFuture.whenComplete((__, t) -> this.handleStats.recordAbortTxn(t == null));
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
new file mode 100644
index 00000000000..f882cef956a
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Summary;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Prometheus-backed implementation of {@link PendingAckHandleStats}.
+ *
+ * <p>The collectors are static and are registered by the first call to
+ * {@link #initialize(boolean)}. The label layout chosen by that first call (namespace only, or
+ * namespace/topic/subscription) is kept in a static flag and applies to every instance created
+ * afterwards, regardless of the flag those instances are constructed with.
+ */
+public class PendingAckHandleStatsImpl implements PendingAckHandleStats {
+    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
+    private static Counter commitTxnCounter;
+    private static Counter abortTxnCounter;
+    private static Summary commitTxnLatency;
+    // Label layout fixed by the first initialize() call.
+    private static boolean exposeTopicLevelMetrics0;
+
+    private final String[] labelSucceed;
+    private final String[] labelFailed;
+    private final String[] commitLatencyLabel;
+
+    /**
+     * Builds the label values used by this handle's metrics.
+     *
+     * @param topic topic name; a blank value is reported as {@code "unknown"} for both topic and
+     *              namespace, and a name for which {@link TopicName#get(String)} throws
+     *              {@link IllegalArgumentException} gets the namespace {@code "unknown"}
+     * @param subscription subscription name, used as a label value when topic-level metrics are exposed
+     * @param exposeTopicLevelMetrics whether to label by topic and subscription; only the value passed
+     *                                to the first initialization takes effect
+     */
+    public PendingAckHandleStatsImpl(String topic, String subscription, boolean exposeTopicLevelMetrics) {
+        initialize(exposeTopicLevelMetrics);
+
+        String namespace;
+        if (StringUtils.isBlank(topic)) {
+            namespace = topic = "unknown";
+        } else {
+            try {
+                namespace = TopicName.get(topic).getNamespace();
+            } catch (IllegalArgumentException ex) {
+                namespace = "unknown";
+            }
+        }
+
+        // The arity of these arrays must match the label names registered in initialize().
+        labelSucceed = exposeTopicLevelMetrics0
+                ? new String[]{namespace, topic, subscription, "succeed"} : new String[]{namespace, "succeed"};
+        labelFailed = exposeTopicLevelMetrics0
+                ? new String[]{namespace, topic, subscription, "failed"} : new String[]{namespace, "failed"};
+        commitLatencyLabel = exposeTopicLevelMetrics0
+                ? new String[]{namespace, topic, subscription} : new String[]{namespace};
+    }
+
+    /**
+     * Counts one commit and, for a successful commit, records its latency.
+     *
+     * @param success whether the commit completed without an exception
+     * @param nanos elapsed time in nanoseconds; observed in microseconds, and only when
+     *              {@code success} is true
+     */
+    @Override
+    public void recordCommitTxn(boolean success, long nanos) {
+        String[] labels;
+        if (success) {
+            labels = labelSucceed;
+            commitTxnLatency.labels(commitLatencyLabel).observe(TimeUnit.NANOSECONDS.toMicros(nanos));
+        } else {
+            labels = labelFailed;
+        }
+        commitTxnCounter.labels(labels).inc();
+    }
+
+    /**
+     * Counts one abort under the succeed or failed status.
+     *
+     * @param success whether the abort completed without an exception
+     */
+    @Override
+    public void recordAbortTxn(boolean success) {
+        abortTxnCounter.labels(success ? labelSucceed : labelFailed).inc();
+    }
+
+    /**
+     * Removes every label child this handle may have created. Nothing is removed in namespace-level
+     * mode, where the label values carry no topic or subscription and so are not specific to this
+     * handle.
+     */
+    @Override
+    public void close() {
+        if (exposeTopicLevelMetrics0) {
+            commitTxnCounter.remove(this.labelSucceed);
+            commitTxnCounter.remove(this.labelFailed);
+            // Both statuses must be removed; the succeed child was previously left behind.
+            abortTxnCounter.remove(this.labelSucceed);
+            abortTxnCounter.remove(this.labelFailed);
+            commitTxnLatency.remove(this.commitLatencyLabel);
+        }
+    }
+
+    /**
+     * Registers the static collectors on the first call; later calls are no-ops.
+     *
+     * <p>The method is synchronized and the flag is set only after every static field is assigned,
+     * so a concurrent caller cannot skip initialization and then observe unassigned collectors or
+     * the default label layout.
+     *
+     * @param exposeTopicLevelMetrics whether the collectors carry topic and subscription labels
+     */
+    static synchronized void initialize(boolean exposeTopicLevelMetrics) {
+        if (INITIALIZED.get()) {
+            return;
+        }
+
+        exposeTopicLevelMetrics0 = exposeTopicLevelMetrics;
+
+        String[] labelNames = exposeTopicLevelMetrics
+                ? new String[]{"namespace", "topic", "subscription", "status"}
+                : new String[]{"namespace", "status"};
+
+        commitTxnCounter = Counter
+                .build("pulsar_txn_tp_committed_count", "-")
+                .labelNames(labelNames)
+                .register();
+
+        abortTxnCounter = Counter
+                .build("pulsar_txn_tp_aborted_count", "-")
+                .labelNames(labelNames)
+                .register();
+
+        commitTxnLatency = Summary.build("pulsar_txn_tp_commit_latency", "-")
+                .quantile(0.5, 0.01)
+                .quantile(0.9, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(0.999, 0.01)
+                .labelNames(exposeTopicLevelMetrics
+                        ? new String[]{"namespace", "topic", "subscription"} : new String[]{"namespace"})
+                .register();
+
+        INITIALIZED.set(true);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 7a3eabdb318..c377043df8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -41,6 +41,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -122,6 +123,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -162,6 +164,45 @@ public class TransactionTest extends TransactionTestBase {
super.internalCleanup();
}
+
+    /**
+     * Publishes one message in each of three transactions (one committed, one aborted, one left
+     * open) and checks that the topic stats report a count of one for each of the three states.
+     */
+    @Test
+    public void testTopicTransactionMetrics() throws Exception {
+        final String topicName = "persistent://tnx/ns1/test_transaction_topic";
+
+        @Cleanup
+        Producer<byte[]> txnProducer = this.pulsarClient.newProducer()
+                .topic(topicName)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        // Transaction that is committed.
+        Transaction committedTxn = pulsarClient.newTransaction()
+                .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        txnProducer.newMessage(committedTxn)
+                .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
+                .send();
+        committedTxn.commit().get();
+
+        // Transaction that is aborted.
+        Transaction abortedTxn = pulsarClient.newTransaction()
+                .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        txnProducer.newMessage(abortedTxn)
+                .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
+                .send();
+        abortedTxn.abort().get();
+
+        // Transaction that is neither committed nor aborted before the stats are read.
+        Transaction openTxn = pulsarClient.newTransaction()
+                .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        txnProducer.newMessage(openTxn)
+                .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
+                .send();
+
+        PulsarService broker = pulsarServiceList.get(0);
+        Optional<Topic> topicRef = broker.getBrokerService().getTopic(topicName, false).get();
+        assertTrue(topicRef.isPresent());
+        TopicStatsImpl topicStats = ((PersistentTopic) topicRef.get()).getStats(false, false, false);
+
+        assertEquals(topicStats.committedTxnCount, 1);
+        assertEquals(topicStats.abortedTxnCount, 1);
+        assertEquals(topicStats.ongoingTxnCount, 1);
+    }
+
@Test
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 66460778dc2..2e93b0ad707 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -19,19 +19,25 @@
package org.apache.pulsar.broker.transaction.buffer;
import static org.mockito.ArgumentMatchers.anyString;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Cleanup;
+import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
@@ -148,6 +154,55 @@ public class TransactionBufferClientTest extends TransactionTestBase {
}
}
+
+    /**
+     * Issues a commit and then an abort request for subscription {@code "test"} on every partition,
+     * and checks the transaction buffer client metrics in the generated Prometheus output.
+     */
+    @Test
+    public void testTransactionBufferMetrics() throws Exception {
+        // All commits are issued before any abort.
+        for (int partition = 0; partition < partitions; partition++) {
+            String partitionTopic = partitionedTopicName.getPartition(partition).toString();
+            tbClient.commitTxnOnSubscription(partitionTopic, "test", 1L, partition, -1L).get();
+        }
+
+        for (int partition = 0; partition < partitions; partition++) {
+            String partitionTopic = partitionedTopicName.getPartition(partition).toString();
+            tbClient.abortTxnOnSubscription(partitionTopic, "test", 1L, partition, -1L).get();
+        }
+
+        @Cleanup
+        ByteArrayOutputStream metricsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, false, false, metricsOut);
+        String metricsText = metricsOut.toString();
+        Multimap<String, PrometheusMetricsTest.Metric> parsed = PrometheusMetricsTest.parseMetrics(metricsText);
+
+        Collection<PrometheusMetricsTest.Metric> abortFailed =
+                parsed.get("pulsar_txn_tb_client_abort_failed_total");
+        Collection<PrometheusMetricsTest.Metric> commitFailed =
+                parsed.get("pulsar_txn_tb_client_commit_failed_total");
+        Collection<PrometheusMetricsTest.Metric> abortLatencyCount =
+                parsed.get("pulsar_txn_tb_client_abort_latency_count");
+        Collection<PrometheusMetricsTest.Metric> commitLatencyCount =
+                parsed.get("pulsar_txn_tb_client_commit_latency_count");
+        Collection<PrometheusMetricsTest.Metric> pending = parsed.get("pulsar_txn_tb_client_pending_requests");
+
+        // No request may have been counted as failed.
+        assertEquals(abortFailed.stream().mapToDouble(sample -> sample.value).sum(), 0);
+        assertEquals(commitFailed.stream().mapToDouble(sample -> sample.value).sum(), 0);
+
+        // Each partition must show one abort and one commit latency observation.
+        for (int partition = 0; partition < partitions; partition++) {
+            String partitionTopic = partitionedTopicName.getPartition(partition).toString();
+
+            Optional<PrometheusMetricsTest.Metric> abortSample = abortLatencyCount.stream()
+                    .filter(sample -> sample.tags.get("topic").equals(partitionTopic))
+                    .findFirst();
+            assertTrue(abortSample.isPresent());
+            assertEquals(abortSample.get().value, 1D);
+
+            Optional<PrometheusMetricsTest.Metric> commitSample = commitLatencyCount.stream()
+                    .filter(sample -> sample.tags.get("topic").equals(partitionTopic))
+                    .findFirst();
+            assertTrue(commitSample.isPresent());
+            assertEquals(commitSample.get().value, 1D);
+        }
+
+        assertEquals(pending.size(), 1);
+    }
+
@Test
public void testTransactionBufferClientTimeout() throws Exception {
PulsarService pulsarService = pulsarServiceList.get(0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index e75c9534c59..ff9e16fac7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -21,16 +21,19 @@ package org.apache.pulsar.broker.transaction.pendingack;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
+import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Multimap;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -38,6 +41,8 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
@@ -200,6 +205,77 @@ public class PendingAckPersistentTest extends TransactionTestBase {
.compareTo((PositionImpl) managedCursor.getManagedLedger().getLastConfirmedEntry()) == -1);
}
+    /**
+     * Acknowledges messages cumulatively inside transactions, alternately aborting and committing
+     * them, and checks that the pending-ack metrics for this topic and subscription appear in the
+     * generated Prometheus output with non-zero values.
+     */
+    @Test
+    public void testPendingAckMetrics() throws Exception {
+        final int messageCount = 100;
+        String subName = "testMetric" + UUID.randomUUID();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(PENDING_ACK_REPLAY_TOPIC)
+                .create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(PENDING_ACK_REPLAY_TOPIC)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        for (int a = 0; a < messageCount; a++) {
+            producer.send(UUID.randomUUID().toString());
+        }
+
+        for (int a = 0; a < messageCount; a++) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (null == message) {
+                break;
+            }
+
+            Transaction txn = pulsarClient.newTransaction()
+                    .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+            consumer.acknowledgeCumulativeAsync(message.getMessageId(), txn).get();
+            // Even iterations abort, odd iterations commit, so both counters get exercised.
+            if (a % 2 == 0) {
+                txn.abort().get();
+            } else {
+                txn.commit().get();
+            }
+        }
+
+        @Cleanup
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsarServiceList.get(0), true, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> abortedCount = metricsMap.get("pulsar_txn_tp_aborted_count_total");
+        Collection<PrometheusMetricsTest.Metric> committedCount = metricsMap.get("pulsar_txn_tp_committed_count_total");
+        Collection<PrometheusMetricsTest.Metric> commitLatency = metricsMap.get("pulsar_txn_tp_commit_latency");
+        Assert.assertTrue(commitLatency.size() > 0);
+
+        int count = 0;
+        for (PrometheusMetricsTest.Metric metric : commitLatency) {
+            if (metric.tags.get("topic").endsWith(PENDING_ACK_REPLAY_TOPIC) && metric.value > 0) {
+                count++;
+            }
+        }
+        Assert.assertTrue(count > 0);
+
+        // Track whether a sample for this subscription was seen at all; without this the value
+        // checks below are skipped silently when the metric is missing.
+        boolean abortedSampleFound = false;
+        for (PrometheusMetricsTest.Metric metric : abortedCount) {
+            if (metric.tags.get("subscription").equals(subName) && metric.tags.get("status").equals("succeed")) {
+                abortedSampleFound = true;
+                assertTrue(metric.tags.get("topic").endsWith(PENDING_ACK_REPLAY_TOPIC));
+                assertTrue(metric.value > 0);
+            }
+        }
+        assertTrue(abortedSampleFound);
+
+        boolean committedSampleFound = false;
+        for (PrometheusMetricsTest.Metric metric : committedCount) {
+            if (metric.tags.get("subscription").equals(subName) && metric.tags.get("status").equals("succeed")) {
+                committedSampleFound = true;
+                assertTrue(metric.tags.get("topic").endsWith(PENDING_ACK_REPLAY_TOPIC));
+                assertTrue(metric.value > 0);
+            }
+        }
+        assertTrue(committedSampleFound);
+    }
+
@Test
public void cumulativePendingAckReplayTest() throws Exception {
int messageCount = 1000;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index f6cdbba5036..3c5ce5f14d6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -101,6 +101,10 @@ public class TopicStatsImpl implements TopicStats {
/** record last failed offloaded timestamp. If no failed offload, the value should be 0 */
public long lastOffloadFailureTimeStamp;
+ public long ongoingTxnCount;
+ public long abortedTxnCount;
+ public long committedTxnCount;
+
/** List of connected publishers on this topic w/ their stats. */
@Getter(AccessLevel.NONE)
@Setter(AccessLevel.NONE)
@@ -231,6 +235,9 @@ public class TopicStatsImpl implements TopicStats {
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
this.delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes;
+ this.ongoingTxnCount = stats.ongoingTxnCount;
+ this.abortedTxnCount = stats.abortedTxnCount;
+ this.committedTxnCount = stats.committedTxnCount;
stats.getPublishers().forEach(s -> {
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {