You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/27 07:39:33 UTC

[rocketmq-client-csharp] branch observability updated: WIP: prepare to add unit test for SimpleConsumer.Receive

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/observability by this push:
     new 49a32f0  WIP: prepare to add unit test for SimpleConsumer.Receive
49a32f0 is described below

commit 49a32f0cf754383c6390b9fab32fbd5a55ec2502
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jun 27 15:39:24 2022 +0800

    WIP: prepare to add unit test for SimpleConsumer.Receive
---
 rocketmq-client-csharp/ClientManager.cs |  7 ++---
 rocketmq-client-csharp/RpcClient.cs     | 18 ++++++++----
 tests/SimpleConsumerTest.cs             | 51 ++++++++++++++++++++++-----------
 3 files changed, 50 insertions(+), 26 deletions(-)

diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 0b03886..f786a05 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -178,7 +178,6 @@ namespace Org.Apache.Rocketmq
                         {
                             case rmq.Code.Ok:
                             {
-
                                 break;
                             }
 
@@ -189,6 +188,7 @@ namespace Org.Apache.Rocketmq
                             }
                             case rmq.Code.TooManyRequests:
                             {
+                                Logger.Warn("TooManyRequest: servers throttled");
                                 break;
                             }
                         }
@@ -205,9 +205,8 @@ namespace Org.Apache.Rocketmq
                     case rmq.ReceiveMessageResponse.ContentOneofCase.DeliveryTimestamp:
                     {
                         var begin = entry.DeliveryTimestamp;
-                        var costs = DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(begin.Seconds))
-                            .Subtract(TimeSpan.FromMilliseconds(begin.Nanos / 1_000_000));
-                        Logger.Debug($"Delivery of messages from server to clients cost {costs.ToShortTimeString()}");
+                        var costs = DateTime.UtcNow - begin.ToDateTime();
+                        // TODO: Collect metrics
                         break;
                     }
                 }
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index e0a2caf..c1f1cd6 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -25,11 +25,13 @@ using rmq = Apache.Rocketmq.V2;
 using Grpc.Core;
 using Grpc.Core.Interceptors;
 using Grpc.Net.Client;
+using NLog;
 
 namespace Org.Apache.Rocketmq
 {
     public class RpcClient : IRpcClient
     {
+        protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
         private readonly rmq::MessagingService.MessagingServiceClient _stub;
         private readonly GrpcChannel _channel;
 
@@ -118,21 +120,25 @@ namespace Org.Apache.Rocketmq
             return await call.ResponseAsync;
         }
 
-        public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata, rmq::ReceiveMessageRequest request,
-            TimeSpan timeout)
-        {
+        public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata, 
+            rmq::ReceiveMessageRequest request, TimeSpan timeout) {
             var deadline = DateTime.UtcNow.Add(timeout);
             var callOptions = new CallOptions(metadata, deadline);
             var call = _stub.ReceiveMessage(request, callOptions);
             var result = new List<rmq::ReceiveMessageResponse>();
-            while (await call.ResponseStream.MoveNext())
+            var stream = call.ResponseStream;
+            while (await stream.MoveNext())
             {
-                result.Add(call.ResponseStream.Current);
+                var entry = stream.Current;
+                Logger.Debug($"Got ReceiveMessageResponse {entry}");
+                result.Add(entry);
             }
+            Logger.Debug($"Receiving of messages completed");
             return result;
         }
 
-        public async Task<rmq::AckMessageResponse> AckMessage(Metadata metadata, rmq::AckMessageRequest request, TimeSpan timeout)
+        public async Task<rmq::AckMessageResponse> AckMessage(Metadata metadata, rmq::AckMessageRequest request,
+            TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
             var callOptions = new CallOptions(metadata, deadline);
diff --git a/tests/SimpleConsumerTest.cs b/tests/SimpleConsumerTest.cs
index 70bcdd3..ee19c5e 100644
--- a/tests/SimpleConsumerTest.cs
+++ b/tests/SimpleConsumerTest.cs
@@ -14,39 +14,58 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+using System;
 using System.Threading;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using rmq = Apache.Rocketmq.V2;
 using System.Threading.Tasks;
+using Org.Apache.Rocketmq;
 
-namespace Org.Apache.Rocketmq
+namespace tests
 {
 
     [TestClass]
     public class SimpleConsumerTest
     {
 
+        private static AccessPoint accessPoint;
+        private static string _resourceNamespace = "";
+        private static string _group = "GID_cpp_sdk_standard";
+        private static string _topic = "cpp_sdk_standard";
+
+
+        [ClassInitialize]
+        public static void SetUp(TestContext context)
+        {
+            accessPoint = new AccessPoint
+            {
+                Host = "127.0.0.1",
+                Port = 8081
+            };
+        }
+
         [TestMethod]
-        public async Task TestStart()
+        public async Task TestLifecycle()
         {
-            var accessPoint = new AccessPoint();
-            // var host = "11.166.42.94";
-            var host = "127.0.0.1";
-            var port = 8081;
-            accessPoint.Host = host;
-            accessPoint.Port = port;
-            var resourceNamespace = "";
-            var group = "GID_cpp_sdk_standard";
-            var topic = "cpp_sdk_standard";
-
-            var simpleConsumer = new SimpleConsumer(accessPoint, resourceNamespace, group);
-            simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*");
+            var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
             await simpleConsumer.Start();
             Thread.Sleep(1_000);
             await simpleConsumer.Shutdown();
         }
 
+        [TestMethod]
+        public async Task TestReceive()
+        {
+            var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+            simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+            await simpleConsumer.Start();
+            var batchSize = 32;
+            var receiveTimeout = TimeSpan.FromSeconds(10);
+            var messages  = await simpleConsumer.Receive(batchSize, receiveTimeout);
+            Console.WriteLine($"{messages}");
+            await simpleConsumer.Shutdown();
+        }
     }
-
-
 }
\ No newline at end of file