You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/18 18:06:36 UTC

[pulsar] branch master updated: Include replication when reporting topic stats to Prometheus (#2789)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bde91d  Include replication when reporting topic stats to Prometheus (#2789)
3bde91d is described below

commit 3bde91d6f8f241ec5d0618e3134b4de2cc54797c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Oct 18 11:06:31 2018 -0700

    Include replication when reporting topic stats to Prometheus (#2789)
---
 .../pulsar/broker/stats/prometheus/TopicStats.java | 25 ++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 5968b8c..3549820 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -126,6 +126,22 @@ class TopicStats {
             });
         });
 
+        if (!stats.replicationStats.isEmpty()) {
+            stats.replicationStats.forEach((remoteCluster, replStats) -> {
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster,
+                        replStats.msgRateIn);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster,
+                        replStats.msgRateOut);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in",
+                        remoteCluster,
+                        replStats.msgThroughputIn);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out",
+                        remoteCluster,
+                        replStats.msgThroughputOut);
+                metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster,
+                        replStats.replicationBacklog);
+            });
+        }
     }
 
     static void metricType(SimpleTextOutputStream stream, String name) {
@@ -173,4 +189,13 @@ class TopicStats {
                 .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
+
+    /** Writes one {@code name} sample labelled with cluster, namespace, topic and remote_cluster. */
+    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
+            String topic, String name, String remoteCluster, double value) {
+        metricType(stream, name);
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
+        stream.write("\",topic=\"").write(topic).write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
 }