You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/01 03:11:02 UTC
[rocketmq-client-csharp] branch develop updated: WIP:
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/develop by this push:
new 3ada89b WIP:
3ada89b is described below
commit 3ada89b310ffcb99c8023a99189e8f476b257b8e
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Wed Jun 1 11:10:53 2022 +0800
WIP:
---
rocketmq-client-csharp/Client.cs | 12 ++--
rocketmq-client-csharp/ClientConfig.cs | 67 +++++++++++++++++++---
rocketmq-client-csharp/IClientConfig.cs | 2 -
rocketmq-client-csharp/Producer.cs | 2 +-
.../{StaticNameServerResolver.cs => Session.cs} | 21 +++----
rocketmq-client-csharp/StaticNameServerResolver.cs | 1 -
tests/RpcClientTest.cs | 16 ++++--
7 files changed, 87 insertions(+), 34 deletions(-)
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index 297217f..50f85af 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -247,7 +247,7 @@ namespace Org.Apache.Rocketmq
TopicRouteData topicRouteData;
try
{
- topicRouteData = await Manager.ResolveRoute(target, metadata, request, getIoTimeout());
+ topicRouteData = await Manager.ResolveRoute(target, metadata, request, RequestTimeout);
if (null != topicRouteData)
{
Logger.Debug($"Got route entries for {topic} from name server");
@@ -292,7 +292,7 @@ namespace Org.Apache.Rocketmq
List<Task> tasks = new List<Task>();
foreach (var endpoint in endpoints)
{
- tasks.Add(Manager.Heartbeat(endpoint, metadata, request, getIoTimeout()));
+ tasks.Add(Manager.Heartbeat(endpoint, metadata, request, RequestTimeout));
}
await Task.WhenAll(tasks);
@@ -325,7 +325,7 @@ namespace Org.Apache.Rocketmq
{
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- return await Manager.QueryLoadAssignment(target, metadata, request, getIoTimeout());
+ return await Manager.QueryLoadAssignment(target, metadata, request, RequestTimeout);
}
catch (System.Exception e)
{
@@ -377,7 +377,7 @@ namespace Org.Apache.Rocketmq
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- return await Manager.Ack(target, metadata, request, getIoTimeout());
+ return await Manager.Ack(target, metadata, request, RequestTimeout);
}
public async Task<Boolean> ChangeInvisibleDuration(string target, string group, string topic, string receiptHandle, String messageId)
@@ -396,7 +396,7 @@ namespace Org.Apache.Rocketmq
var metadata = new grpc::Metadata();
Signature.sign(this, metadata);
- return await Manager.ChangeInvisibleDuration(target, metadata, request, getIoTimeout());
+ return await Manager.ChangeInvisibleDuration(target, metadata, request, RequestTimeout);
}
public async Task<bool> NotifyClientTermination()
@@ -412,7 +412,7 @@ namespace Org.Apache.Rocketmq
foreach (var endpoint in endpoints)
{
- tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, getIoTimeout()));
+ tasks.Add(Manager.NotifyClientTermination(endpoint, metadata, request, RequestTimeout));
}
bool[] results = await Task.WhenAll(tasks);
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index b708c32..dfc30c6 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -15,6 +15,8 @@
* limitations under the License.
*/
using System;
+using System.Collections.Generic;
+using rmq = Apache.Rocketmq.V2;
namespace Org.Apache.Rocketmq {
@@ -24,8 +26,11 @@ namespace Org.Apache.Rocketmq {
var hostName = System.Net.Dns.GetHostName();
var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
- this._ioTimeout = TimeSpan.FromSeconds(3);
+ this._requestTimeout = TimeSpan.FromSeconds(3);
this.longPollingIoTimeout_ = TimeSpan.FromSeconds(30);
+ this.client_type_ = rmq::ClientType.Unspecified;
+ this.access_point_ = new rmq::Endpoints();
+ this.back_off_policy_ = new rmq::RetryPolicy();
}
public string region() {
@@ -64,11 +69,16 @@ namespace Org.Apache.Rocketmq {
set { _tenantId = value; }
}
- public TimeSpan getIoTimeout() {
- return _ioTimeout;
- }
- public TimeSpan IoTimeout {
- set { _ioTimeout = value; }
+ public TimeSpan RequestTimeout
+ {
+ get
+ {
+ return _requestTimeout;
+ }
+ set
+ {
+ _requestTimeout = value;
+ }
}
public TimeSpan getLongPollingTimeout() {
@@ -109,7 +119,7 @@ namespace Org.Apache.Rocketmq {
private string _tenantId;
- private TimeSpan _ioTimeout;
+ private TimeSpan _requestTimeout;
private TimeSpan longPollingIoTimeout_;
@@ -120,6 +130,49 @@ namespace Org.Apache.Rocketmq {
private bool tracingEnabled_ = false;
private string instanceName_ = "default";
+
+ private rmq::ClientType client_type_;
+ public rmq::ClientType ClientType
+ {
+ get { return client_type_; }
+ set { client_type_ = value; }
+ }
+
+
+ private rmq::Endpoints access_point_;
+
+ public rmq::AddressScheme AccessPointAddressScheme
+ {
+ get { return access_point_.Scheme; }
+ set { access_point_.Scheme = value; }
+ }
+
+ public List<rmq::Address> AccessPointAddresses
+ {
+ get
+ {
+ List<rmq::Address> addresses = new List<rmq::Address>();
+ foreach (var item in access_point_.Addresses)
+ {
+ addresses.Add(item);
+ }
+ return addresses;
+ }
+
+ set
+ {
+ access_point_.Addresses.Clear();
+ foreach (var item in value)
+ {
+ access_point_.Addresses.Add(item);
+ }
+ }
+ }
+
+ private rmq::RetryPolicy back_off_policy_;
+
+
+
}
}
diff --git a/rocketmq-client-csharp/IClientConfig.cs b/rocketmq-client-csharp/IClientConfig.cs
index 86c311e..3726ac4 100644
--- a/rocketmq-client-csharp/IClientConfig.cs
+++ b/rocketmq-client-csharp/IClientConfig.cs
@@ -28,8 +28,6 @@ namespace Org.Apache.Rocketmq {
string tenantId();
- TimeSpan getIoTimeout();
-
TimeSpan getLongPollingTimeout();
string getGroupName();
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index 118a81c..226ed5f 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -113,7 +113,7 @@ namespace Org.Apache.Rocketmq
{
try
{
- rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, getIoTimeout());
+ rmq::SendMessageResponse response = await Manager.SendMessage(target, metadata, request, RequestTimeout);
if (null != response && rmq::Code.Ok == response.Status.Code)
{
diff --git a/rocketmq-client-csharp/StaticNameServerResolver.cs b/rocketmq-client-csharp/Session.cs
similarity index 66%
copy from rocketmq-client-csharp/StaticNameServerResolver.cs
copy to rocketmq-client-csharp/Session.cs
index 0fbf099..2cc079e 100644
--- a/rocketmq-client-csharp/StaticNameServerResolver.cs
+++ b/rocketmq-client-csharp/Session.cs
@@ -15,24 +15,19 @@
* limitations under the License.
*/
using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
+using System.Text;
+using grpc = global::Grpc.Core;
+using System.Security.Cryptography;
namespace Org.Apache.Rocketmq
{
- public class StaticNameServerResolver : INameServerResolver
+
+ class Session
{
- public StaticNameServerResolver(List<string> nameServerList)
- {
- this.nameServerList = nameServerList;
- }
+ public string Target { get; }
+
- public async Task<List<string>> resolveAsync()
- {
- return nameServerList;
- }
+ };
- private List<string> nameServerList;
- }
}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/StaticNameServerResolver.cs b/rocketmq-client-csharp/StaticNameServerResolver.cs
index 0fbf099..bc1f670 100644
--- a/rocketmq-client-csharp/StaticNameServerResolver.cs
+++ b/rocketmq-client-csharp/StaticNameServerResolver.cs
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
using System.Collections.Generic;
using System.Threading.Tasks;
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
index 987e82e..a1ecf82 100644
--- a/tests/RpcClientTest.cs
+++ b/tests/RpcClientTest.cs
@@ -117,12 +117,12 @@ namespace Org.Apache.Rocketmq
address.Port = 8081;
address.Host = "11.166.42.94";
request.Endpoints.Addresses.Add(address);
- var response = rpc_client.QueryRoute(metadata, request, client_config.getIoTimeout());
+ var response = rpc_client.QueryRoute(metadata, request, client_config.RequestTimeout);
var result = response.GetAwaiter().GetResult();
}
[TestMethod]
- public void TestSend()
+ public async Task TestSend()
{
string target = "https://11.166.42.94:8081";
var rpc_client = new RpcClient(target);
@@ -131,8 +131,16 @@ namespace Org.Apache.Rocketmq
Signature.sign(client_config, metadata);
var request = new rmq::SendMessageRequest();
-
-
+ var message = new rmq::Message();
+ message.Topic = new rmq::Resource();
+ message.Topic.Name = "cpp_sdk_standard";
+ message.Body = Google.Protobuf.ByteString.CopyFromUtf8("Test Body");
+ message.SystemProperties = new rmq::SystemProperties();
+ message.SystemProperties.Tag = "TagA";
+ message.SystemProperties.MessageId = "abc";
+ request.Messages.Add(message);
+ var response = await rpc_client.SendMessage(metadata, request, TimeSpan.FromSeconds(3));
+ Assert.AreEqual(rmq::Code.Ok, response.Status.Code);
}
}
}
\ No newline at end of file