You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:14:57 UTC
[rocketmq-clients] 17/28: Add more stats info
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 7be8d6da9fae153f5781fea13022849e9288d4d6
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Feb 17 10:31:42 2023 +0800
Add more stats info
---
csharp/rocketmq-client-csharp/Client.cs | 29 ++++++++++++++++++++++-------
1 file changed, 22 insertions(+), 7 deletions(-)
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 63c24c91..8e49851a 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -37,11 +37,15 @@ namespace Org.Apache.Rocketmq
private static readonly TimeSpan TopicRouteUpdateScheduleDelay = TimeSpan.FromSeconds(10);
private static readonly TimeSpan TopicRouteUpdateSchedulePeriod = TimeSpan.FromSeconds(30);
- private readonly CancellationTokenSource _topicRouteUpdateCtx;
+ private readonly CancellationTokenSource _topicRouteUpdateCts;
private static readonly TimeSpan SettingsSyncScheduleDelay = TimeSpan.FromSeconds(1);
private static readonly TimeSpan SettingsSyncSchedulePeriod = TimeSpan.FromMinutes(5);
- private readonly CancellationTokenSource _settingsSyncCtx;
+ private readonly CancellationTokenSource _settingsSyncCts;
+
+ private static readonly TimeSpan StatsScheduleDelay = TimeSpan.FromSeconds(1);
+ private static readonly TimeSpan StatsSchedulePeriod = TimeSpan.FromSeconds(1);
+ private readonly CancellationTokenSource _statsCts;
protected readonly ClientConfig ClientConfig;
protected readonly IClientManager ClientManager;
@@ -63,10 +67,11 @@ namespace Org.Apache.Rocketmq
Isolated = new ConcurrentDictionary<Endpoints, bool>();
_topicRouteCache = new ConcurrentDictionary<string, TopicRouteData>();
- _topicRouteUpdateCtx = new CancellationTokenSource();
- _settingsSyncCtx = new CancellationTokenSource();
+ _topicRouteUpdateCts = new CancellationTokenSource();
+ _settingsSyncCts = new CancellationTokenSource();
_heartbeatCts = new CancellationTokenSource();
_telemetryCts = new CancellationTokenSource();
+ _statsCts = new CancellationTokenSource();
_sessionsTable = new Dictionary<Endpoints, Session>();
_sessionLock = new ReaderWriterLockSlim();
@@ -76,10 +81,11 @@ namespace Org.Apache.Rocketmq
{
Logger.Debug($"Begin to start the rocketmq client, clientId={ClientId}");
ScheduleWithFixedDelay(UpdateTopicRouteCache, TopicRouteUpdateScheduleDelay, TopicRouteUpdateSchedulePeriod,
- _topicRouteUpdateCtx.Token);
+ _topicRouteUpdateCts.Token);
ScheduleWithFixedDelay(Heartbeat, HeartbeatScheduleDelay, HeartbeatSchedulePeriod, _heartbeatCts.Token);
ScheduleWithFixedDelay(SyncSettings, SettingsSyncScheduleDelay, SettingsSyncSchedulePeriod,
- _settingsSyncCtx.Token);
+ _settingsSyncCts.Token);
+ ScheduleWithFixedDelay(Stats, StatsScheduleDelay, StatsSchedulePeriod, _statsCts.Token);
foreach (var topic in GetTopics())
{
await FetchTopicRoute(topic);
@@ -91,7 +97,7 @@ namespace Org.Apache.Rocketmq
public virtual async Task Shutdown()
{
Logger.Debug($"Begin to shutdown rocketmq client, clientId={ClientId}");
- _topicRouteUpdateCtx.Cancel();
+ _topicRouteUpdateCts.Cancel();
_heartbeatCts.Cancel();
_telemetryCts.Cancel();
NotifyClientTermination();
@@ -221,6 +227,15 @@ namespace Org.Apache.Rocketmq
}
}
+ private void Stats()
+ {
+ ThreadPool.GetAvailableThreads(out var availableWorker, out var availableIo);
+ Logger.Info($"ThreadCount={ThreadPool.ThreadCount}, " +
+ $"CompletedWorkItemCount={ThreadPool.CompletedWorkItemCount}, " +
+ $"PendingWorkItemCount={ThreadPool.PendingWorkItemCount}, AvailableWorkerThreads={availableWorker}, " +
+ $"AvailableCompletionPortThreads={availableIo}, ClientId={ClientId}");
+ }
+
private void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token)
{
Task.Run(async () =>