You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/21 09:05:18 UTC
[rocketmq-client-csharp] 04/04: Make Shutdown async
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
commit a6b1ee9e0525e004a01fa78148da8cf13f3526a5
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jun 21 17:04:44 2022 +0800
Make Shutdown async
---
rocketmq-client-csharp/Client.cs | 6 +++---
rocketmq-client-csharp/IClient.cs | 2 +-
rocketmq-client-csharp/IConsumer.cs | 4 +++-
rocketmq-client-csharp/IProducer.cs | 2 +-
rocketmq-client-csharp/Producer.cs | 4 ++--
rocketmq-client-csharp/PushConsumer.cs | 4 ++--
rocketmq-client-csharp/Session.cs | 2 +-
rocketmq-client-csharp/SimpleConsumer.cs | 13 +++++++++----
tests/ProducerTest.cs | 2 +-
tests/SimpleConsumerTest.cs | 4 +++-
10 files changed, 26 insertions(+), 17 deletions(-)
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 607daf4..28ef42b 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -75,11 +75,11 @@ namespace Org.Apache.Rocketmq
}, 30, _updateTopicRouteCts.Token);
}
- public virtual void Shutdown()
+ public virtual async Task Shutdown()
{
Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
_updateTopicRouteCts.Cancel();
- Manager.Shutdown().GetAwaiter().GetResult();
+ await Manager.Shutdown();
}
protected string FilterBroker(Func<string, bool> acceptor)
@@ -398,7 +398,7 @@ namespace Org.Apache.Rocketmq
return true;
}
- public virtual void OnReceive(rmq::Settings settings)
+ public virtual void OnSettingsReceived(rmq::Settings settings)
{
if (null != settings.Metric)
{
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index 4b7206b..461c452 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -31,6 +31,6 @@ namespace Org.Apache.Rocketmq
void BuildClientSetting(rmq::Settings settings);
- void OnReceive(rmq::Settings settings);
+ void OnSettingsReceived(rmq::Settings settings);
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IConsumer.cs b/rocketmq-client-csharp/IConsumer.cs
index ac4d787..de27f1f 100644
--- a/rocketmq-client-csharp/IConsumer.cs
+++ b/rocketmq-client-csharp/IConsumer.cs
@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+using System.Threading.Tasks;
namespace Org.Apache.Rocketmq
{
public interface IConsumer
{
void Start();
- void Shutdown();
+ Task Shutdown();
}
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 9c30c6c..088df5e 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -23,7 +23,7 @@ namespace Org.Apache.Rocketmq
{
void Start();
- void Shutdown();
+ Task Shutdown();
Task<SendReceipt> Send(Message message);
}
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 32606ae..4a39f33 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -39,10 +39,10 @@ namespace Org.Apache.Rocketmq
// More initialization
}
- public override void Shutdown()
+ public override async Task Shutdown()
{
// Release local resources
- base.Shutdown();
+ await base.Shutdown();
}
protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
index 909e7a2..3b37950 100644
--- a/rocketmq-client-csharp/PushConsumer.cs
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -66,13 +66,13 @@ namespace Org.Apache.Rocketmq
}, 10, _scanExpiredProcessQueueCTS.Token);
}
- public override void Shutdown()
+ public override async Task Shutdown()
{
_scanAssignmentCTS.Cancel();
_scanExpiredProcessQueueCTS.Cancel();
// Shutdown resources of derived class
- base.Shutdown();
+ await base.Shutdown();
}
private async Task scanLoadAssignments()
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index f5e7795..51eb09c 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -61,7 +61,7 @@ namespace Org.Apache.Rocketmq
case rmq::TelemetryCommand.CommandOneofCase.Settings:
{
Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
- _client.OnReceive(cmd.Settings);
+ _client.OnSettingsReceived(cmd.Settings);
break;
}
case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
index 9eaf365..afd447a 100644
--- a/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -17,6 +17,7 @@
using rmq = Apache.Rocketmq.V2;
using NLog;
+using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;
using Apache.Rocketmq.V2;
@@ -57,9 +58,13 @@ namespace Org.Apache.Rocketmq
base.createSession(_accessPoint.TargetUrl());
}
- public override void Shutdown()
+ public override async Task Shutdown()
{
- base.Shutdown();
+ await base.Shutdown();
+ if (!await NotifyClientTermination())
+ {
+ Logger.Warn("Failed to NotifyClientTermination");
+ }
}
protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
@@ -82,9 +87,9 @@ namespace Org.Apache.Rocketmq
subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
}
- public override void OnReceive(Settings settings)
+ public override void OnSettingsReceived(Settings settings)
{
- base.OnReceive(settings);
+ base.OnSettingsReceived(settings);
if (settings.Subscription.Fifo)
{
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index a6746ff..baeca17 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -52,7 +52,7 @@ namespace Org.Apache.Rocketmq
var msg = new Message(topic, body);
var sendResult = await producer.Send(msg);
Assert.IsNotNull(sendResult);
- producer.Shutdown();
+ await producer.Shutdown();
}
private static string resourceNamespace = "";
diff --git a/tests/SimpleConsumerTest.cs b/tests/SimpleConsumerTest.cs
index 1bc1a45..29f155f 100644
--- a/tests/SimpleConsumerTest.cs
+++ b/tests/SimpleConsumerTest.cs
@@ -17,6 +17,7 @@
using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using rmq = Apache.Rocketmq.V2;
+using System.Threading.Tasks;
namespace Org.Apache.Rocketmq
{
@@ -26,7 +27,7 @@ namespace Org.Apache.Rocketmq
{
[TestMethod]
- public void TestStart()
+ public async Task TestStart()
{
var accessPoint = new AccessPoint();
var host = "11.166.42.94";
@@ -41,6 +42,7 @@ namespace Org.Apache.Rocketmq
simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*");
simpleConsumer.Start();
Thread.Sleep(10_000);
+ await simpleConsumer.Shutdown();
}
}