You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/28 08:20:07 UTC

[rocketmq-client-csharp] branch observability updated: Collect metrics for Producer

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/observability by this push:
     new 65ca656  Collect metrics for Producer
65ca656 is described below

commit 65ca656cc67d2b019f905d5666817fa07bf5c39d
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jun 28 16:19:58 2022 +0800

    Collect metrics for Producer
---
 rocketmq-client-csharp/Client.cs   |  9 +++++++--
 rocketmq-client-csharp/Producer.cs | 20 +++++++++++++++++++-
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 14d9b5b..8347650 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -24,6 +24,9 @@ using System;
 using rmq = Apache.Rocketmq.V2;
 using grpc = global::Grpc.Core;
 using NLog;
+using System.Diagnostics.Metrics;
+using OpenTelemetry;
+using OpenTelemetry.Metrics;
 
 
 namespace Org.Apache.Rocketmq
@@ -235,7 +238,7 @@ namespace Org.Apache.Rocketmq
 
             var metadata = new grpc.Metadata();
             Signature.sign(this, metadata);
-            int index = random.Next(0, AccessPointEndpoints.Count);
+            int index = _random.Next(0, AccessPointEndpoints.Count);
             var serviceEndpoint = AccessPointEndpoints[index];
             // AccessPointAddresses.Count
             string target = $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
@@ -481,8 +484,10 @@ namespace Org.Apache.Rocketmq
         // This field is subject changes from servers.
         protected readonly rmq::Settings _clientSettings;
 
-        private readonly Random random = new Random();
+        private readonly Random _random = new Random();
         
         protected readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
+
+        protected static readonly Meter MetricMeter = new("Apache.RocketMQ.Client", "1.0");
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 40d62a0..8d2da7d 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -20,6 +20,8 @@ using System.Threading.Tasks;
 using rmq = Apache.Rocketmq.V2;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
 using Google.Protobuf;
 using Google.Protobuf.WellKnownTypes;
 using Grpc.Core;
@@ -32,6 +34,10 @@ namespace Org.Apache.Rocketmq
         public Producer(AccessPoint accessPoint, string resourceNamespace) : base(accessPoint, resourceNamespace)
         {
             _loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+            _sendFailureTotal = MetricMeter.CreateCounter<UInt64>("rocketmq_send_failure_total");
+            _sendLatency = MetricMeter.CreateHistogram<double>("rocketmq_send_success_cost_time", 
+                description: "Measure the duration of publishing messages to brokers",
+                unit: "milliseconds");
         }
 
         public override async Task Start()
@@ -132,16 +138,25 @@ namespace Org.Apache.Rocketmq
             {
                 try
                 {
+                    var stopWatch = new Stopwatch();
+                    stopWatch.Start();
                     rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
                     if (null != response && rmq::Code.Ok == response.Status.Code)
                     {
-
                         var messageId = response.Entries[0].MessageId;
+                        
+                        // Record the send latency in the histogram
+                        stopWatch.Stop();
+                        var latency = stopWatch.ElapsedMilliseconds;
+                        _sendLatency.Record(latency, new("topic", message.Topic), new("client_id", clientId()));
+                        
                         return new SendReceipt(messageId);
                     }
                 }
                 catch (Exception e)
                 {
+                    // Increment the send-failure counter
+                    _sendFailureTotal.Add(1, new("topic", message.Topic), new("client_id", clientId()));                    
                     Logger.Info(e, $"Failed to send message to {target}");
                     ex = e;
                 }
@@ -158,5 +173,8 @@ namespace Org.Apache.Rocketmq
         }
 
         private readonly ConcurrentDictionary<string, PublishLoadBalancer> _loadBalancer;
+
+        private readonly Counter<UInt64> _sendFailureTotal;
+        private readonly Histogram<double> _sendLatency;
     }
 }
\ No newline at end of file