You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/27 03:52:10 UTC
[rocketmq-clients] 02/02: Add lock for ClientMeterManager#Reset
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit c66db15e47888381f7bd9e69033ad887c91ded5f
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Feb 27 11:14:44 2023 +0800
Add lock for ClientMeterManager#Reset
---
.../rocketmq-client-csharp/ClientMeterManager.cs | 94 +++++++++++-----------
1 file changed, 49 insertions(+), 45 deletions(-)
diff --git a/csharp/rocketmq-client-csharp/ClientMeterManager.cs b/csharp/rocketmq-client-csharp/ClientMeterManager.cs
index ad808b34..2ac06b97 100644
--- a/csharp/rocketmq-client-csharp/ClientMeterManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientMeterManager.cs
@@ -36,6 +36,7 @@ namespace Org.Apache.Rocketmq
private readonly IClient _client;
private volatile ClientMeter _clientMeter;
private readonly HttpClient _httpClient;
+ private readonly object _lock;
internal readonly Meter Meter;
public ClientMeterManager(IClient client)
@@ -44,6 +45,7 @@ namespace Org.Apache.Rocketmq
var httpDelegatingHandler = new MetricHttpDelegatingHandler(client);
_httpClient = new HttpClient(httpDelegatingHandler);
_clientMeter = ClientMeter.DisabledInstance(_client.GetClientId());
+ _lock = new object();
Meter = new Meter(MeterName, Version);
}
@@ -52,59 +54,61 @@ namespace Org.Apache.Rocketmq
_clientMeter.Shutdown();
}
- // TODO: add lock
public void Reset(Metric metric)
{
- var clientId = _client.GetClientId();
- if (_clientMeter.Satisfy(metric))
+ lock (_lock)
{
- Logger.Info(
- $"Metric settings is satisfied by the current message meter, metric={metric}, clientId={clientId}");
- return;
- }
-
- if (!metric.On)
- {
- Logger.Info($"Metric is off, clientId={clientId}");
- _clientMeter.Shutdown();
- _clientMeter = ClientMeter.DisabledInstance(clientId);
- return;
- }
-
- var meterProvider = Sdk.CreateMeterProviderBuilder()
- .SetResourceBuilder(ResourceBuilder.CreateEmpty())
- .AddMeter(MeterName)
- .AddOtlpExporter(delegate(OtlpExporterOptions options, MetricReaderOptions readerOptions)
+ var clientId = _client.GetClientId();
+ if (_clientMeter.Satisfy(metric))
{
- options.Protocol = OtlpExportProtocol.Grpc;
- options.Endpoint = new Uri(metric.Endpoints.GrpcTarget);
- options.TimeoutMilliseconds = (int)_client.GetClientConfig().RequestTimeout.TotalMilliseconds;
- options.HttpClientFactory = () => _httpClient;
- readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds =
- MetricExportPeriodInMillis;
- })
- .AddView(instrument =>
+ Logger.Info(
+ $"Metric settings is satisfied by the current message meter, metric={metric}, clientId={clientId}");
+ return;
+ }
+
+ if (!metric.On)
{
- if (MeterName != instrument.Meter.Name)
- {
- return null;
- }
+ Logger.Info($"Metric is off, clientId={clientId}");
+ _clientMeter.Shutdown();
+ _clientMeter = ClientMeter.DisabledInstance(clientId);
+ return;
+ }
- return instrument.Name switch
+ var meterProvider = Sdk.CreateMeterProviderBuilder()
+ .SetResourceBuilder(ResourceBuilder.CreateEmpty())
+ .AddMeter(MeterName)
+ .AddOtlpExporter(delegate(OtlpExporterOptions options, MetricReaderOptions readerOptions)
+ {
+ options.Protocol = OtlpExportProtocol.Grpc;
+ options.Endpoint = new Uri(metric.Endpoints.GrpcTarget);
+ options.TimeoutMilliseconds = (int)_client.GetClientConfig().RequestTimeout.TotalMilliseconds;
+ options.HttpClientFactory = () => _httpClient;
+ readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds =
+ MetricExportPeriodInMillis;
+ })
+ .AddView(instrument =>
{
- MetricConstant.SendCostTimeMetricName => MetricConstant.Instance.SendCostTimeBucket,
- MetricConstant.DeliveryLatencyMetricName => MetricConstant.Instance.DeliveryLatencyBucket,
- MetricConstant.AwaitTimeMetricName => MetricConstant.Instance.AwaitTimeBucket,
- MetricConstant.ProcessTimeMetricName => MetricConstant.Instance.ProcessTimeBucket,
- _ => null
- };
- })
- .Build();
+ if (MeterName != instrument.Meter.Name)
+ {
+ return null;
+ }
- var exist = _clientMeter;
- _clientMeter = new ClientMeter(metric.Endpoints, meterProvider, clientId);
- exist.Shutdown();
- Logger.Info($"Metric is on, endpoints={metric.Endpoints}, clientId={clientId}");
+ return instrument.Name switch
+ {
+ MetricConstant.SendCostTimeMetricName => MetricConstant.Instance.SendCostTimeBucket,
+ MetricConstant.DeliveryLatencyMetricName => MetricConstant.Instance.DeliveryLatencyBucket,
+ MetricConstant.AwaitTimeMetricName => MetricConstant.Instance.AwaitTimeBucket,
+ MetricConstant.ProcessTimeMetricName => MetricConstant.Instance.ProcessTimeBucket,
+ _ => null
+ };
+ })
+ .Build();
+
+ var exist = _clientMeter;
+ _clientMeter = new ClientMeter(metric.Endpoints, meterProvider, clientId);
+ exist.Shutdown();
+ Logger.Info($"Metric is on, endpoints={metric.Endpoints}, clientId={clientId}");
+ }
}
public bool IsEnabled()