You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/11/20 11:19:56 UTC

[pulsar-client-go] branch master updated: Add metric for internal publish latency (#397)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b8bd55b  Add metric for internal publish latency (#397)
b8bd55b is described below

commit b8bd55bc02bdaef89774b1b3e57c9652ec69cd90
Author: cckellogg <cc...@gmail.com>
AuthorDate: Fri Nov 20 03:19:47 2020 -0800

    Add metric for internal publish latency (#397)
    
    The current publish latency metric measures the time from when send is called until the message is acked. The metric can be skewed if batching is enabled.
    
    For example, if we call send, the client will hold that message until x number of messages are collected or an interval has passed. This new metric measures the time from when we send the data to the connection until we get an ack from the broker. This cuts out any batch skewing and provides a more accurate latency of what the client sees at the connection level.
---
 pulsar/producer_partition.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 21bea9f..3397b6e 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -82,6 +82,12 @@ var (
 		Help:    "Publish latency experienced by the client",
 		Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
 	})
+
+	publishRPCLatency = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "pulsar_client_producer_rpc_latency_seconds",
+		Help:    "Publish RPC latency experienced internally by the client when sending data to receiving an ack",
+		Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
+	})
 )
 
 type partitionProducer struct {
@@ -424,6 +430,7 @@ type pendingItem struct {
 	sync.Mutex
 	batchData    internal.Buffer
 	sequenceID   uint64
+	sentAt       int64
 	sendRequests []interface{}
 	completed    bool
 }
@@ -437,6 +444,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 	p.pendingQueue.Put(&pendingItem{
 		batchData:    batchData,
 		sequenceID:   sequenceID,
+		sentAt:       time.Now().UnixNano(),
 		sendRequests: callbacks,
 	})
 	p.cnx.WriteData(batchData)
@@ -545,6 +553,9 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	// lock the pending item while sending the requests
 	pi.Lock()
 	defer pi.Unlock()
+	if pi.sentAt > 0 {
+		publishRPCLatency.Observe(float64(now-pi.sentAt) / 1.0e9)
+	}
 	for idx, i := range pi.sendRequests {
 		sr := i.(*sendRequest)
 		if sr.msg != nil {