You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/01 03:11:02 UTC

[rocketmq-client-csharp] branch develop updated: WIP:

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3ada89b  WIP:
3ada89b is described below

commit 3ada89b310ffcb99c8023a99189e8f476b257b8e
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Wed Jun 1 11:10:53 2022 +0800

    WIP:
---
 rocketmq-client-csharp/Client.cs                   | 12 ++--
 rocketmq-client-csharp/ClientConfig.cs             | 67 +++++++++++++++++++---
 rocketmq-client-csharp/IClientConfig.cs            |  2 -
 rocketmq-client-csharp/Producer.cs                 |  2 +-
 .../{StaticNameServerResolver.cs => Session.cs}    | 21 +++----
 rocketmq-client-csharp/StaticNameServerResolver.cs |  1 -
 tests/RpcClientTest.cs                             | 16 ++++--
 7 files changed, 87 insertions(+), 34 deletions(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 297217f..50f85af 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -247,7 +247,7 @@ namespace Org.Apache.Rocketmq
                 TopicRouteData topicRouteData;
                 try
                 {
-                    topicRouteData = await Manager.ResolveRoute(target, metadata, request, getIoTimeout());
+                    topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
                     if (null != topicRouteData)
                     {
                         Logger.Debug($"Got route entries for {topic} from name server");
@@ -292,7 +292,7 @@ namespace Org.Apache.Rocketmq
             List<Task> tasks = new List<Task>();
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(Manager.Heartbeat(endpoint, metadata, request, getIoTimeout()));
+                tasks.Add(Manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
             }
 
             await Task.WhenAll(tasks);
@@ -325,7 +325,7 @@ namespace Org.Apache.Rocketmq
             {
                 var metadata = new grpc::Metadata();
                 Signature.sign(this, metadata);
-                return await Manager.QueryLoadAssignment(target, metadata, request, getIoTimeout());
+                return await Manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
             }
             catch (System.Exception e)
             {
@@ -377,7 +377,7 @@ namespace Org.Apache.Rocketmq
 
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
-            return await Manager.Ack(target, metadata, request, getIoTimeout());
+            return await Manager.Ack(target, metadata, request, RequestTimeout);
         }
 
         public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
@@ -396,7 +396,7 @@ namespace Org.Apache.Rocketmq
 
             var metadata = new grpc::Metadata();
             Signature.sign(this, metadata);
-            return await Manager.ChangeInvisibleDuration(target, metadata, request, getIoTimeout());
+            return await Manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
         }
 
         public async Task<bool> NotifyClientTermination()
@@ -412,7 +412,7 @@ namespace Org.Apache.Rocketmq
 
             foreach (var endpoint in endpoints)
             {
-                tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, getIoTimeout()));
+                tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
             }
 
             bool[] results = await Task.WhenAll(tasks);
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index b708c32..dfc30c6 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 using System;
+using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
 
 namespace Org.Apache.Rocketmq {
 
@@ -24,8 +26,11 @@ namespace Org.Apache.Rocketmq {
             var hostName = System.Net.Dns.GetHostName();
             var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
             this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
-            this._ioTimeout = TimeSpan.FromSeconds(3);
+            this._requestTimeout = TimeSpan.FromSeconds(3);
             this.longPollingIoTimeout_ = TimeSpan.FromSeconds(30);
+            this.client_type_ = rmq::ClientType.Unspecified;
+            this.access_point_ = new rmq::Endpoints();
+            this.back_off_policy_ = new rmq::RetryPolicy();
         }
 
         public string region() {
@@ -64,11 +69,16 @@ namespace Org.Apache.Rocketmq {
             set { _tenantId = value; }
         }
 
-        public TimeSpan getIoTimeout() {
-            return _ioTimeout;
-        }
-        public TimeSpan IoTimeout {
-            set { _ioTimeout = value; }
+        public TimeSpan RequestTimeout
+        {
+            get
+            {
+                return _requestTimeout;
+            }
+            set
+            {
+                _requestTimeout = value;
+            }
         }
 
         public TimeSpan getLongPollingTimeout() {
@@ -109,7 +119,7 @@ namespace Org.Apache.Rocketmq {
 
         private string _tenantId;
 
-        private TimeSpan _ioTimeout;
+        private TimeSpan _requestTimeout;
 
         private TimeSpan longPollingIoTimeout_;
 
@@ -120,6 +130,49 @@ namespace Org.Apache.Rocketmq {
         private bool tracingEnabled_ = false;
 
         private string instanceName_ = "default";
+
+        private rmq::ClientType client_type_;
+        public rmq::ClientType ClientType
+        {
+            get { return client_type_; }
+            set { client_type_ = value; }
+        }
+
+
+        private rmq::Endpoints access_point_;
+
+        public rmq::AddressScheme AccessPointAddressScheme
+        {
+            get { return access_point_.Scheme; }
+            set { access_point_.Scheme = value; }
+        }
+
+        public List<rmq::Address> AccessPointAddresses
+        {
+            get
+            {
+                List<rmq::Address> addresses = new List<rmq::Address>();
+                foreach (var item in access_point_.Addresses)
+                {
+                    addresses.Add(item);
+                }
+                return addresses;
+            }
+
+            set
+            {
+                access_point_.Addresses.Clear();
+                foreach (var item in value)
+                {
+                    access_point_.Addresses.Add(item);
+                }
+            }
+        }
+
+        private rmq::RetryPolicy back_off_policy_;
+
+
+
     }
 
 }
diff --git a/rocketmq-client-csharp/IClientConfig.cs b/rocketmq-client-csharp/IClientConfig.cs
index 86c311e..3726ac4 100644
--- a/rocketmq-client-csharp/IClientConfig.cs
+++ b/rocketmq-client-csharp/IClientConfig.cs
@@ -28,8 +28,6 @@ namespace Org.Apache.Rocketmq {
 
         string tenantId();
 
-        TimeSpan getIoTimeout();
-
         TimeSpan getLongPollingTimeout();
 
         string getGroupName();
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 118a81c..226ed5f 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -113,7 +113,7 @@ namespace Org.Apache.Rocketmq
             {
                 try
                 {
-                    rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, getIoTimeout());
+                    rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
                     if (null != response && rmq::Code.Ok == response.Status.Code)
                     {
 
diff --git a/rocketmq-client-csharp/StaticNameServerResolver.cs b/rocketmq-client-csharp/Session.cs
similarity index 66%
copy from rocketmq-client-csharp/StaticNameServerResolver.cs
copy to rocketmq-client-csharp/Session.cs
index 0fbf099..2cc079e 100644
--- a/rocketmq-client-csharp/StaticNameServerResolver.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -15,24 +15,19 @@
  * limitations under the License.
  */
 using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
+using System.Text;
+using grpc = global::Grpc.Core;
+using System.Security.Cryptography;
 
 namespace Org.Apache.Rocketmq
 {
-    public class StaticNameServerResolver : INameServerResolver
+
+    class Session
     {
 
-        public StaticNameServerResolver(List<string> nameServerList)
-        {
-            this.nameServerList = nameServerList;
-        }
+        public string Target { get; }
+
 
-        public async Task<List<string>> resolveAsync()
-        {
-            return nameServerList;
-        }
+    };
 
-        private List<string> nameServerList;
-    }
 }
\ No newline at end of file
diff --git a/rocketmq-client-csharp/StaticNameServerResolver.cs b/rocketmq-client-csharp/StaticNameServerResolver.cs
index 0fbf099..bc1f670 100644
--- a/rocketmq-client-csharp/StaticNameServerResolver.cs
+++ b/rocketmq-client-csharp/StaticNameServerResolver.cs
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
 using System.Collections.Generic;
 using System.Threading.Tasks;
 
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
index 987e82e..a1ecf82 100644
--- a/tests/RpcClientTest.cs
+++ b/tests/RpcClientTest.cs
@@ -117,12 +117,12 @@ namespace Org.Apache.Rocketmq
             address.Port = 8081;
             address.Host = "11.166.42.94";
             request.Endpoints.Addresses.Add(address);
-            var response = rpc_client.QueryRoute(metadata, request, client_config.getIoTimeout());
+            var response = rpc_client.QueryRoute(metadata, request, client_config.RequestTimeout);
             var result = response.GetAwaiter().GetResult();
         }
 
         [TestMethod]
-        public void TestSend()
+        public async Task TestSend()
         {
             string target = "https://11.166.42.94:8081";
             var rpc_client = new RpcClient(target);
@@ -131,8 +131,16 @@ namespace Org.Apache.Rocketmq
             Signature.sign(client_config, metadata);
 
             var request = new rmq::SendMessageRequest();
-
-
+            var message = new rmq::Message();
+            message.Topic = new rmq::Resource();
+            message.Topic.Name = "cpp_sdk_standard";
+            message.Body = Google.Protobuf.ByteString.CopyFromUtf8("Test Body");
+            message.SystemProperties = new rmq::SystemProperties();
+            message.SystemProperties.Tag = "TagA";
+            message.SystemProperties.MessageId = "abc";
+            request.Messages.Add(message);
+            var response = await rpc_client.SendMessage(metadata, request, TimeSpan.FromSeconds(3));
+            Assert.AreEqual(rmq::Code.Ok, response.Status.Code);
         }
     }
 }
\ No newline at end of file