You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/08/25 18:18:24 UTC
[pulsar-client-go] branch master updated: Fix missing metrics for topics by registration of existing collector (#600)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 1a3ad70 Fix missing metrics for topics by registration of existing collector (#600)
1a3ad70 is described below
commit 1a3ad70403aa04feda83edcfd984e25cef2dcfe6
Author: pkutilina <77...@users.noreply.github.com>
AuthorDate: Wed Aug 25 20:18:18 2021 +0200
Fix missing metrics for topics by registration of existing collector (#600)
---
pulsar/internal/metrics.go | 213 ++++++++++++++++++++++++++++++++++++++-------
1 file changed, 180 insertions(+), 33 deletions(-)
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index d44522d..6902ca9 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -282,39 +282,186 @@ func NewMetricsProvider(userDefinedLabels map[string]string) *Metrics {
}),
}
- prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
- prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
- prometheus.DefaultRegisterer.Register(metrics.messagesPending)
- prometheus.DefaultRegisterer.Register(metrics.bytesPending)
- prometheus.DefaultRegisterer.Register(metrics.publishErrors)
- prometheus.DefaultRegisterer.Register(metrics.publishLatency)
- prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
-
- prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
- prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
- prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
- prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
- prometheus.DefaultRegisterer.Register(metrics.acksCounter)
- prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
- prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
- prometheus.DefaultRegisterer.Register(metrics.processingTime)
-
- prometheus.DefaultRegisterer.Register(metrics.producersOpened)
- prometheus.DefaultRegisterer.Register(metrics.producersClosed)
- prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
- prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
- prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
- prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
- prometheus.DefaultRegisterer.Register(metrics.readersOpened)
- prometheus.DefaultRegisterer.Register(metrics.readersClosed)
-
- prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
- prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
- prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
- prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
- prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
- prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
- prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
+ err := prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.messagesPublished = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.bytesPublished = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.messagesPending)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.messagesPending = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.bytesPending)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.bytesPending = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.publishErrors)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.publishErrors = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.publishLatency)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.publishLatency = are.ExistingCollector.(*prometheus.HistogramVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.publishRPCLatency = are.ExistingCollector.(*prometheus.HistogramVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.messagesReceived = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.bytesReceived = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.prefetchedMessages = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.prefetchedBytes = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.acksCounter)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.acksCounter = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.nacksCounter = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.dlqCounter = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.processingTime)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.processingTime = are.ExistingCollector.(*prometheus.HistogramVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.producersOpened)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.producersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.producersClosed)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.producersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.producersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.consumersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.consumersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.consumersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.readersOpened)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.readersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.readersClosed)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.readersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.ConnectionsOpened = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.ConnectionsClosed = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.ConnectionsEstablishmentErrors = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.ConnectionsHandshakeErrors = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.LookupRequestsCount = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.PartitionedTopicMetadataRequestsCount = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
+ err = prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
+ if err != nil {
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ metrics.RPCRequestCount = are.ExistingCollector.(prometheus.Counter)
+ }
+ }
return metrics
}