You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/05/31 07:43:58 UTC
[rocketmq-client-csharp] branch develop updated: WIP: debug telemetry bi-direction streaming
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/develop by this push:
new a5561e7 WIP: debug telemetry bi-direction streaming
a5561e7 is described below
commit a5561e737fde06ac04370cb5faeac2ea626f69b1
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue May 31 15:43:48 2022 +0800
WIP: debug telemetry bi-direction streaming
---
rocketmq-client-csharp/IRpcClient.cs | 2 +
rocketmq-client-csharp/Producer.cs | 2 +-
.../Protos/apache/rocketmq/v2/service.proto | 3 +
rocketmq-client-csharp/RpcClient.cs | 7 ++
tests/RpcClientTest.cs | 105 +++++++++++++++++++++
5 files changed, 118 insertions(+), 1 deletion(-)
diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IRpcClient.cs
index 115b3d6..146d4f7 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IRpcClient.cs
@@ -25,6 +25,8 @@ namespace Org.Apache.Rocketmq
{
public interface IRpcClient
{
+ AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Metadata metadata);
+
Task<QueryRouteResponse> QueryRoute(Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
Task<HeartbeatResponse> Heartbeat(Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index b2248ae..118a81c 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -36,7 +36,7 @@ namespace Org.Apache.Rocketmq
public override void Start()
{
base.Start();
- // More initalization
+ // More initialization
}
public override void Shutdown()
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
index ad6de07..c7ce2e9 100644
--- a/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
+++ b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
@@ -16,6 +16,7 @@
syntax = "proto3";
import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
import "apache/rocketmq/v2/definition.proto";
@@ -101,6 +102,8 @@ message ReceiveMessageResponse {
oneof content {
Status status = 1;
Message message = 2;
+ // The timestamp that brokers start to deliver status line or message.
+ google.protobuf.Timestamp delivery_timestamp = 3;
}
}
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index 3405c0e..f56a07f 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -72,6 +72,13 @@ namespace Org.Apache.Rocketmq
return handler;
}
+ public AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(Metadata metadata)
+ {
+ var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
+ var callOptions = new CallOptions(metadata, deadline);
+ return _stub.Telemetry(callOptions);
+ }
+
public async Task<rmq::QueryRouteResponse> QueryRoute(Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
{
var deadline = DateTime.UtcNow.Add(timeout);
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
new file mode 100644
index 0000000..d6cdc91
--- /dev/null
+++ b/tests/RpcClientTest.cs
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using grpc = Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
+
+
+namespace Org.Apache.Rocketmq
+{
+
+ [TestClass]
+ public class RpcClientTest
+ {
+
+ [TestMethod]
+ public async Task testTelemetry()
+ {
+ Console.WriteLine("Test Telemetry streaming");
+ string target = "https://11.166.42.94:8081";
+ var rpc_client = new RpcClient(target);
+ var client_config = new ClientConfig();
+ var metadata = new grpc::Metadata();
+ Signature.sign(client_config, metadata);
+
+ var cmd = new rmq::TelemetryCommand();
+ cmd.Settings = new rmq::Settings();
+ cmd.Settings.ClientType = rmq::ClientType.Producer;
+ cmd.Settings.AccessPoint = new rmq::Endpoints();
+ cmd.Settings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+ var address = new rmq::Address();
+ address.Port = 8081;
+ address.Host = "11.166.42.94";
+ cmd.Settings.AccessPoint.Addresses.Add(address);
+ cmd.Settings.Publishing = new rmq::Publishing();
+ var topic = new rmq::Resource();
+ topic.Name = "cpp_sdk_standard";
+ cmd.Settings.Publishing.Topics.Add(topic);
+ cmd.Settings.UserAgent = new rmq::UA();
+ cmd.Settings.UserAgent.Language = rmq::Language.DotNet;
+ cmd.Settings.UserAgent.Version = "1.0";
+ cmd.Settings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+ cmd.Settings.UserAgent.Platform = System.Environment.OSVersion.ToString();
+
+ var duplexStreaming = rpc_client.Telemetry(metadata);
+ var reader = duplexStreaming.ResponseStream;
+ var writer = duplexStreaming.RequestStream;
+
+ var cts = new CancellationTokenSource();
+ await writer.WriteAsync(cmd);
+ Console.WriteLine("Command written");
+ if (await reader.MoveNext(cts.Token))
+ {
+ // var cmd = reader.Current;
+ Console.WriteLine("Server responded");
+ }
+ else
+ {
+ Console.WriteLine("Server is not responding");
+ }
+
+ }
+
+ [TestMethod]
+ public void testQueryRoute()
+ {
+ string target = "https://11.166.42.94:8081";
+ var rpc_client = new RpcClient(target);
+ var client_config = new ClientConfig();
+ var metadata = new grpc::Metadata();
+ Signature.sign(client_config, metadata);
+ var request = new rmq::QueryRouteRequest();
+ request.Topic = new rmq::Resource();
+ request.Topic.Name = "cpp_sdk_standard";
+ request.Endpoints = new rmq::Endpoints();
+ request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+ var address = new rmq::Address();
+ address.Port = 8081;
+ address.Host = "11.166.42.94";
+ request.Endpoints.Addresses.Add(address);
+ var response = rpc_client.QueryRoute(metadata, request, client_config.getIoTimeout());
+ var result = response.GetAwaiter().GetResult();
+ }
+
+ }
+
+}
\ No newline at end of file