You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/27 03:52:09 UTC

[rocketmq-clients] 01/02: WIP: implement client metrics by OpenTelemetry

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit e219eebfaaa81cfc7c624ff08d9af0a5ca139319
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Feb 27 11:14:22 2023 +0800

    WIP: implement client metrics by OpenTelemetry
---
 csharp/rocketmq-client-csharp/Client.cs            |   5 +
 csharp/rocketmq-client-csharp/ClientMeter.cs       |  76 ++++++++++++++
 .../rocketmq-client-csharp/ClientMeterManager.cs   | 115 +++++++++++++++++++++
 csharp/rocketmq-client-csharp/Metric.cs            |  33 ++++++
 csharp/rocketmq-client-csharp/MetricConstant.cs    |  70 +++++++++++++
 .../MetricHttpDelegatingHandler.cs                 |  48 +++++++++
 csharp/rocketmq-client-csharp/Producer.cs          |  18 ++++
 csharp/rocketmq-client-csharp/RpcClient.cs         |  12 ++-
 csharp/rocketmq-client-csharp/Signature.cs         |  26 +++--
 9 files changed, 391 insertions(+), 12 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 305383e2..48700c33 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -51,6 +51,7 @@ namespace Org.Apache.Rocketmq
         protected readonly Endpoints Endpoints;
         protected readonly IClientManager ClientManager;
         protected readonly string ClientId;
+        protected readonly ClientMeterManager ClientMeterManager;
 
         protected readonly ConcurrentDictionary<Endpoints, bool> Isolated;
         private readonly ConcurrentDictionary<string, TopicRouteData> _topicRouteCache;
@@ -65,6 +66,7 @@ namespace Org.Apache.Rocketmq
             ClientConfig = clientConfig;
             Endpoints = new Endpoints(clientConfig.Endpoints);
             ClientId = Utilities.GetClientId();
+            ClientMeterManager = new ClientMeterManager(this);
 
             ClientManager = new ClientManager(this);
             Isolated = new ConcurrentDictionary<Endpoints, bool>();
@@ -107,6 +109,7 @@ namespace Org.Apache.Rocketmq
             _statsCts.Cancel();
             NotifyClientTermination();
             await ClientManager.Shutdown();
+            ClientMeterManager.Shutdown();
             Logger.Debug($"Shutdown the rocketmq client successfully, clientId={ClientId}");
         }
 
@@ -482,6 +485,8 @@ namespace Org.Apache.Rocketmq
 
         public void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
         {
+            var metric = new Metric(settings.Metric);
+            ClientMeterManager.Reset(metric);
             GetSettings().Sync(settings);
         }
     }
