You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/13 12:22:31 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #15558: [fix][broker/functions-worker] Ensure prometheus metrics are grouped by type (#8407, #13865)

eolivelli commented on code in PR #15558:
URL: https://github.com/apache/pulsar/pull/15558#discussion_r872336085


##########
conf/standalone.conf:
##########
@@ -1099,4 +1099,4 @@ configurationStoreServers=
 # than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
 # zookeeper.
 # Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
-managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1
\ No newline at end of file
+managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

Review Comment:
   nit: please revert unnecessary changes



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java:
##########
@@ -106,269 +112,270 @@ static void resetTypes() {
         metricWithTypeDefinition.clear();
     }
 
-    static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
+    static void printTopicStats(Map<String, List<String>> metrics, String cluster, String namespace, String topic,

Review Comment:
   it looks like this change is not only about the Function Worker.
   Please update the title



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java:
##########
@@ -382,77 +389,102 @@ static void metricType(SimpleTextOutputStream stream, String name) {
 
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String name, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metric(Map<String, List<String>> metrics, String cluster, String namespace, String topic,
+                               String name, double value, boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+            stream.write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-           String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metric(Map<String, List<String>> metrics, String cluster, String namespace, String topic,
+                               String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+            stream.write("\",subscription=\"").write(subscription)
+                    .write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
+    }
+
+    private static void metric(Map<String, List<String>> metrics, String cluster, String namespace, String topic,
+                               String producerName, long produceId, String name, double value,
+                               boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+            stream.write("\",producer_name=\"").write(producerName)
+                    .write("\",producer_id=\"").write(produceId)
+                    .write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String producerName, long produceId, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",producer_name=\"").write(producerName)
-                .write("\",producer_id=\"").write(produceId).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metric(Map<String, List<String>> metrics, String cluster, String namespace, String topic,
+                               String subscription, String name, double value,
+                               boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+            stream.write("\",subscription=\"").write(subscription)
+                    .write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String subscription, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metric(Map<String, List<String>> metrics, String cluster, String namespace, String topic,
+                               String subscription, String consumerName, long consumerId, String name, long value,
+                               boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+            stream.write("\",subscription=\"").write(subscription)
+                    .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
+                    .write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String subscription, String consumerName, long consumerId, String name, long value,
-            boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription)
-                .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
-                .write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metric(Map<String, List<String>> metrics, String cluster, String namespace, String topic,
+                               String subscription, String consumerName, long consumerId, String name, double value,
+                               boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+            stream.write("\",subscription=\"").write(subscription)
+                    .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
+                    .write(consumerId).write("\"} ")
+                    .write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String subscription, String consumerName, long consumerId, String name, double value,
-            boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",subscription=\"").write(subscription)
-                .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
-                .write(consumerId).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void metricWithRemoteCluster(Map<String, List<String>> metrics, String cluster, String namespace,
+                                                String topic, String name, String remoteCluster, double value,
+                                                boolean splitTopicAndPartitionIndexLabel) {
+        writeMetric(metrics, name, stream -> {
+            writeRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel);
+            stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ").write(value);
+            writeEndings(stream);
+        });
     }
 
-    private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
-            String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) {
-        metricType(stream, name);
-        appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
-                .write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
-        stream.write(value);
-        appendEndings(stream);
+    private static void writeMetric(Map<String, List<String>> metrics, String name,
+                                    Consumer<SimpleTextOutputStream> metricWriter) {
+        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();

Review Comment:
   it looks like we are doing more memory allocations (even if you call 'buf.release()') 
   is it possible to move back to creating the SimpleTextOutputStream only once ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org