You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/21 09:05:18 UTC

[rocketmq-client-csharp] 04/04: Make Shutdown async

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git

commit a6b1ee9e0525e004a01fa78148da8cf13f3526a5
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jun 21 17:04:44 2022 +0800

    Make Shutdown async
---
 rocketmq-client-csharp/Client.cs         |  6 +++---
 rocketmq-client-csharp/IClient.cs        |  2 +-
 rocketmq-client-csharp/IConsumer.cs      |  4 +++-
 rocketmq-client-csharp/IProducer.cs      |  2 +-
 rocketmq-client-csharp/Producer.cs       |  4 ++--
 rocketmq-client-csharp/PushConsumer.cs   |  4 ++--
 rocketmq-client-csharp/Session.cs        |  2 +-
 rocketmq-client-csharp/SimpleConsumer.cs | 13 +++++++++----
 tests/ProducerTest.cs                    |  2 +-
 tests/SimpleConsumerTest.cs              |  4 +++-
 10 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 607daf4..28ef42b 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -75,11 +75,11 @@ namespace Org.Apache.Rocketmq
             }, 30, _updateTopicRouteCts.Token);
         }
 
-        public virtual void Shutdown()
+        public virtual async Task Shutdown()
         {
             Logger.Info($"Shutdown client[resource-namespace={_resourceNamespace}");
             _updateTopicRouteCts.Cancel();
-            Manager.Shutdown().GetAwaiter().GetResult();
+            await Manager.Shutdown();
         }
 
         protected string FilterBroker(Func<string, bool> acceptor)
@@ -398,7 +398,7 @@ namespace Org.Apache.Rocketmq
             return true;
         }
 
-        public virtual void OnReceive(rmq::Settings settings)
+        public virtual void OnSettingsReceived(rmq::Settings settings)
         {
             if (null != settings.Metric)
             {
diff --git a/rocketmq-client-csharp/IClient.cs b/rocketmq-client-csharp/IClient.cs
index 4b7206b..461c452 100644
--- a/rocketmq-client-csharp/IClient.cs
+++ b/rocketmq-client-csharp/IClient.cs
@@ -31,6 +31,6 @@ namespace Org.Apache.Rocketmq
         void BuildClientSetting(rmq::Settings settings);
 
 
-        void OnReceive(rmq::Settings settings);
+        void OnSettingsReceived(rmq::Settings settings);
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IConsumer.cs b/rocketmq-client-csharp/IConsumer.cs
index ac4d787..de27f1f 100644
--- a/rocketmq-client-csharp/IConsumer.cs
+++ b/rocketmq-client-csharp/IConsumer.cs
@@ -14,12 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+using System.Threading.Tasks;
 namespace Org.Apache.Rocketmq
 {
     public interface IConsumer
     {
         void Start();
 
-        void Shutdown();
+        Task Shutdown();
     }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/IProducer.cs b/rocketmq-client-csharp/IProducer.cs
index 9c30c6c..088df5e 100644
--- a/rocketmq-client-csharp/IProducer.cs
+++ b/rocketmq-client-csharp/IProducer.cs
@@ -23,7 +23,7 @@ namespace Org.Apache.Rocketmq
     {
         void Start();
 
-        void Shutdown();
+        Task Shutdown();
 
         Task<SendReceipt> Send(Message message);
     }
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 32606ae..4a39f33 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -39,10 +39,10 @@ namespace Org.Apache.Rocketmq
             // More initialization
         }
 
-        public override void Shutdown()
+        public override async Task Shutdown()
         {
             // Release local resources
-            base.Shutdown();
+            await base.Shutdown();
         }
 
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
diff --git a/rocketmq-client-csharp/PushConsumer.cs b/rocketmq-client-csharp/PushConsumer.cs
index 909e7a2..3b37950 100644
--- a/rocketmq-client-csharp/PushConsumer.cs
+++ b/rocketmq-client-csharp/PushConsumer.cs
@@ -66,13 +66,13 @@ namespace Org.Apache.Rocketmq
             }, 10, _scanExpiredProcessQueueCTS.Token);
         }
 
