You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/05/31 07:43:58 UTC

[rocketmq-client-csharp] branch develop updated: WIP: debug telemetry bi-direction streaming

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/develop by this push:
     new a5561e7  WIP: debug telemetry bi-direction streaming
a5561e7 is described below

commit a5561e737fde06ac04370cb5faeac2ea626f69b1
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue May 31 15:43:48 2022 +0800

    WIP: debug telemetry bi-direction streaming
---
 rocketmq-client-csharp/IRpcClient.cs               |   2 +
 rocketmq-client-csharp/Producer.cs                 |   2 +-
 .../Protos/apache/rocketmq/v2/service.proto        |   3 +
 rocketmq-client-csharp/RpcClient.cs                |   7 ++
 tests/RpcClientTest.cs                             | 105 +++++++++++++++++++++
 5 files changed, 118 insertions(+), 1 deletion(-)

diff --git a/rocketmq-client-csharp/IRpcClient.cs b/rocketmq-client-csharp/IRpcClient.cs
index 115b3d6..146d4f7 100644
--- a/rocketmq-client-csharp/IRpcClient.cs
+++ b/rocketmq-client-csharp/IRpcClient.cs
@@ -25,6 +25,8 @@ namespace Org.Apache.Rocketmq
 {
     public interface IRpcClient
     {
+        AsyncDuplexStreamingCall<TelemetryCommand, TelemetryCommand> Telemetry(Metadata metadata);
+
         Task<QueryRouteResponse> QueryRoute(Metadata metadata, QueryRouteRequest request, TimeSpan timeout);
 
         Task<HeartbeatResponse> Heartbeat(Metadata metadata, HeartbeatRequest request, TimeSpan timeout);
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index b2248ae..118a81c 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -36,7 +36,7 @@ namespace Org.Apache.Rocketmq
         public override void Start()
         {
             base.Start();
-            // More initalization
+            // More initialization
         }
 
         public override void Shutdown()
diff --git a/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
index ad6de07..c7ce2e9 100644
--- a/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
+++ b/rocketmq-client-csharp/Protos/apache/rocketmq/v2/service.proto
@@ -16,6 +16,7 @@
 syntax = "proto3";
 
 import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
 
 import "apache/rocketmq/v2/definition.proto";
 
@@ -101,6 +102,8 @@ message ReceiveMessageResponse {
   oneof content {
     Status status = 1;
     Message message = 2;
+    // The timestamp at which brokers start delivering the status line or message.
+    google.protobuf.Timestamp delivery_timestamp = 3;
   }
 }
 
diff --git a/rocketmq-client-csharp/RpcClient.cs b/rocketmq-client-csharp/RpcClient.cs
index 3405c0e..f56a07f 100644
--- a/rocketmq-client-csharp/RpcClient.cs
+++ b/rocketmq-client-csharp/RpcClient.cs
@@ -72,6 +72,13 @@ namespace Org.Apache.Rocketmq
             return handler;
         }
 
+        public AsyncDuplexStreamingCall<rmq::TelemetryCommand, rmq::TelemetryCommand> Telemetry(Metadata metadata)
+        {
+            var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(3));
+            var callOptions = new CallOptions(metadata, deadline);
+            return _stub.Telemetry(callOptions);
+        }
+
         public async Task<rmq::QueryRouteResponse> QueryRoute(Metadata metadata, rmq::QueryRouteRequest request, TimeSpan timeout)
         {
             var deadline = DateTime.UtcNow.Add(timeout);
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
new file mode 100644
index 0000000..d6cdc91
--- /dev/null
+++ b/tests/RpcClientTest.cs
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using grpc = Grpc.Core;
+using rmq = Apache.Rocketmq.V2;
+
+
+namespace Org.Apache.Rocketmq
+{
+
+    [TestClass]
+    public class RpcClientTest
+    {
+
+        [TestMethod]
+        public async Task testTelemetry()
+        {
+            Console.WriteLine("Test Telemetry streaming");
+            string target = "https://11.166.42.94:8081";
+            var rpc_client = new RpcClient(target);
+            var client_config = new ClientConfig();
+            var metadata = new grpc::Metadata();
+            Signature.sign(client_config, metadata);
+
+            var cmd = new rmq::TelemetryCommand();
+            cmd.Settings = new rmq::Settings();
+            cmd.Settings.ClientType = rmq::ClientType.Producer;
+            cmd.Settings.AccessPoint = new rmq::Endpoints();
+            cmd.Settings.AccessPoint.Scheme = rmq::AddressScheme.Ipv4;
+            var address = new rmq::Address();
+            address.Port = 8081;
+            address.Host = "11.166.42.94";
+            cmd.Settings.AccessPoint.Addresses.Add(address);
+            cmd.Settings.Publishing = new rmq::Publishing();
+            var topic = new rmq::Resource();
+            topic.Name = "cpp_sdk_standard";
+            cmd.Settings.Publishing.Topics.Add(topic);
+            cmd.Settings.UserAgent = new rmq::UA();
+            cmd.Settings.UserAgent.Language = rmq::Language.DotNet;
+            cmd.Settings.UserAgent.Version = "1.0";
+            cmd.Settings.UserAgent.Hostname = System.Net.Dns.GetHostName();
+            cmd.Settings.UserAgent.Platform = System.Environment.OSVersion.ToString();
+
+            var duplexStreaming = rpc_client.Telemetry(metadata);
+            var reader = duplexStreaming.ResponseStream;
+            var writer = duplexStreaming.RequestStream;
+
+            var cts = new CancellationTokenSource();
+            await writer.WriteAsync(cmd);
+            Console.WriteLine("Command written");
+            if (await reader.MoveNext(cts.Token))
+            {
+                // The response payload is available as reader.Current; it is not inspected here.
+                Console.WriteLine("Server responded");
+            }
+            else
+            {
+                Console.WriteLine("Server is not responding");
+            }
+
+        }
+
+        [TestMethod]
+        public void testQueryRoute()
+        {
+            string target = "https://11.166.42.94:8081";
+            var rpc_client = new RpcClient(target);
+            var client_config = new ClientConfig();
+            var metadata = new grpc::Metadata();
+            Signature.sign(client_config, metadata);
+            var request = new rmq::QueryRouteRequest();
+            request.Topic = new rmq::Resource();
+            request.Topic.Name = "cpp_sdk_standard";
+            request.Endpoints = new rmq::Endpoints();
+            request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
+            var address = new rmq::Address();
+            address.Port = 8081;
+            address.Host = "11.166.42.94";
+            request.Endpoints.Addresses.Add(address);
+            var response = rpc_client.QueryRoute(metadata, request, client_config.getIoTimeout());
+            var result = response.GetAwaiter().GetResult();
+        }
+
+    }
+
+}
\ No newline at end of file