You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/02/20 13:00:01 UTC
[rocketmq-client-csharp] branch develop updated: Complete the basic send procedure (#5)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/develop by this push:
new 837f36b Complete the basic send procedure (#5)
837f36b is described below
commit 837f36b4fa537d0c2ea7637f01ff79837beda5a1
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Sun Feb 20 20:59:58 2022 +0800
Complete the basic send procedure (#5)
Make Producer#Send work
---
rocketmq-client-csharp/Client.cs | 29 +++++--
rocketmq-client-csharp/ClientConfig.cs | 6 +-
rocketmq-client-csharp/Message.cs | 7 ++
rocketmq-client-csharp/Producer.cs | 3 +-
rocketmq-client-csharp/SequenceGenerator.cs | 128 ++++++++++++++++++++++++++++
tests/MessageTest.cs | 2 +
tests/ProducerTest.cs | 73 ++++++++++++++++
tests/RpcClientTest.cs | 2 +-
tests/SequenceGeneratorTest.cs | 49 +++++++++++
9 files changed, 286 insertions(+), 13 deletions(-)
diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index e3cb547..cbc5160 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -29,10 +29,11 @@ namespace org.apache.rocketmq
public abstract class Client : ClientConfig, IClient
{
- public Client(INameServerResolver resolver)
+ public Client(INameServerResolver resolver, string resourceNamespace)
{
this.nameServerResolver = resolver;
- this.clientManager = ClientManagerFactory.getClientManager(resourceNamespace());
+ this.resourceNamespace_ = resourceNamespace;
+ this.clientManager = ClientManagerFactory.getClientManager(resourceNamespace);
this.nameServerResolverCTS = new CancellationTokenSource();
this.topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
@@ -176,20 +177,30 @@ namespace org.apache.rocketmq
// We got one or more name servers available.
string nameServer = nameServers[currentNameServerIndex];
- var metadata = new grpc.Metadata();
- Signature.sign(this, metadata);
var request = new rmq::QueryRouteRequest();
request.Topic = new rmq::Resource();
- request.Topic.ResourceNamespace = resourceNamespace();
+ request.Topic.ResourceNamespace = resourceNamespace_;
request.Topic.Name = topic;
request.Endpoints = new rmq::Endpoints();
request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
var address = new rmq::Address();
- string[] segments = nameServer.Split(":");
- address.Host = segments[0];
- address.Port = Int32.Parse(segments[1]);
+ int pos = nameServer.LastIndexOf(':');
+ int protocolPrefix = 0;
+ if (nameServer.StartsWith("http://"))
+ {
+ protocolPrefix = 7;
+ }
+ else if (nameServer.StartsWith("https://"))
+ {
+ protocolPrefix = 8;
+ }
+
+ address.Host = nameServer.Substring(protocolPrefix, pos - protocolPrefix);
+ address.Port = Int32.Parse(nameServer.Substring(pos + 1));
request.Endpoints.Addresses.Add(address);
- var target = string.Format("https://{0}:{1}", segments[0], segments[1]);
+ var target = string.Format("https://{0}:{1}", address.Host, address.Port);
+ var metadata = new grpc.Metadata();
+ Signature.sign(this, metadata);
var topicRouteData = await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
return topicRouteData;
}
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index 3f27713..949f8b4 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -23,7 +23,9 @@ namespace org.apache.rocketmq {
public ClientConfig() {
var hostName = System.Net.Dns.GetHostName();
var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
- clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
+ this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
+ this.ioTimeout_ = TimeSpan.FromSeconds(3);
+ this.longPollingIoTimeout_ = TimeSpan.FromSeconds(15);
}
public string region() {
@@ -101,7 +103,7 @@ namespace org.apache.rocketmq {
private string region_ = "cn-hangzhou";
private string serviceName_ = "ONS";
- private string resourceNamespace_;
+ protected string resourceNamespace_;
private ICredentialsProvider credentialsProvider_;
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index 282e8aa..1e6ee0e 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -28,6 +28,7 @@ namespace org.apache.rocketmq
}
public Message(string topic, string tag, List<string> keys, byte[] body) {
+ this.messageId = SequenceGenerator.Instance.Next();
this.maxAttemptTimes = 3;
this.topic = topic;
this.tag = tag;
@@ -37,6 +38,12 @@ namespace org.apache.rocketmq
this.systemProperties = new Dictionary<string, string>();
}
+        // Backing field for MessageId; there is no public setter.
+        private string messageId;
+
+        /**
+         * Identifier of this message (read-only).
+         */
+        public string MessageId => messageId;
+
private string topic;
public string Topic {
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index ed837bc..0b3f8a0 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -27,7 +27,7 @@ namespace org.apache.rocketmq
{
public class Producer : Client, IProducer
{
- public Producer(INameServerResolver resolver) : base(resolver)
+ public Producer(INameServerResolver resolver, string resourceNamespace) : base(resolver, resourceNamespace)
{
this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
}
@@ -79,6 +79,7 @@ namespace org.apache.rocketmq
}
request.Message.SystemAttribute = new rmq::SystemAttribute();
+ request.Message.SystemAttribute.MessageId = message.MessageId;
if (!string.IsNullOrEmpty(message.Tag))
{
request.Message.SystemAttribute.Tag = message.Tag;
diff --git a/rocketmq-client-csharp/SequenceGenerator.cs b/rocketmq-client-csharp/SequenceGenerator.cs
new file mode 100644
index 0000000..aa92c80
--- /dev/null
+++ b/rocketmq-client-csharp/SequenceGenerator.cs
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using System.Net.NetworkInformation;
+
+namespace org.apache.rocketmq
+{
+    /**
+     * Generates message identifiers.
+     * See https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o for Sequence ID spec.
+     *
+     * Each identifier is the upper-case hex encoding of 18 bytes:
+     *   bytes  0-1   version
+     *   bytes  2-7   MAC address of the first usable network interface
+     *   bytes  8-9   low 16 bits of the process id
+     *   bytes 10-13  seconds since 2021-01-01T00:00:00Z, big-endian
+     *   bytes 14-17  sequence number within that second, big-endian, starting at 1
+     *
+     * In the implementation layer, this class follows Singleton pattern.
+     * Next() is safe to call from multiple threads.
+     */
+    public sealed class SequenceGenerator
+    {
+        /**
+         * Number of MAC address bytes embedded in every identifier.
+         */
+        private const int MacAddressLength = 6;
+
+        public static SequenceGenerator Instance
+        {
+            get
+            {
+                return Nested.instance;
+            }
+        }
+
+        private class Nested
+        {
+            // The explicit static constructor keeps the compiler from marking
+            // this type beforefieldinit, so the singleton is created lazily.
+            static Nested()
+            {
+            }
+
+            internal static readonly SequenceGenerator instance = new SequenceGenerator();
+        }
+
+        private SequenceGenerator()
+        {
+            version = new byte[2] { 0x00, 0x01 };
+            // NOTE(review): the byte order of the version prefix depends on host
+            // endianness. Kept unchanged so generated identifiers stay the same
+            // as before (they start with "01" on little-endian hosts).
+            if (BitConverter.IsLittleEndian)
+            {
+                Array.Reverse(version);
+            }
+
+            macAddress = MacAddress();
+            using (var process = System.Diagnostics.Process.GetCurrentProcess())
+            {
+                pidBytes = ToArray(process.Id);
+            }
+            currentSecond = SecondsSinceCustomEpoch();
+        }
+
+        /**
+         * Guards currentSecond and sequenceInSecond so that the roll-over check
+         * and the increment happen as one atomic step.
+         */
+        private readonly object gate = new object();
+
+        /**
+         * Sequence version, 2 bytes.
+         */
+        private readonly byte[] version;
+
+        /**
+         * MAC address, at least 6 bytes; only the first 6 are used.
+         */
+        private readonly byte[] macAddress;
+
+        /**
+         * Process id as 4 big-endian bytes; only the last 2 are used.
+         */
+        private readonly byte[] pidBytes;
+
+        private int sequenceInSecond = 0;
+        private int currentSecond;
+
+        /**
+         * Converts an int to its 4-byte big-endian representation.
+         */
+        private static byte[] ToArray(int number)
+        {
+            byte[] bytes = BitConverter.GetBytes(number);
+            if (BitConverter.IsLittleEndian)
+            {
+                Array.Reverse(bytes);
+            }
+            return bytes;
+        }
+
+        /**
+         * Whole seconds elapsed since 2021-01-01T00:00:00Z.
+         */
+        private static int SecondsSinceCustomEpoch()
+        {
+            var customEpoch = new DateTime(2021, 01, 01, 00, 00, 00, DateTimeKind.Utc);
+            var diff = DateTime.UtcNow.Subtract(customEpoch);
+            return (int)diff.TotalSeconds;
+        }
+
+        /**
+         * Returns the physical address of the first network interface that is up,
+         * is not a loopback and reports at least 6 address bytes. When no such
+         * interface exists, 6 random bytes are returned instead so that Next()
+         * never fails because of the host's network configuration.
+         */
+        private static byte[] MacAddress()
+        {
+            foreach (var nic in NetworkInterface.GetAllNetworkInterfaces())
+            {
+                if (nic.OperationalStatus != OperationalStatus.Up)
+                {
+                    continue;
+                }
+
+                if (nic.NetworkInterfaceType == NetworkInterfaceType.Loopback || nic.Name.Equals("lo"))
+                {
+                    continue;
+                }
+
+                // Loopback and tunnel adapters may report an empty address.
+                byte[] address = nic.GetPhysicalAddress().GetAddressBytes();
+                if (address.Length >= MacAddressLength)
+                {
+                    return address;
+                }
+            }
+
+            // No usable interface: fall back to random bytes for this process.
+            byte[] fallback = new byte[MacAddressLength];
+            Array.Copy(Guid.NewGuid().ToByteArray(), 0, fallback, 0, MacAddressLength);
+            return fallback;
+        }
+
+        /**
+         * Produces the next identifier as a 36-character upper-case hex string.
+         * Identifiers returned by this instance are distinct from each other as
+         * long as fewer than 2^31 of them are requested within a single second.
+         */
+        public string Next()
+        {
+            int second;
+            int sequence;
+            lock (gate)
+            {
+                second = SecondsSinceCustomEpoch();
+                if (second > currentSecond)
+                {
+                    currentSecond = second;
+                    sequenceInSecond = 0;
+                }
+                else
+                {
+                    // Same second, or the wall clock stepped backwards: keep
+                    // counting in the latest second seen so that a
+                    // (second, sequence) pair is never handed out twice.
+                    second = currentSecond;
+                }
+                sequence = ++sequenceInSecond;
+            }
+
+            byte[] data = new byte[18];
+            Array.Copy(version, 0, data, 0, 2);
+            Array.Copy(macAddress, 0, data, 2, MacAddressLength);
+            Array.Copy(pidBytes, 2, data, 8, 2);
+            Array.Copy(ToArray(second), 0, data, 10, 4);
+            Array.Copy(ToArray(sequence), 0, data, 14, 4);
+            return BitConverter.ToString(data).Replace("-", "");
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/tests/MessageTest.cs b/tests/MessageTest.cs
index eebdc46..3dd7f4b 100644
--- a/tests/MessageTest.cs
+++ b/tests/MessageTest.cs
@@ -26,6 +26,8 @@ namespace org.apache.rocketmq {
[TestMethod]
public void testCtor() {
var msg1 = new Message();
+ Assert.IsNotNull(msg1.MessageId);
+ Assert.IsTrue(msg1.MessageId.StartsWith("01"));
Assert.IsNull(msg1.Topic);
Assert.IsNull(msg1.Body);
Assert.IsNull(msg1.Tag);
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
new file mode 100644
index 0000000..ebf045e
--- /dev/null
+++ b/tests/ProducerTest.cs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System.Collections.Generic;
+using System;
+
+namespace org.apache.rocketmq
+{
+
+    /**
+     * End-to-end test for Producer#send. It contacts the name server configured
+     * in the fields at the bottom of this class, so it needs network access.
+     * NOTE(review): presumably it also needs credentials that
+     * ConfigFileCredentialsProvider can load - confirm before running in CI.
+     */
+    [TestClass]
+    public class ProducerTest
+    {
+
+        [ClassInitialize]
+        public static void SetUp(TestContext context)
+        {
+            var nameServerAddress = new List<string>
+            {
+                string.Format("https://{0}:{1}", host, port)
+            };
+            resolver = new StaticNameServerResolver(nameServerAddress);
+
+            credentialsProvider = new ConfigFileCredentialsProvider();
+        }
+
+        [ClassCleanup]
+        public static void TearDown()
+        {
+
+        }
+
+
+        [TestMethod]
+        public void testSendMessage()
+        {
+            var producer = new Producer(resolver, resourceNamespace);
+            producer.ResourceNamespace = resourceNamespace;
+            // Reuse the provider created once in SetUp instead of building another.
+            producer.CredentialsProvider = credentialsProvider;
+            producer.Region = "cn-hangzhou-pre";
+            producer.start();
+            try
+            {
+                byte[] body = new byte[1024];
+                Array.Fill(body, (byte)'x');
+                var msg = new Message(topic, body);
+                var sendResult = producer.send(msg).GetAwaiter().GetResult();
+                Assert.IsNotNull(sendResult);
+            }
+            finally
+            {
+                // Shut the producer down even when sending throws.
+                producer.shutdown();
+            }
+        }
+
+        private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+
+        private static string topic = "cpp_sdk_standard";
+
+        private static INameServerResolver resolver;
+        private static ICredentialsProvider credentialsProvider;
+        private static string host = "116.62.231.199";
+        private static int port = 80;
+    }
+
+}
\ No newline at end of file
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
index dd5b322..5425973 100644
--- a/tests/RpcClientTest.cs
+++ b/tests/RpcClientTest.cs
@@ -117,7 +117,7 @@ namespace org.apache.rocketmq
request.Message.SystemAttribute = new rmq::SystemAttribute();
request.Message.SystemAttribute.Tag = "TagA";
request.Message.SystemAttribute.Keys.Add("key1");
- request.Message.SystemAttribute.MessageId = "message-id-1";
+ request.Message.SystemAttribute.MessageId = SequenceGenerator.Instance.Next();
var metadata = new grpc::Metadata();
Signature.sign(clientConfig, metadata);
diff --git a/tests/SequenceGeneratorTest.cs b/tests/SequenceGeneratorTest.cs
new file mode 100644
index 0000000..fc0ceb0
--- /dev/null
+++ b/tests/SequenceGeneratorTest.cs
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+using System;
+using System.Collections.Generic;
+
+namespace org.apache.rocketmq
+{
+    /**
+     * Verifies that SequenceGenerator hands out distinct identifiers.
+     */
+    [TestClass]
+    public class SequenceGeneratorTest
+    {
+
+        [ClassInitialize]
+        public static void SetUp(TestContext context)
+        {
+        }
+
+        /**
+         * Draws 500000 identifiers in a row and fails on the first repeat.
+         */
+        [TestMethod]
+        public void testNext()
+        {
+            var set = new HashSet<string>();
+            for (int i = 0; i < 500000; i++)
+            {
+                var nextId = SequenceGenerator.Instance.Next();
+                // HashSet.Add returns false when the value is already present,
+                // so one lookup covers both the check and the insert.
+                if (!set.Add(nextId))
+                {
+                    Assert.Fail("SequenceGenerator violates uniqueness");
+                }
+            }
+        }
+    }
+}
+