-        public override void Shutdown()
+        public override async Task Shutdown()
         {
             _scanAssignmentCTS.Cancel();
             _scanExpiredProcessQueueCTS.Cancel();
 
             // Shutdown resources of derived class
-            base.Shutdown();
+            await base.Shutdown();
         }
 
         private async Task scanLoadAssignments()
diff --git a/rocketmq-client-csharp/Session.cs b/rocketmq-client-csharp/Session.cs
index f5e7795..51eb09c 100644
--- a/rocketmq-client-csharp/Session.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -61,7 +61,7 @@ namespace Org.Apache.Rocketmq
                         case rmq::TelemetryCommand.CommandOneofCase.Settings:
                             {
                                 Logger.Info($"Received settings from server {cmd.Settings.ToString()}");
-                                _client.OnReceive(cmd.Settings);
+                                _client.OnSettingsReceived(cmd.Settings);
                                 break;
                             }
                         case rmq::TelemetryCommand.CommandOneofCase.PrintThreadStackTraceCommand:
diff --git a/rocketmq-client-csharp/SimpleConsumer.cs b/rocketmq-client-csharp/SimpleConsumer.cs
index 9eaf365..afd447a 100644
--- a/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/rocketmq-client-csharp/SimpleConsumer.cs
@@ -17,6 +17,7 @@
 
 using rmq = Apache.Rocketmq.V2;
 using NLog;
+using System.Threading.Tasks;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
 using Apache.Rocketmq.V2;
@@ -57,9 +58,13 @@ namespace Org.Apache.Rocketmq
             base.createSession(_accessPoint.TargetUrl());
         }
 
-        public override void Shutdown()
+        public override async Task Shutdown()
         {
-            base.Shutdown();
+            await base.Shutdown();
+            if (!await NotifyClientTermination())
+            {
+                Logger.Warn("Failed to NotifyClientTermination");
+            }
         }
 
         protected override void PrepareHeartbeatData(rmq::HeartbeatRequest request)
@@ -82,9 +87,9 @@ namespace Org.Apache.Rocketmq
             subscriptions_.AddOrUpdate(topic, entry, (k, prev) => { return entry; });
         }
 
-        public override void OnReceive(Settings settings)
+        public override void OnSettingsReceived(Settings settings)
         {
-            base.OnReceive(settings);
+            base.OnSettingsReceived(settings);
 
             if (settings.Subscription.Fifo)
             {
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
index a6746ff..baeca17 100644
--- a/tests/ProducerTest.cs
+++ b/tests/ProducerTest.cs
@@ -52,7 +52,7 @@ namespace Org.Apache.Rocketmq
             var msg = new Message(topic, body);
             var sendResult = await producer.Send(msg);
             Assert.IsNotNull(sendResult);
-            producer.Shutdown();
+            await producer.Shutdown();
         }
 
         private static string resourceNamespace = "";
diff --git a/tests/SimpleConsumerTest.cs b/tests/SimpleConsumerTest.cs
index 1bc1a45..29f155f 100644
--- a/tests/SimpleConsumerTest.cs
+++ b/tests/SimpleConsumerTest.cs
@@ -17,6 +17,7 @@
 using System.Threading;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using rmq = Apache.Rocketmq.V2;
+using System.Threading.Tasks;
 
 namespace Org.Apache.Rocketmq
 {
@@ -26,7 +27,7 @@ namespace Org.Apache.Rocketmq
     {
 
         [TestMethod]
-        public void TestStart()
+        public async Task TestStart()
         {
             var accessPoint = new AccessPoint();
             var host = "11.166.42.94";
@@ -41,6 +42,7 @@ namespace Org.Apache.Rocketmq
             simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*");
             simpleConsumer.Start();
             Thread.Sleep(10_000);
+            await simpleConsumer.Shutdown();
         }
 
     }