You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/27 03:52:09 UTC
[rocketmq-clients] 01/02: WIP: implement client metrics by OpenTelemetry
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit e219eebfaaa81cfc7c624ff08d9af0a5ca139319
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Feb 27 11:14:22 2023 +0800
WIP: implement client metrics by OpenTelemetry
---
csharp/rocketmq-client-csharp/Client.cs | 5 +
csharp/rocketmq-client-csharp/ClientMeter.cs | 76 ++++++++++++++
.../rocketmq-client-csharp/ClientMeterManager.cs | 115 +++++++++++++++++++++
csharp/rocketmq-client-csharp/Metric.cs | 33 ++++++
csharp/rocketmq-client-csharp/MetricConstant.cs | 70 +++++++++++++
.../MetricHttpDelegatingHandler.cs | 48 +++++++++
csharp/rocketmq-client-csharp/Producer.cs | 18 ++++
csharp/rocketmq-client-csharp/RpcClient.cs | 12 ++-
csharp/rocketmq-client-csharp/Signature.cs | 26 +++--
9 files changed, 391 insertions(+), 12 deletions(-)
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 305383e2..48700c33 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -51,6 +51,7 @@ namespace Org.Apache.Rocketmq
protected readonly Endpoints Endpoints;
protected readonly IClientManager ClientManager;
protected readonly string ClientId;
+ protected readonly ClientMeterManager ClientMeterManager;
protected readonly ConcurrentDictionary<Endpoints, bool> Isolated;
private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteCache;
@@ -65,6 +66,7 @@ namespace Org.Apache.Rocketmq
ClientConfig = clientConfig;
Endpoints = new Endpoints(clientConfig.Endpoints);
ClientId = Utilities.GetClientId();
+ ClientMeterManager = new ClientMeterManager(this);
ClientManager = new ClientManager(this);
Isolated = new ConcurrentDictionary<Endpoints, bool>();
@@ -107,6 +109,7 @@ namespace Org.Apache.Rocketmq
_statsCts.Cancel();
NotifyClientTermination();
await ClientManager.Shutdown();
+ ClientMeterManager.Shutdown();
Logger.Debug($"Shutdown the rocketmq client successfully, clientId={ClientId}");
}
@@ -482,6 +485,8 @@ namespace Org.Apache.Rocketmq
public void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
{
+ var metric = new Metric(settings.Metric);
+ ClientMeterManager.Reset(metric);
GetSettings().Sync(settings);
}
}
diff --git a/csharp/rocketmq-client-csharp/ClientMeter.cs b/csharp/rocketmq-client-csharp/ClientMeter.cs
new file mode 100644
index 00000000..53fc2cdf
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ClientMeter.cs
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using NLog;
+using OpenTelemetry.Metrics;
+
+namespace Org.Apache.Rocketmq
+{
+ public class ClientMeter
+ {
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
+ public ClientMeter(Endpoints endpoints, MeterProvider meterProvider, string clientId)
+ {
+ Enabled = true;
+ Endpoints = endpoints;
+ MeterProvider = meterProvider;
+ ClientId = clientId;
+ }
+
+ private ClientMeter(string clientId)
+ {
+ Enabled = false;
+ ClientId = clientId;
+ }
+
+ public Endpoints Endpoints { get; }
+
+ public MeterProvider MeterProvider { get; }
+
+ public string ClientId { get; }
+
+ public bool Enabled { get; }
+
+ internal static ClientMeter DisabledInstance(string clientId)
+ {
+ return new ClientMeter(clientId);
+ }
+
+ public void Shutdown()
+ {
+ if (!Enabled)
+ {
+ return;
+ }
+
+ Logger.Info($"Begin to shutdown the client meter, clientId={ClientId}, endpoints={Endpoints}");
+ MeterProvider.Shutdown();
+ Logger.Info($"Shutdown the client meter successfully, clientId={ClientId}, endpoints={Endpoints}");
+ }
+
+ public bool Satisfy(Metric metric)
+ {
+ if (Enabled && metric.On && Endpoints.Equals(metric.Endpoints))
+ {
+ return true;
+ }
+
+ return !Enabled && !metric.On;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientMeterManager.cs b/csharp/rocketmq-client-csharp/ClientMeterManager.cs
new file mode 100644
index 00000000..ad808b34
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ClientMeterManager.cs
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics.Metrics;
+using System.Net.Http;
+using NLog;
+using OpenTelemetry;
+using OpenTelemetry.Exporter;
+using OpenTelemetry.Metrics;
+using OpenTelemetry.Resources;
+
+namespace Org.Apache.Rocketmq
+{
+ public class ClientMeterManager
+ {
+ private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+ private const string MeterName = "Apache.RocketMQ.Client";
+ private const string Version = "1.0";
+ private const int MetricExportPeriodInMillis = 60 * 1000;
+
+ private readonly IClient _client;
+ private volatile ClientMeter _clientMeter;
+ private readonly HttpClient _httpClient;
+ internal readonly Meter Meter;
+
+ public ClientMeterManager(IClient client)
+ {
+ _client = client;
+ var httpDelegatingHandler = new MetricHttpDelegatingHandler(client);
+ _httpClient = new HttpClient(httpDelegatingHandler);
+ _clientMeter = ClientMeter.DisabledInstance(_client.GetClientId());
+ Meter = new Meter(MeterName, Version);
+ }
+
+ public void Shutdown()
+ {
+ _clientMeter.Shutdown();
+ }
+
+ // TODO: add lock
+ public void Reset(Metric metric)
+ {
+ var clientId = _client.GetClientId();
+ if (_clientMeter.Satisfy(metric))
+ {
+ Logger.Info(
+ $"Metric settings is satisfied by the current message meter, metric={metric}, clientId={clientId}");
+ return;
+ }
+
+ if (!metric.On)
+ {
+ Logger.Info($"Metric is off, clientId={clientId}");
+ _clientMeter.Shutdown();
+ _clientMeter = ClientMeter.DisabledInstance(clientId);
+ return;
+ }
+
+ var meterProvider = Sdk.CreateMeterProviderBuilder()
+ .SetResourceBuilder(ResourceBuilder.CreateEmpty())
+ .AddMeter(MeterName)
+ .AddOtlpExporter(delegate(OtlpExporterOptions options, MetricReaderOptions readerOptions)
+ {
+ options.Protocol = OtlpExportProtocol.Grpc;
+ options.Endpoint = new Uri(metric.Endpoints.GrpcTarget);
+ options.TimeoutMilliseconds = (int)_client.GetClientConfig().RequestTimeout.TotalMilliseconds;
+ options.HttpClientFactory = () => _httpClient;
+ readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds =
+ MetricExportPeriodInMillis;
+ })
+ .AddView(instrument =>
+ {
+ if (MeterName != instrument.Meter.Name)
+ {
+ return null;
+ }
+
+ return instrument.Name switch
+ {
+ MetricConstant.SendCostTimeMetricName => MetricConstant.Instance.SendCostTimeBucket,
+ MetricConstant.DeliveryLatencyMetricName => MetricConstant.Instance.DeliveryLatencyBucket,
+ MetricConstant.AwaitTimeMetricName => MetricConstant.Instance.AwaitTimeBucket,
+ MetricConstant.ProcessTimeMetricName => MetricConstant.Instance.ProcessTimeBucket,
+ _ => null
+ };
+ })
+ .Build();
+
+ var exist = _clientMeter;
+ _clientMeter = new ClientMeter(metric.Endpoints, meterProvider, clientId);
+ exist.Shutdown();
+ Logger.Info($"Metric is on, endpoints={metric.Endpoints}, clientId={clientId}");
+ }
+
+ public bool IsEnabled()
+ {
+ return _clientMeter.Enabled;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Metric.cs b/csharp/rocketmq-client-csharp/Metric.cs
new file mode 100644
index 00000000..4116f05f
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Metric.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+ public class Metric
+ {
+ public Endpoints Endpoints { get; }
+ public bool On { get; }
+
+ public Metric(Proto.Metric metric)
+ {
+ Endpoints = null == metric.Endpoints ? null : new Endpoints(metric.Endpoints);
+ On = metric.On && null != metric.Endpoints;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MetricConstant.cs b/csharp/rocketmq-client-csharp/MetricConstant.cs
new file mode 100644
index 00000000..15e6cceb
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/MetricConstant.cs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using OpenTelemetry.Metrics;
+
+namespace Org.Apache.Rocketmq
+{
+ public class MetricConstant
+ {
+ // Metric Name
+ public const string SendCostTimeMetricName = "rocketmq_send_cost_time";
+ public const string DeliveryLatencyMetricName = "rocketmq_delivery_latency";
+ public const string AwaitTimeMetricName = "rocketmq_await_time";
+ public const string ProcessTimeMetricName = "rocketmq_process_time";
+
+ // Metric Label Name
+ public const string Topic = "topic";
+ public const string ClientId = "client_id";
+ public const string ConsumerGroup = "consumer_group";
+ public const string InvocationStatus = "invocation_status";
+
+ // Metric Label Value
+ public const string True = "true";
+ public const string False = "false";
+
+ public readonly ExplicitBucketHistogramConfiguration SendCostTimeBucket;
+ public readonly ExplicitBucketHistogramConfiguration DeliveryLatencyBucket;
+ public readonly ExplicitBucketHistogramConfiguration AwaitTimeBucket;
+ public readonly ExplicitBucketHistogramConfiguration ProcessTimeBucket;
+
+ public static readonly MetricConstant Instance = new();
+
+ private MetricConstant()
+ {
+ SendCostTimeBucket = new ExplicitBucketHistogramConfiguration
+ {
+ Boundaries = new[] { 1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0 }
+ };
+
+ DeliveryLatencyBucket = new ExplicitBucketHistogramConfiguration
+ {
+ Boundaries = new[] { 1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0 }
+ };
+
+ AwaitTimeBucket = new ExplicitBucketHistogramConfiguration
+ {
+ Boundaries = new[] { 1.0, 5.0, 20.0, 100.0, 1000.0, 5 * 1000.0, 10 * 1000.0 }
+ };
+
+ ProcessTimeBucket = new ExplicitBucketHistogramConfiguration
+ {
+ Boundaries = new[] { 1.0, 5.0, 10.0, 100.0, 1000.0, 10 * 1000.0, 60 * 1000.0 }
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MetricHttpDelegatingHandler.cs b/csharp/rocketmq-client-csharp/MetricHttpDelegatingHandler.cs
new file mode 100644
index 00000000..c5705548
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/MetricHttpDelegatingHandler.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+ public class MetricHttpDelegatingHandler : DelegatingHandler
+ {
+ private readonly IClient _client;
+
+ public MetricHttpDelegatingHandler(IClient client)
+ {
+ _client = client;
+ InnerHandler = RpcClient.CreateHttpHandler();
+ }
+
+ protected override async Task<HttpResponseMessage> SendAsync(
+ HttpRequestMessage request,
+ CancellationToken cancellationToken)
+ {
+ var headers = Signature.Sign(_client);
+ foreach (var (key, value) in headers)
+ {
+ // Add extra headers for auth.
+ request.Headers.TryAddWithoutValidation(key, value);
+ }
+
+ return await base.SendAsync(request, cancellationToken);
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 838263a2..81d2b8d7 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -18,6 +18,8 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading.Tasks;
using Proto = Apache.Rocketmq.V2;
@@ -33,6 +35,8 @@ namespace Org.Apache.Rocketmq
private readonly ConcurrentDictionary<string, bool> _publishingTopics;
private ITransactionChecker _checker = null;
+ private readonly Histogram<double> _sendCostTimeHistogram;
+
public Producer(ClientConfig clientConfig) : this(clientConfig, new ConcurrentDictionary<string, bool>(), 3)
{
}
@@ -51,6 +55,8 @@ namespace Org.Apache.Rocketmq
clientConfig.RequestTimeout, publishingTopics);
_publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>();
_publishingTopics = publishingTopics;
+ _sendCostTimeHistogram =
+ ClientMeterManager.Meter.CreateHistogram<double>(MetricConstant.SendCostTimeMetricName, "milliseconds");
}
public void SetTopics(params string[] topics)
@@ -163,6 +169,7 @@ namespace Org.Apache.Rocketmq
Exception exception = null;
for (var attempt = 1; attempt <= maxAttempts; attempt++)
{
+ var stopwatch = Stopwatch.StartNew();
try
{
var sendReceipt = await Send0(publishingMessage, candidates, attempt, maxAttempts);
@@ -172,6 +179,15 @@ namespace Org.Apache.Rocketmq
{
exception = e;
}
+ finally
+ {
+ var elapsed = stopwatch.Elapsed.Milliseconds;
+ _sendCostTimeHistogram.Record(elapsed,
+ new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic),
+ new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId),
+ new KeyValuePair<string, object>(MetricConstant.InvocationStatus,
+ null == exception ? MetricConstant.True : MetricConstant.False));
+ }
}
throw exception!;
@@ -183,6 +199,7 @@ namespace Org.Apache.Rocketmq
{
throw new InvalidOperationException("Producer is not running");
}
+
var sendReceipt = await Send(message, false);
return sendReceipt;
}
@@ -193,6 +210,7 @@ namespace Org.Apache.Rocketmq
{
throw new InvalidOperationException("Producer is not running");
}
+
var tx = (Transaction)transaction;
var publishingMessage = tx.TryAddMessage(message);
var sendReceipt = await Send(message, true);
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs
index bf45410c..47db6830 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -59,12 +59,14 @@ namespace Org.Apache.Rocketmq
* See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
* why parameters are configured this way.
*/
- private HttpMessageHandler CreateHttpHandler()
+ internal static HttpMessageHandler CreateHttpHandler()
{
- var sslOptions = new SslClientAuthenticationOptions();
- // Disable server certificate validation during development phase.
- // Comment out the following line if server certificate validation is required.
- sslOptions.RemoteCertificateValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
+ var sslOptions = new SslClientAuthenticationOptions
+ {
+ // Comment out the following line if server certificate validation is required.
+ // Disable server certificate validation during development phase.
+ RemoteCertificateValidationCallback = (_, _, _, _) => true
+ };
var handler = new SocketsHttpHandler
{
PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs
index 8588c25a..5ae8abc0 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -16,6 +16,7 @@
*/
using System;
+using System.Collections.Generic;
using System.Text;
using grpc = Grpc.Core;
using System.Security.Cryptography;
@@ -26,23 +27,33 @@ namespace Org.Apache.Rocketmq
{
public static void Sign(IClient client, grpc::Metadata metadata)
{
+ var headers = Sign(client);
+ foreach (var (key, value) in headers)
+ {
+ metadata.Add(key, value);
+ }
+ }
+
+ public static Dictionary<string, string> Sign(IClient client)
+ {
+ Dictionary<string, string> dictionary = new Dictionary<string, string>();
var clientConfig = client.GetClientConfig();
- metadata.Add(MetadataConstants.LanguageKey, MetadataConstants.LanguageValue);
- metadata.Add(MetadataConstants.ClientVersionKey, MetadataConstants.Instance.ClientVersion);
- metadata.Add(MetadataConstants.ClientIdKey, client.GetClientId());
+ dictionary.Add(MetadataConstants.LanguageKey, MetadataConstants.LanguageValue);
+ dictionary.Add(MetadataConstants.ClientVersionKey, MetadataConstants.Instance.ClientVersion);
+ dictionary.Add(MetadataConstants.ClientIdKey, client.GetClientId());
var time = DateTime.Now.ToString(MetadataConstants.DateTimeFormat);
- metadata.Add(MetadataConstants.DateTimeKey, time);
+ dictionary.Add(MetadataConstants.DateTimeKey, time);
var credentials = clientConfig.CredentialsProvider?.Credentials;
if (credentials == null || credentials.expired())
{
- return;
+ return dictionary;
}
if (!string.IsNullOrEmpty(credentials.SessionToken))
{
- metadata.Add(MetadataConstants.SessionTokenKey, credentials.SessionToken);
+ dictionary.Add(MetadataConstants.SessionTokenKey, credentials.SessionToken);
}
var secretData = Encoding.ASCII.GetBytes(credentials.AccessSecret);
@@ -54,7 +65,8 @@ namespace Org.Apache.Rocketmq
$"{MetadataConstants.CredentialKey}={credentials.AccessKey}, " +
$"{MetadataConstants.SignedHeadersKey}={MetadataConstants.DateTimeKey}, " +
$"{MetadataConstants.SignatureKey}={hmac}";
- metadata.Add(MetadataConstants.Authorization, authorization);
+ dictionary.Add(MetadataConstants.Authorization, authorization);
+ return dictionary;
}
}
}
\ No newline at end of file