You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 13:44:54 UTC
[pulsar] 03/17: Expose pulsar_out_bytes_total and
pulsar_out_messages_total for namespace/subscription/consumer. (#6918)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9c5644b4b5bac05b8c3f82b7eda89ee750e5c725
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat May 9 03:21:28 2020 +0800
Expose pulsar_out_bytes_total and pulsar_out_messages_total for namespace/subscription/consumer. (#6918)
Fixes #6891
Rated to #5802
Add pulsar_out_bytes_total and pulsar_out_messages_total for namespace/subscription/consumer.
New unit test added.
(cherry picked from commit 204f3271914f5fbdeb7bf0a7ff5a71e77b8cccbf)
---
.../org/apache/pulsar/broker/service/Consumer.java | 9 ++
.../nonpersistent/NonPersistentSubscription.java | 2 +
.../service/nonpersistent/NonPersistentTopic.java | 2 +
.../service/persistent/PersistentSubscription.java | 2 +
.../broker/service/persistent/PersistentTopic.java | 2 +
.../stats/prometheus/AggregatedConsumerStats.java | 4 +
.../stats/prometheus/AggregatedNamespaceStats.java | 10 ++
.../prometheus/AggregatedSubscriptionStats.java | 4 +
.../stats/prometheus/NamespaceStatsAggregator.java | 77 +++++----
.../pulsar/broker/stats/prometheus/TopicStats.java | 8 +
.../pulsar/broker/stats/PrometheusMetricsTest.java | 172 ++++++++++++++++++++-
.../pulsar/common/policies/data/ConsumerStats.java | 8 +
.../common/policies/data/SubscriptionStats.java | 10 ++
.../pulsar/common/policies/data/TopicStats.java | 19 ++-
site2/docs/reference-metrics.md | 2 +
15 files changed, 294 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c5f5bbf..31227db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
@@ -78,6 +79,8 @@ public class Consumer {
private final String consumerName;
private final Rate msgOut;
private final Rate msgRedeliver;
+ private final LongAdder msgOutCounter;
+ private final LongAdder bytesOutCounter;
private long lastConsumedTimestamp;
private long lastAckedTimestamp;
@@ -129,6 +132,8 @@ public class Consumer {
this.cnx = cnx;
this.msgOut = new Rate();
this.msgRedeliver = new Rate();
+ this.bytesOutCounter = new LongAdder();
+ this.msgOutCounter = new LongAdder();
this.appId = appId;
this.authenticationData = cnx.authenticationData;
PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
@@ -222,6 +227,8 @@ public class Consumer {
MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
incrementUnackedMessages(totalMessages);
msgOut.recordMultipleEvents(totalMessages, totalBytes);
+ msgOutCounter.add(totalMessages);
+ bytesOutCounter.add(totalBytes);
ctx.channel().eventLoop().execute(() -> {
for (int i = 0; i < entries.size(); i++) {
@@ -457,6 +464,8 @@ public class Consumer {
}
public ConsumerStats getStats() {
+ stats.msgOutCounter = msgOutCounter.longValue();
+ stats.bytesOutCounter = bytesOutCounter.longValue();
stats.lastAckedTimestamp = lastAckedTimestamp;
stats.lastConsumedTimestamp = lastConsumedTimestamp;
stats.availablePermits = getAvailablePermits();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index f653ee5..316024a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -384,6 +384,8 @@ public class NonPersistentSubscription implements Subscription {
subStats.consumers.add(consumerStats);
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
+ subStats.bytesOutCounter += consumerStats.bytesOutCounter;
+ subStats.msgOutCounter += consumerStats.msgOutCounter;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 332abfb..0a99a64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -778,6 +778,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
+ stats.bytesOutCounter += subStats.bytesOutCounter;
+ stats.msgOutCounter += subStats.msgOutCounter;
stats.getSubscriptions().put(name, subStats);
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 5e6a213..25633f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -946,6 +946,8 @@ public class PersistentSubscription implements Subscription {
subStats.consumers.add(consumerStats);
subStats.msgRateOut += consumerStats.msgRateOut;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
+ subStats.bytesOutCounter += consumerStats.bytesOutCounter;
+ subStats.msgOutCounter += consumerStats.msgOutCounter;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
subStats.unackedMessages += consumerStats.unackedMessages;
subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b7470fa..28911bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1510,6 +1510,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
+ stats.bytesOutCounter += subStats.bytesOutCounter;
+ stats.msgOutCounter += subStats.msgOutCounter;
stats.subscriptions.put(name, subStats);
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
index 0fadf3e..8b6bf7d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
@@ -31,4 +31,8 @@ public class AggregatedConsumerStats {
public double msgThroughputOut;
public long availablePermits;
+
+ long msgOutCounter;
+
+ long bytesOutCounter;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index ea05ed0..1100523 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -34,6 +34,11 @@ public class AggregatedNamespaceStats {
public double throughputIn;
public double throughputOut;
+ public long bytesInCounter;
+ public long msgInCounter;
+ public long bytesOutCounter;
+ public long msgOutCounter;
+
public long storageSize;
public long msgBacklog;
public long msgDelayed;
@@ -65,6 +70,11 @@ public class AggregatedNamespaceStats {
throughputIn += stats.throughputIn;
throughputOut += stats.throughputOut;
+ bytesInCounter += stats.bytesInCounter;
+ msgInCounter += stats.msgInCounter;
+ bytesOutCounter += stats.bytesOutCounter;
+ msgOutCounter += stats.msgOutCounter;
+
storageSize += stats.storageSize;
backlogSize += stats.backlogSize;
offloadedStorageUsed += stats.offloadedStorageUsed;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index d5b5353..1f1e879 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -41,5 +41,9 @@ public class AggregatedSubscriptionStats {
public long msgDelayed;
+ long msgOutCounter;
+
+ long bytesOutCounter;
+
public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a7b35b8..f032ea1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
@@ -104,8 +105,11 @@ public class NamespaceStatsAggregator {
stats.storageReadRate = mlStats.getReadEntriesRate();
}
- stats.msgInCounter = topic.getStats(getPreciseBacklog).msgInCounter;
- stats.bytesInCounter = topic.getStats(getPreciseBacklog).bytesInCounter;
+ org.apache.pulsar.common.policies.data.TopicStats tStatus = topic.getStats(getPreciseBacklog);
+ stats.msgInCounter = tStatus.msgInCounter;
+ stats.bytesInCounter = tStatus.bytesInCounter;
+ stats.msgOutCounter = tStatus.msgOutCounter;
+ stats.bytesOutCounter = tStatus.bytesOutCounter;
stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
@@ -123,43 +127,53 @@ public class NamespaceStatsAggregator {
}
});
- topic.getSubscriptions().forEach((name, subscription) -> {
+ tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
stats.subscriptionsCount++;
- stats.msgBacklog += subscription.getNumberOfEntriesInBacklog(getPreciseBacklog);
+ stats.msgBacklog += subscriptionStats.msgBacklog;
AggregatedSubscriptionStats subsStats = stats.subscriptionStats
- .computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
- subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog(getPreciseBacklog);
- subsStats.msgDelayed = subscription.getNumberOfEntriesDelayed();
+ .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
+ subsStats.msgBacklog = subscriptionStats.msgBacklog;
+ subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
+ stats.rateOut += subsStats.msgRateOut;
+ stats.throughputOut += subsStats.msgThroughputOut;
+ subscriptionStats.consumers.forEach(cStats -> {
+ stats.consumersCount++;
+ subsStats.unackedMessages += cStats.unackedMessages;
+ subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
+ subsStats.msgRateOut += cStats.msgRateOut;
+ subsStats.msgThroughputOut += cStats.msgThroughputOut;
+ subsStats.bytesOutCounter += cStats.bytesOutCounter;
+ subsStats.msgOutCounter += cStats.msgOutCounter;
+ if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
+ subsStats.blockedSubscriptionOnUnackedMsgs = true;
+ }
+ });
+ });
- subscription.getConsumers().forEach(consumer -> {
+ // Consumer stats can be a lot if a subscription has many consumers
+ if (includeConsumerMetrics) {
+ topic.getSubscriptions().forEach((name, subscription) -> {
+ AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+ .computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
+ subscription.getConsumers().forEach(consumer -> {
+ ConsumerStats conStats = consumer.getStats();
- // Consumer stats can be a lot if a subscription has many consumers
- if (includeConsumerMetrics) {
AggregatedConsumerStats consumerStats = subsStats.consumerStat
.computeIfAbsent(consumer, k -> new AggregatedConsumerStats());
- consumerStats.unackedMessages = consumer.getStats().unackedMessages;
- consumerStats.msgRateRedeliver = consumer.getStats().msgRateRedeliver;
- consumerStats.msgRateOut = consumer.getStats().msgRateOut;
- consumerStats.msgThroughputOut = consumer.getStats().msgThroughputOut;
- consumerStats.availablePermits = consumer.getStats().availablePermits;
- consumerStats.blockedSubscriptionOnUnackedMsgs = consumer.getStats().blockedConsumerOnUnackedMsgs;
- }
-
- subsStats.unackedMessages += consumer.getStats().unackedMessages;
- subsStats.msgRateRedeliver += consumer.getStats().msgRateRedeliver;
- subsStats.msgRateOut += consumer.getStats().msgRateOut;
- subsStats.msgThroughputOut += consumer.getStats().msgThroughputOut;
- if (!subsStats.blockedSubscriptionOnUnackedMsgs && consumer.getStats().blockedConsumerOnUnackedMsgs) {
- subsStats.blockedSubscriptionOnUnackedMsgs = true;
- }
- stats.consumersCount++;
- stats.rateOut += consumer.getStats().msgRateOut;
- stats.throughputOut += consumer.getStats().msgThroughputOut;
+ consumerStats.unackedMessages = conStats.unackedMessages;
+ consumerStats.msgRateRedeliver = conStats.msgRateRedeliver;
+ consumerStats.msgRateOut = conStats.msgRateOut;
+ consumerStats.msgThroughputOut = conStats.msgThroughputOut;
+ consumerStats.bytesOutCounter = conStats.bytesOutCounter;
+ consumerStats.msgOutCounter = conStats.msgOutCounter;
+ consumerStats.availablePermits = conStats.availablePermits;
+ consumerStats.blockedSubscriptionOnUnackedMsgs = conStats.blockedConsumerOnUnackedMsgs;
+ });
});
- });
+ }
topic.getReplicators().forEach((cluster, replicator) -> {
AggregatedReplicationStats aggReplStats = stats.replicationStats.computeIfAbsent(cluster,
@@ -206,6 +220,11 @@ public class NamespaceStatsAggregator {
metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
+ metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
+ metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
+ metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
+ metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
+
metric(stream, cluster, namespace, "pulsar_storage_size", stats.storageSize);
metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.backlogSize);
metric(stream, cluster, namespace, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index caf0ce8..0510d34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -36,6 +36,8 @@ class TopicStats {
double throughputOut;
long msgInCounter;
long bytesInCounter;
+ long msgOutCounter;
+ long bytesOutCounter;
long storageSize;
public long msgBacklog;
@@ -67,6 +69,8 @@ class TopicStats {
throughputOut = 0;
bytesInCounter = 0;
msgInCounter = 0;
+ bytesOutCounter = 0;
+ msgOutCounter = 0;
storageSize = 0;
msgBacklog = 0;
@@ -141,6 +145,8 @@ class TopicStats {
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut);
+ metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", subsStats.bytesOutCounter);
+ metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", subsStats.msgOutCounter);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_unacked_messages", consumerStats.unackedMessages);
@@ -148,6 +154,8 @@ class TopicStats {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_available_permits", consumerStats.availablePermits);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_out_bytes_total", consumerStats.bytesOutCounter);
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_out_messages_total", consumerStats.msgOutCounter);
});
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index e569caf..92abb5c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -34,9 +34,12 @@ import java.util.regex.Pattern;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.base.MoreObjects;
@@ -45,13 +48,14 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
public class PrometheusMetricsTest extends BrokerTestBase {
- @BeforeClass
+
+ @BeforeMethod
@Override
protected void setup() throws Exception {
super.baseSetup();
}
- @AfterClass
+ @AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
@@ -61,12 +65,30 @@ public class PrometheusMetricsTest extends BrokerTestBase {
public void testPerTopicStats() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
- for (int i = 0; i < 10; i++) {
+
+ Consumer<byte[]> c1 = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/my-ns/my-topic1")
+ .subscriptionName("test")
+ .subscribe();
+
+ Consumer<byte[]> c2 = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/my-ns/my-topic2")
+ .subscriptionName("test")
+ .subscribe();
+
+ final int messages = 10;
+
+ for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
p2.send(message.getBytes());
}
+ for (int i = 0; i < messages; i++) {
+ c1.acknowledge(c1.receive());
+ c2.acknowledge(c2.receive());
+ }
+
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
String metricsStr = new String(statsOut.toByteArray());
@@ -109,20 +131,58 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+ cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
+ assertEquals(cm.size(), 2);
+ assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(0).tags.get("subscription"), "test");
+ assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(1).tags.get("subscription"), "test");
+
+ cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
+ assertEquals(cm.size(), 2);
+ assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(0).tags.get("subscription"), "test");
+ assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(1).tags.get("subscription"), "test");
+
p1.close();
p2.close();
+ c1.close();
+ c2.close();
}
@Test
public void testPerNamespaceStats() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
- for (int i = 0; i < 10; i++) {
+
+ Consumer<byte[]> c1 = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/my-ns/my-topic1")
+ .subscriptionName("test")
+ .subscribe();
+
+ Consumer<byte[]> c2 = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/my-ns/my-topic2")
+ .subscriptionName("test")
+ .subscribe();
+
+ final int messages = 10;
+
+ for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
p2.send(message.getBytes());
}
+ for (int i = 0; i < messages; i++) {
+ c1.acknowledge(c1.receive());
+ c2.acknowledge(c2.receive());
+ }
+
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
String metricsStr = new String(statsOut.toByteArray());
@@ -141,12 +201,114 @@ public class PrometheusMetricsTest extends BrokerTestBase {
cm = (List<Metric>) metrics.get("pulsar_producers_count");
assertEquals(cm.size(), 2);
- assertEquals(cm.get(1).value, 2.0);
assertNull(cm.get(1).tags.get("topic"));
assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+ cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+ cm = (List<Metric>) metrics.get("pulsar_in_messages_total");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+ cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+ cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
+ assertEquals(cm.size(), 1);
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+ p1.close();
+ p2.close();
+ c1.close();
+ c2.close();
+ }
+
+ @Test
+ public void testPerConsumerStats() throws Exception {
+ Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+ Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+
+ Consumer<byte[]> c1 = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/my-ns/my-topic1")
+ .subscriptionName("test")
+ .subscribe();
+
+ Consumer<byte[]> c2 = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/my-ns/my-topic2")
+ .subscriptionName("test")
+ .subscribe();
+
+ final int messages = 10;
+
+ for (int i = 0; i < messages; i++) {
+ String message = "my-message-" + i;
+ p1.send(message.getBytes());
+ p2.send(message.getBytes());
+ }
+
+ for (int i = 0; i < messages; i++) {
+ c1.acknowledge(c1.receive());
+ c2.acknowledge(c2.receive());
+ }
+
+ ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, true, true, statsOut);
+ String metricsStr = new String(statsOut.toByteArray());
+
+ Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+ metrics.entries().forEach(e -> {
+ System.out.println(e.getKey() + ": " + e.getValue());
+ });
+
+ // There should be 1 metric aggregated per namespace
+ List<Metric> cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
+ assertEquals(cm.size(), 4);
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+ assertEquals(cm.get(0).tags.get("subscription"), "test");
+
+ assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+ assertEquals(cm.get(1).tags.get("subscription"), "test");
+ assertEquals(cm.get(1).tags.get("consumer_id"), "1");
+
+ assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(2).tags.get("subscription"), "test");
+
+ assertEquals(cm.get(3).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(3).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(3).tags.get("subscription"), "test");
+ assertEquals(cm.get(3).tags.get("consumer_id"), "0");
+
+ cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
+ assertEquals(cm.size(), 4);
+ assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+ assertEquals(cm.get(0).tags.get("subscription"), "test");
+
+ assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+ assertEquals(cm.get(1).tags.get("subscription"), "test");
+ assertEquals(cm.get(1).tags.get("consumer_id"), "1");
+
+ assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(2).tags.get("subscription"), "test");
+
+ assertEquals(cm.get(3).tags.get("namespace"), "my-property/use/my-ns");
+ assertEquals(cm.get(3).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+ assertEquals(cm.get(3).tags.get("subscription"), "test");
+ assertEquals(cm.get(3).tags.get("consumer_id"), "0");
+
p1.close();
p2.close();
+ c1.close();
+ c2.close();
}
/** Checks for duplicate type definitions for a metric in the Prometheus metrics output. If the Prometheus parser
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index 7411f03..0ecb944 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -32,6 +32,12 @@ public class ConsumerStats {
/** Total throughput delivered to the consumer (bytes/s). */
public double msgThroughputOut;
+ /** Total bytes delivered to consumer (bytes). */
+ public long bytesOutCounter;
+
+ /** Total messages delivered to consumer (msg). */
+ public long msgOutCounter;
+
/** Total rate of messages redelivered by this consumer (msg/s). */
public double msgRateRedeliver;
@@ -75,6 +81,8 @@ public class ConsumerStats {
checkNotNull(stats);
this.msgRateOut += stats.msgRateOut;
this.msgThroughputOut += stats.msgThroughputOut;
+ this.bytesOutCounter += stats.bytesOutCounter;
+ this.msgOutCounter += stats.msgOutCounter;
this.msgRateRedeliver += stats.msgRateRedeliver;
this.availablePermits += stats.availablePermits;
this.unackedMessages += stats.unackedMessages;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 7064883..df8fc72 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -34,6 +34,12 @@ public class SubscriptionStats {
/** Total throughput delivered on this subscription (bytes/s). */
public double msgThroughputOut;
+ /** Total bytes delivered to consumer (bytes). */
+ public long bytesOutCounter;
+
+ /** Total messages delivered to consumer (msg). */
+ public long msgOutCounter;
+
/** Total rate of messages redelivered on this subscription (msg/s). */
public double msgRateRedeliver;
@@ -86,6 +92,8 @@ public class SubscriptionStats {
public void reset() {
msgRateOut = 0;
msgThroughputOut = 0;
+ bytesOutCounter = 0;
+ msgOutCounter = 0;
msgRateRedeliver = 0;
msgBacklog = 0;
msgBacklogNoDelayed = 0;
@@ -101,6 +109,8 @@ public class SubscriptionStats {
checkNotNull(stats);
this.msgRateOut += stats.msgRateOut;
this.msgThroughputOut += stats.msgThroughputOut;
+ this.bytesOutCounter += stats.bytesOutCounter;
+ this.msgOutCounter += stats.msgOutCounter;
this.msgRateRedeliver += stats.msgRateRedeliver;
this.msgBacklog += stats.msgBacklog;
this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index 9962fa9..be471e3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -43,6 +43,18 @@ public class TopicStats {
/** Total throughput of messages dispatched for the topic (byte/s). */
public double msgThroughputOut;
+ /** Total bytes published to the topic (bytes). */
+ public long bytesInCounter;
+
+ /** Total messages published to the topic (msg). */
+ public long msgInCounter;
+
+ /** Total bytes delivered to consumer (bytes). */
+ public long bytesOutCounter;
+
+ /** Total messages delivered to consumer (msg). */
+ public long msgOutCounter;
+
/** Average size of published messages (bytes). */
public double averageMsgSize;
@@ -63,9 +75,6 @@ public class TopicStats {
public String deduplicationStatus;
- public long bytesInCounter;
- public long msgInCounter;
-
public TopicStats() {
this.publishers = Lists.newArrayList();
this.subscriptions = Maps.newHashMap();
@@ -83,6 +92,8 @@ public class TopicStats {
this.backlogSize = 0;
this.bytesInCounter = 0;
this.msgInCounter = 0;
+ this.bytesOutCounter = 0;
+ this.msgOutCounter = 0;
this.publishers.clear();
this.subscriptions.clear();
this.replication.clear();
@@ -100,6 +111,8 @@ public class TopicStats {
this.msgThroughputOut += stats.msgThroughputOut;
this.bytesInCounter += stats.bytesInCounter;
this.msgInCounter += stats.msgInCounter;
+ this.bytesOutCounter += stats.bytesOutCounter;
+ this.msgOutCounter += stats.msgOutCounter;
double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count;
this.averageMsgSize = newAverageMsgSize;
this.storageSize += stats.storageSize;
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 3983dcd..b57406c 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -174,6 +174,8 @@ All the topic metrics are labelled with the following labels:
| pulsar_entry_size_le_* | Histogram | The entry rate of a topic that the entry size is smaller with a given threshold.<br> Available thresholds: <br><ul><li>pulsar_entry_size_le_128: <= 128 bytes </li><li>pulsar_entry_size_le_512: <= 512 bytes</li><li>pulsar_entry_size_le_1_kb: <= 1 KB</li><li>pulsar_entry_size_le_2_kb: <= 2 KB</li><li>pulsar_entry_size_le_4_kb: <= 4 KB</li><li>pulsar_entry_size_le_16_kb: <= 16 KB</li><li>pulsar_entry_size_le_100_kb: <= 100 KB</li><li>pulsar_entry_size_ [...]
| pulsar_in_bytes_total | Counter | The total number of bytes received for this topic |
| pulsar_producers_count | Counter | The total number of messages received for this topic |
+| pulsar_out_bytes_total | Counter | The total number of bytes read from this topic |
+| pulsar_out_messages_total | Counter | The total number of messages read from this topic |
#### Replication metrics