Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/29 05:00:39 UTC

[pulsar-client-go] branch master updated: [issue 814] consumer and producer reconnect failure metrics counter (#815)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new bd19458  [issue 814] consumer and producer reconnect failure metrics counter (#815)
bd19458 is described below

commit bd19458b32ff89206c135cc647336e690e99c32f
Author: ming <it...@gmail.com>
AuthorDate: Fri Jul 29 01:00:33 2022 -0400

    [issue 814] consumer and producer reconnect failure metrics counter (#815)
    
    * consumer and producer reconnect failure metrics counter
    
    * increment on every reconnect failure
    
    * producer consumer reconnect max retry counter
    
    Implement #814
    
    ### Motivation
    In a Pulsar cluster's Kubernetes deployment, or a deployment with a Proxy/LB in front, we need metrics counters to track producers and consumers that fail to reconnect.
    
    When brokers go offline but the proxy/LB is still functioning, a TCP connection can still be established but the topic lookup fails. The `pulsar_client_connections_establishment_errors` counter is not incremented in this case, so new counters are required to track such failure cases.
    
    ### Modifications
    
    Two new counter metrics, `pulsar_client_producers_reconnect_failure` and `pulsar_client_consumers_reconnect_failure`, are incremented in the producer_partition and consumer_partition retry-failure code blocks.
    
    Two new counter metrics, `pulsar_client_producers_reconnect_max_retry` and `pulsar_client_consumers_reconnect_max_retry`, are incremented in producer_partition and consumer_partition when either the maximum retry count or the maximum backoff is reached.
    
    The existing code logic already covers the case where the topic does not exist: the counters are not incremented, and the retry loop simply exits immediately.
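
    For reference, the counting pattern applied in both `reconnectToBroker()` loops is sketched below. Only the metric names come from this commit; `tryReconnect`, `isMaxBackoffReached` and the retry bookkeeping are simplified stand-ins for the real producer_partition/consumer_partition code shown in the diff.

    ```go
    // Illustrative sketch of the counting pattern added in this commit.
    // Only the metric names come from the real code; the reconnect and
    // backoff logic here is a simplified stand-in, not the client's API.
    package main

    import (
        "errors"
        "fmt"
        "time"

        "github.com/prometheus/client_golang/prometheus"
    )

    var (
        producersReconnectFailure = prometheus.NewCounter(prometheus.CounterOpts{
            Name: "pulsar_client_producers_reconnect_failure",
            Help: "Counter of reconnect failure of producers",
        })
        producersReconnectMaxRetry = prometheus.NewCounter(prometheus.CounterOpts{
            Name: "pulsar_client_producers_reconnect_max_retry",
            Help: "Counter of producer reconnect max retry reached",
        })
    )

    // tryReconnect stands in for the real broker lookup/connect attempt.
    func tryReconnect() error { return errors.New("topic lookup failed") }

    // isMaxBackoffReached stands in for backoff.IsMaxBackoffReached().
    func isMaxBackoffReached(delay time.Duration) bool { return delay >= 60*time.Second }

    func reconnectToBroker(maxRetry int) {
        delay := 100 * time.Millisecond
        for maxRetry != 0 {
            if err := tryReconnect(); err == nil {
                return // reconnected, nothing to count
            }
            if maxRetry > 0 {
                maxRetry--
            }
            // Count every failed reconnect attempt...
            producersReconnectFailure.Inc()
            // ...and count once more when the retry budget or backoff cap is hit.
            if maxRetry == 0 || isMaxBackoffReached(delay) {
                producersReconnectMaxRetry.Inc()
            }
            time.Sleep(delay)
            delay *= 2
        }
        fmt.Println("gave up after max retries")
    }

    func main() {
        prometheus.MustRegister(producersReconnectFailure, producersReconnectMaxRetry)
        reconnectToBroker(3)
    }
    ```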
    
    ### Verifying this change
    
    This has been verified in a Pulsar cluster deployment with a Proxy. We do not have such a setup in CI because it cannot be reproduced with Pulsar in standalone mode.
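
    Outside CI, the simplest way to observe the new counters during such a test is to expose the client metrics over HTTP: the client registers its collectors with `prometheus.DefaultRegisterer`, so the default promhttp handler already includes them. A minimal sketch (the port and endpoint below are arbitrary choices, not part of this change):

    ```go
    // Minimal sketch: expose the Prometheus default registry over HTTP so the
    // new pulsar_client_*_reconnect_* counters can be scraped or curl-ed while
    // brokers are taken offline behind the proxy/LB. Port 2112 is arbitrary.
    package main

    import (
        "log"
        "net/http"

        "github.com/prometheus/client_golang/prometheus/promhttp"
    )

    func main() {
        // pulsar-client-go registers its collectors with prometheus.DefaultRegisterer,
        // so the default promhttp handler already exposes them.
        http.Handle("/metrics", promhttp.Handler())
        log.Fatal(http.ListenAndServe(":2112", nil))
    }
    ```

    A `curl localhost:2112/metrics | grep reconnect` should then show the two counter pairs increasing while reconnect attempts keep failing.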
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
      - If a feature is not applicable for documentation, explain why?
      - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
---
 pulsar/consumer_partition.go    |   4 ++
 pulsar/internal/backoff.go      |   5 ++
 pulsar/internal/backoff_test.go |   2 +
 pulsar/internal/metrics.go      | 108 +++++++++++++++++++++++++++++++---------
 pulsar/producer_partition.go    |   4 ++
 5 files changed, 99 insertions(+), 24 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index fac9d4b..50fa3c4 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1155,6 +1155,10 @@ func (pc *partitionConsumer) reconnectToBroker() {
 		if maxRetry > 0 {
 			maxRetry--
 		}
+		pc.metrics.ConsumersReconnectFailure.Inc()
+		if maxRetry == 0 || backoff.IsMaxBackoffReached() {
+			pc.metrics.ConsumersReconnectMaxRetry.Inc()
+		}
 	}
 }
 
diff --git a/pulsar/internal/backoff.go b/pulsar/internal/backoff.go
index f172bdc..ff9b0bc 100644
--- a/pulsar/internal/backoff.go
+++ b/pulsar/internal/backoff.go
@@ -51,3 +51,8 @@ func (b *Backoff) Next() time.Duration {
 
 	return b.backoff + time.Duration(jitter)
 }
+
+// IsMaxBackoffReached evaluates whether the maximum backoff duration has been reached
+func (b *Backoff) IsMaxBackoffReached() bool {
+	return b.backoff >= maxBackoff
+}
diff --git a/pulsar/internal/backoff_test.go b/pulsar/internal/backoff_test.go
index 46c9211..ad6e764 100644
--- a/pulsar/internal/backoff_test.go
+++ b/pulsar/internal/backoff_test.go
@@ -42,6 +42,7 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) {
 		// the jitter introduces at most 20% difference so delay is less than twice the previous value
 		assert.LessOrEqual(t, int64(float64(delay)*.8), int64(2*float64(previousDelay)))
 		previousDelay = delay
+		assert.Equal(t, false, backoff.IsMaxBackoffReached())
 	}
 }
 
@@ -54,6 +55,7 @@ func TestBackoff_NextMaxValue(t *testing.T) {
 
 	cappedDelay := backoff.Next()
 	assert.GreaterOrEqual(t, int64(cappedDelay), int64(maxBackoff))
+	assert.Equal(t, true, backoff.IsMaxBackoffReached())
 	// max value is 60 seconds + 20% jitter = 72 seconds
 	assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second))
 }
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index 1cab470..a2af33c 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -40,14 +40,18 @@ type Metrics struct {
 	dlqCounter         *prometheus.CounterVec
 	processingTime     *prometheus.HistogramVec
 
-	producersOpened     *prometheus.CounterVec
-	producersClosed     *prometheus.CounterVec
-	producersPartitions *prometheus.GaugeVec
-	consumersOpened     *prometheus.CounterVec
-	consumersClosed     *prometheus.CounterVec
-	consumersPartitions *prometheus.GaugeVec
-	readersOpened       *prometheus.CounterVec
-	readersClosed       *prometheus.CounterVec
+	producersOpened            *prometheus.CounterVec
+	producersClosed            *prometheus.CounterVec
+	producersReconnectFailure  *prometheus.CounterVec
+	producersReconnectMaxRetry *prometheus.CounterVec
+	producersPartitions        *prometheus.GaugeVec
+	consumersOpened            *prometheus.CounterVec
+	consumersClosed            *prometheus.CounterVec
+	consumersReconnectFailure  *prometheus.CounterVec
+	consumersReconnectMaxRetry *prometheus.CounterVec
+	consumersPartitions        *prometheus.GaugeVec
+	readersOpened              *prometheus.CounterVec
+	readersClosed              *prometheus.CounterVec
 
 	// Metrics that are not labeled with specificity are immediately available
 	ConnectionsOpened                     prometheus.Counter
@@ -78,14 +82,18 @@ type LeveledMetrics struct {
 	DlqCounter         prometheus.Counter
 	ProcessingTime     prometheus.Observer
 
-	ProducersOpened     prometheus.Counter
-	ProducersClosed     prometheus.Counter
-	ProducersPartitions prometheus.Gauge
-	ConsumersOpened     prometheus.Counter
-	ConsumersClosed     prometheus.Counter
-	ConsumersPartitions prometheus.Gauge
-	ReadersOpened       prometheus.Counter
-	ReadersClosed       prometheus.Counter
+	ProducersOpened            prometheus.Counter
+	ProducersClosed            prometheus.Counter
+	ProducersReconnectFailure  prometheus.Counter
+	ProducersReconnectMaxRetry prometheus.Counter
+	ProducersPartitions        prometheus.Gauge
+	ConsumersOpened            prometheus.Counter
+	ConsumersClosed            prometheus.Counter
+	ConsumersReconnectFailure  prometheus.Counter
+	ConsumersReconnectMaxRetry prometheus.Counter
+	ConsumersPartitions        prometheus.Gauge
+	ReadersOpened              prometheus.Counter
+	ReadersClosed              prometheus.Counter
 }
 
 func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]string) *Metrics {
@@ -175,6 +183,18 @@ func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]str
 			ConstLabels: constLabels,
 		}, metricsLevelLabels),
 
+		producersReconnectFailure: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name:        "pulsar_client_producers_reconnect_failure",
+			Help:        "Counter of reconnect failure of producers",
+			ConstLabels: constLabels,
+		}, metricsLevelLabels),
+
+		producersReconnectMaxRetry: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name:        "pulsar_client_producers_reconnect_max_retry",
+			Help:        "Counter of producer reconnect max retry reached",
+			ConstLabels: constLabels,
+		}, metricsLevelLabels),
+
 		consumersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
 			Name:        "pulsar_client_consumers_opened",
 			Help:        "Counter of consumers created by the client",
@@ -187,6 +207,18 @@ func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]str
 			ConstLabels: constLabels,
 		}, metricsLevelLabels),
 
+		consumersReconnectFailure: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name:        "pulsar_client_consumers_reconnect_failure",
+			Help:        "Counter of reconnect failure of consumers",
+			ConstLabels: constLabels,
+		}, metricsLevelLabels),
+
+		consumersReconnectMaxRetry: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name:        "pulsar_client_consumers_reconnect_max_retry",
+			Help:        "Counter of consumer reconnect max retry reached",
+			ConstLabels: constLabels,
+		}, metricsLevelLabels),
+
 		consumersPartitions: prometheus.NewGaugeVec(prometheus.GaugeOpts{
 			Name:        "pulsar_client_consumers_partitions_active",
 			Help:        "Counter of individual partitions the consumers are currently active",
@@ -399,6 +431,18 @@ func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]str
 			metrics.producersClosed = are.ExistingCollector.(*prometheus.CounterVec)
 		}
 	}
+	err = prometheus.DefaultRegisterer.Register(metrics.producersReconnectFailure)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.producersReconnectFailure = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.producersReconnectMaxRetry)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.producersReconnectMaxRetry = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
 	err = prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
 	if err != nil {
 		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
@@ -417,6 +461,18 @@ func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]str
 			metrics.consumersClosed = are.ExistingCollector.(*prometheus.CounterVec)
 		}
 	}
+	err = prometheus.DefaultRegisterer.Register(metrics.consumersReconnectFailure)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.consumersReconnectFailure = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.consumersReconnectMaxRetry)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.consumersReconnectMaxRetry = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
 	err = prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
 	if err != nil {
 		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
@@ -517,14 +573,18 @@ func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics {
 		DlqCounter:         mp.dlqCounter.With(labels),
 		ProcessingTime:     mp.processingTime.With(labels),
 
-		ProducersOpened:     mp.producersOpened.With(labels),
-		ProducersClosed:     mp.producersClosed.With(labels),
-		ProducersPartitions: mp.producersPartitions.With(labels),
-		ConsumersOpened:     mp.consumersOpened.With(labels),
-		ConsumersClosed:     mp.consumersClosed.With(labels),
-		ConsumersPartitions: mp.consumersPartitions.With(labels),
-		ReadersOpened:       mp.readersOpened.With(labels),
-		ReadersClosed:       mp.readersClosed.With(labels),
+		ProducersOpened:            mp.producersOpened.With(labels),
+		ProducersClosed:            mp.producersClosed.With(labels),
+		ProducersReconnectFailure:  mp.producersReconnectFailure.With(labels),
+		ProducersReconnectMaxRetry: mp.producersReconnectMaxRetry.With(labels),
+		ProducersPartitions:        mp.producersPartitions.With(labels),
+		ConsumersOpened:            mp.consumersOpened.With(labels),
+		ConsumersClosed:            mp.consumersClosed.With(labels),
+		ConsumersReconnectFailure:  mp.consumersReconnectFailure.With(labels),
+		ConsumersReconnectMaxRetry: mp.consumersReconnectMaxRetry.With(labels),
+		ConsumersPartitions:        mp.consumersPartitions.With(labels),
+		ReadersOpened:              mp.readersOpened.With(labels),
+		ReadersClosed:              mp.readersClosed.With(labels),
 	}
 
 	return lm
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index ec92415..210a929 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -419,6 +419,10 @@ func (p *partitionProducer) reconnectToBroker() {
 		if maxRetry > 0 {
 			maxRetry--
 		}
+		p.metrics.ProducersReconnectFailure.Inc()
+		if maxRetry == 0 || backoff.IsMaxBackoffReached() {
+			p.metrics.ProducersReconnectMaxRetry.Inc()
+		}
 	}
 }