You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/27 07:39:33 UTC
[rocketmq-client-csharp] branch observability updated: WIP: prepare to add unit test for SimpleConsumer.Receive
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/observability by this push:
new 49a32f0 WIP: prepare to add unit test for SimpleConsumer.Receive
49a32f0 is described below
commit 49a32f0cf754383c6390b9fab32fbd5a55ec2502
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jun 27 15:39:24 2022 +0800
WIP: prepare to add unit test for SimpleConsumer.Receive
---
rocketmq-client-csharp/ClientManager.cs | 7 ++---
rocketmq-client-csharp/RpcClient.cs | 18 ++++++++----
tests/SimpleConsumerTest.cs | 51 ++++++++++++++++++++++-----------
3 files changed, 50 insertions(+), 26 deletions(-)
diff --git a/rocketmq-client-csharp/ClientManager.cs b/rocketmq-client-csharp/ClientManager.cs
index 0b03886..f786a05 100644
--- a/rocketmq-client-csharp/ClientManager.cs
+++ b/rocketmq-client-csharp/ClientManager.cs
@@ -178,7 +178,6 @@ namespace Org.Apache.Rocketmq
{
case rmq.Code.Ok:
{
-
break;
}
@@ -189,6 +188,7 @@ namespace Org.Apache.Rocketmq
}
case rmq.Code.TooManyRequests:
{
+ Logger.Warn("TooManyRequest: servers throttled");
break;
}
}
@@ -205,9 +205,8 @@ namespace Org.Apache.Rocketmq
case rmq.ReceiveMessageResponse.ContentOneofCase.DeliveryTimestamp:
{
var begin = entry.DeliveryTimestamp;
- var costs = DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(begin.Seconds))
- .Subtract(TimeSpan.FromMilliseconds(begin.Nanos / 1_000_000));
- Logger.Debug($"Delivery of messages from server to clients cost {costs.ToShortTimeString()}");
+ var costs = DateTime.UtcNow - begin.ToDateTime();
+ // TODO: Collect metrics
break;
}
}
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index e0a2caf..c1f1cd6 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -25,11 +25,13 @@ using rmq = Apache.Rocketmq.V2;
using Grpc.Core;
using Grpc.Core.Interceptors;
using Grpc.Net.Client;
+using NLog;
namespace Org.Apache.Rocketmq
{
public class RpcClient : IRpcClient
{
+ protected static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
private readonly rmq::MessagingService.MessagingServiceClient _stub;
private readonly GrpcChannel _channel;
@@ -118,21 +120,25 @@ namespace Org.Apache.Rocketmq
return await call.ResponseAsync;
}
- public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata, rmq::ReceiveMessageRequest request,
- TimeSpan timeout)
- {
+ public async Task<List<rmq::ReceiveMessageResponse>> ReceiveMessage(Metadata metadata,
+ rmq::ReceiveMessageRequest request, TimeSpan timeout) {
var deadline = DateTime.UtcNow.Add(timeout);
var callOptions = new CallOptions(metadata, deadline);
var call = _stub.ReceiveMessage(request, callOptions);
var result = new List<rmq::ReceiveMessageResponse>();
- while (await call.ResponseStream.MoveNext())
+ var stream = call.ResponseStream;
+ while (await stream.MoveNext())
{
- result.Add(call.ResponseStream.Current);
+ var entry = stream.Current;
+ Logger.Debug($"Got ReceiveMessageResponse {entry}");
+ result.Add(entry);
}
+ Logger.Debug($"Receiving of messages completed");
return result;
}
- public async Task<rmq::AckMessageResponse> AckMessage(Metadata metadata, rmq::AckMessageRequest request, TimeSpan timeout)
+ public async Task<rmq::AckMessageResponse> AckMessage(Metadata metadata, rmq::AckMessageRequest request,
+ TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
var callOptions = new CallOptions(metadata, deadline);
diff --git a/tests/SimpleConsumerTest.cs b/tests/SimpleConsumerTest.cs
index 70bcdd3..ee19c5e 100644
--- a/tests/SimpleConsumerTest.cs
+++ b/tests/SimpleConsumerTest.cs
@@ -14,39 +14,58 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+using System;
using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using rmq = Apache.Rocketmq.V2;
using System.Threading.Tasks;
+using Org.Apache.Rocketmq;
-namespace Org.Apache.Rocketmq
+namespace tests
{
[TestClass]
public class SimpleConsumerTest
{
+ private static AccessPoint accessPoint;
+ private static string _resourceNamespace = "";
+ private static string _group = "GID_cpp_sdk_standard";
+ private static string _topic = "cpp_sdk_standard";
+
+
+ [ClassInitialize]
+ public static void SetUp(TestContext context)
+ {
+ accessPoint = new AccessPoint
+ {
+ Host = "127.0.0.1",
+ Port = 8081
+ };
+ }
+
[TestMethod]
- public async Task TestStart()
+ public async Task TestLifecycle()
{
- var accessPoint = new AccessPoint();
- // var host = "11.166.42.94";
- var host = "127.0.0.1";
- var port = 8081;
- accessPoint.Host = host;
- accessPoint.Port = port;
- var resourceNamespace = "";
- var group = "GID_cpp_sdk_standard";
- var topic = "cpp_sdk_standard";
-
- var simpleConsumer = new SimpleConsumer(accessPoint, resourceNamespace, group);
- simpleConsumer.Subscribe(topic, rmq::FilterType.Tag, "*");
+ var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+ simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
await simpleConsumer.Start();
Thread.Sleep(1_000);
await simpleConsumer.Shutdown();
}
+ [TestMethod]
+ public async Task TestReceive()
+ {
+ var simpleConsumer = new SimpleConsumer(accessPoint, _resourceNamespace, _group);
+ simpleConsumer.Subscribe(_topic, rmq::FilterType.Tag, "*");
+ await simpleConsumer.Start();
+ var batchSize = 32;
+ var receiveTimeout = TimeSpan.FromSeconds(10);
+ var messages = await simpleConsumer.Receive(batchSize, receiveTimeout);
+ Console.WriteLine($"{messages}");
+ await simpleConsumer.Shutdown();
+ }
}
-
-
}
\ No newline at end of file