diff --git a/csharp/rocketmq-client-csharp/ClientMeter.cs b/csharp/rocketmq-client-csharp/ClientMeter.cs
new file mode 100644
index 00000000..53fc2cdf
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ClientMeter.cs
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using NLog;
+using OpenTelemetry.Metrics;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Pairs a <see cref="MeterProvider"/> with the endpoints it was created for; a disabled instance
+    /// carries only the client id.
+    /// </summary>
+    public class ClientMeter
+    {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+
+        /// <summary>Creates an enabled meter for the given endpoints and meter provider.</summary>
+        public ClientMeter(Endpoints endpoints, MeterProvider meterProvider, string clientId)
+        {
+            ClientId = clientId;
+            MeterProvider = meterProvider;
+            Endpoints = endpoints;
+            Enabled = true;
+        }
+
+        // Only reachable through DisabledInstance.
+        private ClientMeter(string clientId)
+        {
+            ClientId = clientId;
+            Enabled = false;
+        }
+
+        // Endpoints and MeterProvider are never assigned (stay null) on a disabled instance.
+        public Endpoints Endpoints { get; }
+        public MeterProvider MeterProvider { get; }
+        public string ClientId { get; }
+        public bool Enabled { get; }
+
+        /// <summary>Returns a new disabled meter for the given client id.</summary>
+        internal static ClientMeter DisabledInstance(string clientId) => new ClientMeter(clientId);
+
+        /// <summary>Shuts down the meter provider; does nothing when the meter is disabled.</summary>
+        public void Shutdown()
+        {
+            if (Enabled)
+            {
+                Logger.Info($"Begin to shutdown the client meter, clientId={ClientId}, endpoints={Endpoints}");
+                MeterProvider.Shutdown();
+                Logger.Info($"Shutdown the client meter successfully, clientId={ClientId}, endpoints={Endpoints}");
+            }
+        }
+
+        /// <summary>
+        /// Tells whether this meter already matches the given settings: an enabled meter matches settings
+        /// that are on with equal endpoints, a disabled meter matches settings that are off.
+        /// </summary>
+        public bool Satisfy(Metric metric)
+        {
+            return Enabled ? metric.On && Endpoints.Equals(metric.Endpoints) : !metric.On;
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/ClientMeterManager.cs b/csharp/rocketmq-client-csharp/ClientMeterManager.cs
new file mode 100644
index 00000000..ad808b34
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/ClientMeterManager.cs
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics.Metrics;
+using System.Net.Http;
+using NLog;
+using OpenTelemetry;
+using OpenTelemetry.Exporter;
+using OpenTelemetry.Metrics;
+using OpenTelemetry.Resources;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Owns the client's current <see cref="ClientMeter"/> and replaces it when the metric settings change.
+    /// </summary>
+    public class ClientMeterManager
+    {
+        private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
+        private const string MeterName = "Apache.RocketMQ.Client";
+        private const string Version = "1.0";
+        private const int MetricExportPeriodInMillis = 60 * 1000;
+
+        private readonly IClient _client;
+        private volatile ClientMeter _clientMeter;
+        private readonly HttpClient _httpClient;
+        internal readonly Meter Meter;
+
+        // Guards every replacement/shutdown of _clientMeter so concurrent Reset calls cannot lose a meter
+        // that was created but never shut down.
+        private readonly object _resetLock = new object();
+
+        /// <summary>
+        /// Creates the manager with a disabled meter; metrics stay off until <see cref="Reset"/> turns them on.
+        /// </summary>
+        public ClientMeterManager(IClient client)
+        {
+            _client = client;
+            var httpDelegatingHandler = new MetricHttpDelegatingHandler(client);
+            _httpClient = new HttpClient(httpDelegatingHandler);
+            _clientMeter = ClientMeter.DisabledInstance(_client.GetClientId());
+            Meter = new Meter(MeterName, Version);
+        }
+
+        /// <summary>Shuts down the current client meter.</summary>
+        public void Shutdown()
+        {
+            lock (_resetLock)
+            {
+                _clientMeter.Shutdown();
+            }
+        }
+
+        /// <summary>
+        /// Applies the given metric settings: does nothing when the current meter already satisfies them,
+        /// switches to a disabled meter when they are off, and otherwise builds a new OTLP meter provider
+        /// for the configured endpoints and shuts the previous meter down.
+        /// </summary>
+        public void Reset(Metric metric)
+        {
+            lock (_resetLock)
+            {
+                var clientId = _client.GetClientId();
+                if (_clientMeter.Satisfy(metric))
+                {
+                    Logger.Info(
+                        $"Metric settings is satisfied by the current message meter, metric={metric}, clientId={clientId}");
+                    return;
+                }
+
+                if (!metric.On)
+                {
+                    Logger.Info($"Metric is off, clientId={clientId}");
+                    _clientMeter.Shutdown();
+                    _clientMeter = ClientMeter.DisabledInstance(clientId);
+                    return;
+                }
+
+                var meterProvider = Sdk.CreateMeterProviderBuilder()
+                    .SetResourceBuilder(ResourceBuilder.CreateEmpty())
+                    .AddMeter(MeterName)
+                    .AddOtlpExporter(delegate(OtlpExporterOptions options, MetricReaderOptions readerOptions)
+                    {
+                        options.Protocol = OtlpExportProtocol.Grpc;
+                        options.Endpoint = new Uri(metric.Endpoints.GrpcTarget);
+                        options.TimeoutMilliseconds = (int)_client.GetClientConfig().RequestTimeout.TotalMilliseconds;
+                        // NOTE(review): confirm the exporter honors HttpClientFactory for the Grpc protocol.
+                        options.HttpClientFactory = () => _httpClient;
+                        readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds =
+                            MetricExportPeriodInMillis;
+                    })
+                    .AddView(instrument =>
+                    {
+                        // Views are only customized for instruments that belong to this client's meter.
+                        if (MeterName != instrument.Meter.Name)
+                        {
+                            return null;
+                        }
+
+                        return instrument.Name switch
+                        {
+                            MetricConstant.SendCostTimeMetricName => MetricConstant.Instance.SendCostTimeBucket,
+                            MetricConstant.DeliveryLatencyMetricName => MetricConstant.Instance.DeliveryLatencyBucket,
+                            MetricConstant.AwaitTimeMetricName => MetricConstant.Instance.AwaitTimeBucket,
+                            MetricConstant.ProcessTimeMetricName => MetricConstant.Instance.ProcessTimeBucket,
+                            _ => null
+                        };
+                    })
+                    .Build();
+
+                // Publish the new meter before shutting the previous one down.
+                var exist = _clientMeter;
+                _clientMeter = new ClientMeter(metric.Endpoints, meterProvider, clientId);
+                exist.Shutdown();
+                Logger.Info($"Metric is on, endpoints={metric.Endpoints}, clientId={clientId}");
+            }
+        }
+
+        /// <summary>Tells whether the current client meter is enabled.</summary>
+        public bool IsEnabled()
+        {
+            return _clientMeter.Enabled;
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Metric.cs b/csharp/rocketmq-client-csharp/Metric.cs
new file mode 100644
index 00000000..4116f05f
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/Metric.cs
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Proto = Apache.Rocketmq.V2;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Client-side metric settings built from the protobuf <c>Metric</c> message.
+    /// </summary>
+    public class Metric
+    {
+        /// <summary>Endpoints carried by the message, or <c>null</c> when it has none.</summary>
+        public Endpoints Endpoints { get; }
+
+        /// <summary>True only when the message is switched on and carries endpoints.</summary>
+        public bool On { get; }
+
+        /// <summary>
+        /// Builds the settings from the protobuf message; a <c>null</c> message is treated as "metrics off".
+        /// </summary>
+        public Metric(Proto.Metric metric)
+        {
+            var endpoints = metric?.Endpoints;
+            Endpoints = null == endpoints ? null : new Endpoints(endpoints);
+            On = null != endpoints && metric.On;
+        }
+
+        /// <summary>Renders the settings so that log messages show their content.</summary>
+        public override string ToString()
+        {
+            return $"On={On}, Endpoints={Endpoints}";
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MetricConstant.cs b/csharp/rocketmq-client-csharp/MetricConstant.cs
new file mode 100644
index 00000000..15e6cceb
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/MetricConstant.cs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using OpenTelemetry.Metrics;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>
+    /// Metric names, label names/values and histogram bucket boundaries used by the client metrics.
+    /// </summary>
+    public class MetricConstant
+    {
+        // Metric Name
+        public const string SendCostTimeMetricName = "rocketmq_send_cost_time";
+        public const string DeliveryLatencyMetricName = "rocketmq_delivery_latency";
+        public const string AwaitTimeMetricName = "rocketmq_await_time";
+        public const string ProcessTimeMetricName = "rocketmq_process_time";
+
+        // Metric Label Name
+        public const string Topic = "topic";
+        public const string ClientId = "client_id";
+        public const string ConsumerGroup = "consumer_group";
+        public const string InvocationStatus = "invocation_status";
+
+        // Metric Label Value
+        public const string True = "true";
+        public const string False = "false";
+
+        // Explicit histogram bucket boundaries, one configuration per metric name above.
+        public readonly ExplicitBucketHistogramConfiguration SendCostTimeBucket = new()
+        {
+            Boundaries = new[] { 1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0 }
+        };
+
+        public readonly ExplicitBucketHistogramConfiguration DeliveryLatencyBucket = new()
+        {
+            Boundaries = new[] { 1.0, 5.0, 10.0, 20.0, 50.0, 200.0, 500.0 }
+        };
+
+        public readonly ExplicitBucketHistogramConfiguration AwaitTimeBucket = new()
+        {
+            Boundaries = new[] { 1.0, 5.0, 20.0, 100.0, 1000.0, 5 * 1000.0, 10 * 1000.0 }
+        };
+
+        public readonly ExplicitBucketHistogramConfiguration ProcessTimeBucket = new()
+        {
+            Boundaries = new[] { 1.0, 5.0, 10.0, 100.0, 1000.0, 10 * 1000.0, 60 * 1000.0 }
+        };
+
+        public static readonly MetricConstant Instance = new();
+
+        private MetricConstant()
+        {
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/MetricHttpDelegatingHandler.cs b/csharp/rocketmq-client-csharp/MetricHttpDelegatingHandler.cs
new file mode 100644
index 00000000..c5705548
--- /dev/null
+++ b/csharp/rocketmq-client-csharp/MetricHttpDelegatingHandler.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Org.Apache.Rocketmq
+{
+    /// <summary>HTTP handler that attaches the client's signature headers to every outgoing request.</summary>
+    public class MetricHttpDelegatingHandler : DelegatingHandler
+    {
+        private readonly IClient _client;
+
+        public MetricHttpDelegatingHandler(IClient client)
+        {
+            InnerHandler = RpcClient.CreateHttpHandler();
+            _client = client;
+        }
+
+        /// <summary>Adds the headers returned by Signature.Sign, then forwards the request.</summary>
+        protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
+            CancellationToken cancellationToken)
+        {
+            // Add extra headers for auth.
+            foreach (var header in Signature.Sign(_client))
+            {
+                request.Headers.TryAddWithoutValidation(header.Key, header.Value);
+            }
+
+            return await base.SendAsync(request, cancellationToken);
+        }
+    }
+}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 838263a2..81d2b8d7 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -18,6 +18,8 @@
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
+using System.Diagnostics;
+using System.Diagnostics.Metrics;
 using System.Linq;
 using System.Threading.Tasks;
 using Proto = Apache.Rocketmq.V2;
@@ -33,6 +35,8 @@ namespace Org.Apache.Rocketmq
         private readonly ConcurrentDictionary<string, bool> _publishingTopics;
         private ITransactionChecker _checker = null;
 
+        private readonly Histogram<double> _sendCostTimeHistogram;
+
         public Producer(ClientConfig clientConfig) : this(clientConfig, new ConcurrentDictionary<string, bool>(), 3)
         {
         }
@@ -51,6 +55,8 @@ namespace Org.Apache.Rocketmq
                 clientConfig.RequestTimeout, publishingTopics);
             _publishingRouteDataCache = new ConcurrentDictionary<string, PublishingLoadBalancer>();
             _publishingTopics = publishingTopics;
+            _sendCostTimeHistogram =
+                ClientMeterManager.Meter.CreateHistogram<double>(MetricConstant.SendCostTimeMetricName, "milliseconds");
         }
 
         public void SetTopics(params string[] topics)
@@ -163,6 +169,7 @@ namespace Org.Apache.Rocketmq
             Exception exception = null;
             for (var attempt = 1; attempt <= maxAttempts; attempt++)
             {
+                var stopwatch = Stopwatch.StartNew();
                 try
                 {
                     var sendReceipt = await Send0(publishingMessage, candidates, attempt, maxAttempts);
@@ -172,6 +179,15 @@ namespace Org.Apache.Rocketmq
                 {
                     exception = e;
                 }
+                finally
+                {
+                    // TotalMilliseconds is the whole elapsed time; Milliseconds is only its 0-999 component.
+                    _sendCostTimeHistogram.Record(stopwatch.Elapsed.TotalMilliseconds,
+                        new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic),
+                        new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId),
+                        new KeyValuePair<string, object>(MetricConstant.InvocationStatus,
+                            null == exception ? MetricConstant.True : MetricConstant.False));
+                }
             }
 
             throw exception!;
@@ -183,6 +199,7 @@ namespace Org.Apache.Rocketmq
             {
                 throw new InvalidOperationException("Producer is not running");
             }
+
             var sendReceipt = await Send(message, false);
             return sendReceipt;
         }
@@ -193,6 +210,7 @@ namespace Org.Apache.Rocketmq
             {
                 throw new InvalidOperationException("Producer is not running");
             }
+
             var tx = (Transaction)transaction;
             var publishingMessage = tx.TryAddMessage(message);
             var sendReceipt = await Send(message, true);
diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs
index bf45410c..47db6830 100644
--- a/csharp/rocketmq-client-csharp/RpcClient.cs
+++ b/csharp/rocketmq-client-csharp/RpcClient.cs
@@ -59,12 +59,14 @@ namespace Org.Apache.Rocketmq
          * See https://docs.microsoft.com/en-us/aspnet/core/grpc/performance?view=aspnetcore-6.0 for performance consideration and
          * why parameters are configured this way.
          */
-        private HttpMessageHandler CreateHttpHandler()
+        internal static HttpMessageHandler CreateHttpHandler()
         {
-            var sslOptions = new SslClientAuthenticationOptions();
-            // Disable server certificate validation during development phase.
-            // Comment out the following line if server certificate validation is required. 
-            sslOptions.RemoteCertificateValidationCallback = (sender, cert, chain, sslPolicyErrors) => { return true; };
+            var sslOptions = new SslClientAuthenticationOptions
+            {
+                // Disable server certificate validation during development phase.
+                // Comment out the following line if server certificate validation is required.
+                RemoteCertificateValidationCallback = (_, _, _, _) => true
+            };
             var handler = new SocketsHttpHandler
             {
                 PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs
index 8588c25a..5ae8abc0 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Collections.Generic;
 using System.Text;
 using grpc = Grpc.Core;
 using System.Security.Cryptography;
@@ -26,23 +27,33 @@ namespace Org.Apache.Rocketmq
     {
         public static void Sign(IClient client, grpc::Metadata metadata)
         {
+            var headers = Sign(client);
+            foreach (var (key, value) in headers)
+            {
+                metadata.Add(key, value);
+            }
+        }
+
+        public static Dictionary<string, string> Sign(IClient client)
+        {
+            Dictionary<string, string> dictionary = new Dictionary<string, string>();
             var clientConfig = client.GetClientConfig();
-            metadata.Add(MetadataConstants.LanguageKey, MetadataConstants.LanguageValue);
-            metadata.Add(MetadataConstants.ClientVersionKey, MetadataConstants.Instance.ClientVersion);
-            metadata.Add(MetadataConstants.ClientIdKey, client.GetClientId());
+            dictionary.Add(MetadataConstants.LanguageKey, MetadataConstants.LanguageValue);
+            dictionary.Add(MetadataConstants.ClientVersionKey, MetadataConstants.Instance.ClientVersion);
+            dictionary.Add(MetadataConstants.ClientIdKey, client.GetClientId());
 
             var time = DateTime.Now.ToString(MetadataConstants.DateTimeFormat);
-            metadata.Add(MetadataConstants.DateTimeKey, time);
+            dictionary.Add(MetadataConstants.DateTimeKey, time);
 
             var credentials = clientConfig.CredentialsProvider?.Credentials;
             if (credentials == null || credentials.expired())
             {
-                return;
+                return dictionary;
             }
 
             if (!string.IsNullOrEmpty(credentials.SessionToken))
             {
-                metadata.Add(MetadataConstants.SessionTokenKey, credentials.SessionToken);
+                dictionary.Add(MetadataConstants.SessionTokenKey, credentials.SessionToken);
             }
 
             var secretData = Encoding.ASCII.GetBytes(credentials.AccessSecret);
@@ -54,7 +65,8 @@ namespace Org.Apache.Rocketmq
                                 $"{MetadataConstants.CredentialKey}={credentials.AccessKey}, " +
                                 $"{MetadataConstants.SignedHeadersKey}={MetadataConstants.DateTimeKey}, " +
                                 $"{MetadataConstants.SignatureKey}={hmac}";
-            metadata.Add(MetadataConstants.Authorization, authorization);
+            dictionary.Add(MetadataConstants.Authorization, authorization);
+            return dictionary;
         }
     }
 }
\ No newline at end of file