You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/28 08:20:07 UTC
[rocketmq-client-csharp] branch observability updated: Collect metrics for Producer
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/observability by this push:
new 65ca656 Collect metrics for Producer
65ca656 is described below
commit 65ca656cc67d2b019f905d5666817fa07bf5c39d
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jun 28 16:19:58 2022 +0800
Collect metrics for Producer
---
rocketmq-client-csharp/Client.cs | 9 +++++++--
rocketmq-client-csharp/Producer.cs | 20 +++++++++++++++++++-
2 files changed, 26 insertions(+), 3 deletions(-)
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 14d9b5b..8347650 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -24,6 +24,9 @@ using System;
using rmq = Apache.Rocketmq.V2;
using grpc = global::Grpc.Core;
using NLog;
+using System.Diagnostics.Metrics;
+using OpenTelemetry;
+using OpenTelemetry.Metrics;
namespace Org.Apache.Rocketmq
@@ -235,7 +238,7 @@ namespace Org.Apache.Rocketmq
var metadata = new grpc.Metadata();
Signature.sign(this, metadata);
- int index = random.Next(0, AccessPointEndpoints.Count);
+ int index = _random.Next(0, AccessPointEndpoints.Count);
var serviceEndpoint = AccessPointEndpoints[index];
// AccessPointAddresses.Count
string target = $"https://{serviceEndpoint.Host}:{serviceEndpoint.Port}";
@@ -481,8 +484,10 @@ namespace Org.Apache.Rocketmq
// This field is subject changes from servers.
protected readonly rmq::Settings _clientSettings;
- private readonly Random random = new Random();
+ private readonly Random _random = new Random();
protected readonly ConcurrentDictionary<string, Session> _sessions = new ConcurrentDictionary<string, Session>();
+
+ protected static readonly Meter MetricMeter = new("Apache.RocketMQ.Client", "1.0");
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 40d62a0..8d2da7d 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -20,6 +20,8 @@ using System.Threading.Tasks;
using rmq = Apache.Rocketmq.V2;
using System.Collections.Generic;
using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
@@ -32,6 +34,10 @@ namespace Org.Apache.Rocketmq
public Producer(AccessPoint accessPoint, string resourceNamespace) : base(accessPoint, resourceNamespace)
{
_loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
+ _sendFailureTotal = MetricMeter.CreateCounter<UInt64>("rocketmq_send_failure_total");
+ _sendLatency = MetricMeter.CreateHistogram<double>("rocketmq_send_success_cost_time",
+ description: "Measure the duration of publishing messages to brokers",
+ unit: "milliseconds");
}
public override async Task Start()
@@ -132,16 +138,25 @@ namespace Org.Apache.Rocketmq
{
try
{
+ var stopWatch = new Stopwatch();
+ stopWatch.Start();
rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
if (null != response && rmq::Code.Ok == response.Status.Code)
{
-
var messageId = response.Entries[0].MessageId;
+
+ // Account latency histogram
+ stopWatch.Stop();
+ var latency = stopWatch.ElapsedMilliseconds;
+ _sendLatency.Record(latency, new("topic", message.Topic), new("client_id", clientId()));
+
return new SendReceipt(messageId);
}
}
catch (Exception e)
{
+ // Account failure count
+ _sendFailureTotal.Add(1, new("topic", message.Topic), new("client_id", clientId()));
Logger.Info(e, $"Failed to send message to {target}");
ex = e;
}
@@ -158,5 +173,8 @@ namespace Org.Apache.Rocketmq
}
private readonly ConcurrentDictionary<string, PublishLoadBalancer> _loadBalancer;
+
+ private readonly Counter<UInt64> _sendFailureTotal;
+ private readonly Histogram<double> _sendLatency;
}
}
\ No newline at end of file