You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/08/25 18:18:24 UTC

[pulsar-client-go] branch master updated: Fix missing metrics for topics by registration of existing collector (#600)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a3ad70  Fix missing metrics for topics by registration of existing collector (#600)
1a3ad70 is described below

commit 1a3ad70403aa04feda83edcfd984e25cef2dcfe6
Author: pkutilina <77...@users.noreply.github.com>
AuthorDate: Wed Aug 25 20:18:18 2021 +0200

    Fix missing metrics for topics by registration of existing collector (#600)
---
 pulsar/internal/metrics.go | 213 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 180 insertions(+), 33 deletions(-)

diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index d44522d..6902ca9 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -282,39 +282,186 @@ func NewMetricsProvider(userDefinedLabels map[string]string) *Metrics {
 		}),
 	}
 
-	prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
-	prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
-	prometheus.DefaultRegisterer.Register(metrics.messagesPending)
-	prometheus.DefaultRegisterer.Register(metrics.bytesPending)
-	prometheus.DefaultRegisterer.Register(metrics.publishErrors)
-	prometheus.DefaultRegisterer.Register(metrics.publishLatency)
-	prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
-
-	prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
-	prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
-	prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
-	prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
-	prometheus.DefaultRegisterer.Register(metrics.acksCounter)
-	prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
-	prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
-	prometheus.DefaultRegisterer.Register(metrics.processingTime)
-
-	prometheus.DefaultRegisterer.Register(metrics.producersOpened)
-	prometheus.DefaultRegisterer.Register(metrics.producersClosed)
-	prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
-	prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
-	prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
-	prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
-	prometheus.DefaultRegisterer.Register(metrics.readersOpened)
-	prometheus.DefaultRegisterer.Register(metrics.readersClosed)
-
-	prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
-	prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
-	prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
-	prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
-	prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
-	prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
-	prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
+	err := prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.messagesPublished = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.bytesPublished = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.messagesPending)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.messagesPending = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.bytesPending)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.bytesPending = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.publishErrors)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.publishErrors = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.publishLatency)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.publishLatency = are.ExistingCollector.(*prometheus.HistogramVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.publishRPCLatency = are.ExistingCollector.(*prometheus.HistogramVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.messagesReceived = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.bytesReceived = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.prefetchedMessages = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.prefetchedBytes = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.acksCounter)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.acksCounter = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.nacksCounter = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.dlqCounter = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.processingTime)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.processingTime = are.ExistingCollector.(*prometheus.HistogramVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.producersOpened)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.producersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.producersClosed)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.producersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.producersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.consumersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.consumersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.consumersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.readersOpened)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.readersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.readersClosed)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.readersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.ConnectionsOpened = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.ConnectionsClosed = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.ConnectionsEstablishmentErrors = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.ConnectionsHandshakeErrors = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.LookupRequestsCount = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.PartitionedTopicMetadataRequestsCount = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.RPCRequestCount = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
 	return metrics
 }