You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/02/20 13:00:01 UTC

[rocketmq-client-csharp] branch develop updated: Complete the basic send procedure (#5)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git


The following commit(s) were added to refs/heads/develop by this push:
     new 837f36b  Complete the basic send procedure (#5)
837f36b is described below

commit 837f36b4fa537d0c2ea7637f01ff79837beda5a1
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Sun Feb 20 20:59:58 2022 +0800

    Complete the basic send procedure (#5)
    
    Make Producer#Send work
---
 rocketmq-client-csharp/Client.cs            |  29 +++++--
 rocketmq-client-csharp/ClientConfig.cs      |   6 +-
 rocketmq-client-csharp/Message.cs           |   7 ++
 rocketmq-client-csharp/Producer.cs          |   3 +-
 rocketmq-client-csharp/SequenceGenerator.cs | 128 ++++++++++++++++++++++++++++
 tests/MessageTest.cs                        |   2 +
 tests/ProducerTest.cs                       |  73 ++++++++++++++++
 tests/RpcClientTest.cs                      |   2 +-
 tests/SequenceGeneratorTest.cs              |  49 +++++++++++
 9 files changed, 286 insertions(+), 13 deletions(-)

diff --git a/rocketmq-client-csharp/Client.cs b/rocketmq-client-csharp/Client.cs
index e3cb547..cbc5160 100644
--- a/rocketmq-client-csharp/Client.cs
+++ b/rocketmq-client-csharp/Client.cs
@@ -29,10 +29,11 @@ namespace org.apache.rocketmq
     public abstract class Client : ClientConfig, IClient
     {
 
-        public Client(INameServerResolver resolver)
+        public Client(INameServerResolver resolver, string resourceNamespace)
         {
             this.nameServerResolver = resolver;
-            this.clientManager = ClientManagerFactory.getClientManager(resourceNamespace());
+            this.resourceNamespace_ = resourceNamespace;
+            this.clientManager = ClientManagerFactory.getClientManager(resourceNamespace);
             this.nameServerResolverCTS = new CancellationTokenSource();
 
             this.topicRouteTable = new ConcurrentDictionary<string, TopicRouteData>();
@@ -176,20 +177,30 @@ namespace org.apache.rocketmq
 
             // We got one or more name servers available.
             string nameServer = nameServers[currentNameServerIndex];
-            var metadata = new grpc.Metadata();
-            Signature.sign(this, metadata);
             var request = new rmq::QueryRouteRequest();
             request.Topic = new rmq::Resource();
-            request.Topic.ResourceNamespace = resourceNamespace();
+            request.Topic.ResourceNamespace = resourceNamespace_;
             request.Topic.Name = topic;
             request.Endpoints = new rmq::Endpoints();
             request.Endpoints.Scheme = rmq::AddressScheme.Ipv4;
             var address = new rmq::Address();
-            string[] segments = nameServer.Split(":");
-            address.Host = segments[0];
-            address.Port = Int32.Parse(segments[1]);
+            int pos = nameServer.LastIndexOf(':');
+            int protocolPrefix = 0;
+            if (nameServer.StartsWith("http://"))
+            {
+                protocolPrefix = 7;
+            }
+            else if (nameServer.StartsWith("https://"))
+            {
+                protocolPrefix = 8;
+            }
+
+            address.Host = nameServer.Substring(protocolPrefix, pos - protocolPrefix);
+            address.Port = Int32.Parse(nameServer.Substring(pos + 1));
             request.Endpoints.Addresses.Add(address);
-            var target = string.Format("https://{0}:{1}", segments[0], segments[1]);
+            var target = string.Format("https://{0}:{1}", address.Host, address.Port);
+            var metadata = new grpc.Metadata();
+            Signature.sign(this, metadata);
             var topicRouteData = await clientManager.resolveRoute(target, metadata, request, getIoTimeout());
             return topicRouteData;
         }
diff --git a/rocketmq-client-csharp/ClientConfig.cs b/rocketmq-client-csharp/ClientConfig.cs
index 3f27713..949f8b4 100644
--- a/rocketmq-client-csharp/ClientConfig.cs
+++ b/rocketmq-client-csharp/ClientConfig.cs
@@ -23,7 +23,9 @@ namespace org.apache.rocketmq {
         public ClientConfig() {
             var hostName = System.Net.Dns.GetHostName();
             var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
-            clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
+            this.clientId_ = string.Format("{0}@{1}#{2}", hostName, pid, instanceName_);
+            this.ioTimeout_ = TimeSpan.FromSeconds(3);
+            this.longPollingIoTimeout_ = TimeSpan.FromSeconds(15);
         }
 
         public string region() {
@@ -101,7 +103,7 @@ namespace org.apache.rocketmq {
         private string region_ = "cn-hangzhou";
         private string serviceName_ = "ONS";
 
-        private string resourceNamespace_;
+        protected string resourceNamespace_;
 
         private ICredentialsProvider credentialsProvider_;
 
diff --git a/rocketmq-client-csharp/Message.cs b/rocketmq-client-csharp/Message.cs
index 282e8aa..1e6ee0e 100644
--- a/rocketmq-client-csharp/Message.cs
+++ b/rocketmq-client-csharp/Message.cs
@@ -28,6 +28,7 @@ namespace org.apache.rocketmq
         }
 
         public Message(string topic, string tag, List<string> keys, byte[] body) {
+            this.messageId = SequenceGenerator.Instance.Next();
             this.maxAttemptTimes = 3;
             this.topic = topic;
             this.tag = tag;
@@ -37,6 +38,12 @@ namespace org.apache.rocketmq
             this.systemProperties = new Dictionary<string, string>();
         }
 
+        private string messageId;
+        public string MessageId
+        {
+            get { return messageId; }
+        }
+
         private string topic;
 
         public string Topic {
diff --git a/rocketmq-client-csharp/Producer.cs b/rocketmq-client-csharp/Producer.cs
index ed837bc..0b3f8a0 100644
--- a/rocketmq-client-csharp/Producer.cs
+++ b/rocketmq-client-csharp/Producer.cs
@@ -27,7 +27,7 @@ namespace org.apache.rocketmq
 {
     public class Producer : Client, IProducer
     {
-        public Producer(INameServerResolver resolver) : base(resolver)
+        public Producer(INameServerResolver resolver, string resourceNamespace) : base(resolver, resourceNamespace)
         {
             this.loadBalancer = new ConcurrentDictionary<string, PublishLoadBalancer>();
         }
@@ -79,6 +79,7 @@ namespace org.apache.rocketmq
             }
 
             request.Message.SystemAttribute = new rmq::SystemAttribute();
+            request.Message.SystemAttribute.MessageId = message.MessageId;
             if (!string.IsNullOrEmpty(message.Tag))
             {
                 request.Message.SystemAttribute.Tag = message.Tag;
diff --git a/rocketmq-client-csharp/SequenceGenerator.cs b/rocketmq-client-csharp/SequenceGenerator.cs
new file mode 100644
index 0000000..aa92c80
--- /dev/null
+++ b/rocketmq-client-csharp/SequenceGenerator.cs
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using System.Net.NetworkInformation;
+
+namespace org.apache.rocketmq
+{
+    /**
+     * See https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o for Sequence ID spec.
+     * 
+     * In the implementation layer, this class follows Singleton pattern.
+     */
+    public sealed class SequenceGenerator
+    {
+
+        public static SequenceGenerator Instance
+        {
+            get
+            {
+                return Nested.instance;
+            }
+        }
+
+        private class Nested
+        {
+            static Nested()
+            {
+
+            }
+
+            internal static readonly SequenceGenerator instance = new SequenceGenerator();
+        }
+
+        private SequenceGenerator()
+        {
+            currentSecond = SecondsSinceCustomEpoch();
+            macAddress = MacAddress();
+            pidBytes = ToArray(pid);
+            if (BitConverter.IsLittleEndian)
+            {
+                Array.Reverse(version);
+            }
+        }
+
+        /**
+         * Sequence version, 2 bytes.
+         */
+        private static byte[] version = new byte[2] { 0x00, 0x01 };
+
+        /**
+         * MAC address, 6 bytes.
+         */
+        private byte[] macAddress;
+
+        private int sequenceInSecond = 0;
+        private int currentSecond;
+
+        private static int pid = System.Diagnostics.Process.GetCurrentProcess().Id;
+        private static byte[] pidBytes;
+
+        private static byte[] ToArray(int number)
+        {
+            byte[] bytes = BitConverter.GetBytes(number);
+            if (BitConverter.IsLittleEndian)
+                Array.Reverse(bytes);
+            return bytes;
+        }
+
+        private static int SecondsSinceCustomEpoch()
+        {
+            var customEpoch = new DateTime(2021, 01, 01, 00, 00, 00, DateTimeKind.Utc);
+            var diff = DateTime.UtcNow.Subtract(customEpoch);
+            return (int)diff.TotalSeconds;
+        }
+
+        private static byte[] MacAddress()
+        {
+            foreach (var nic in NetworkInterface.GetAllNetworkInterfaces())
+            {
+                if (nic.OperationalStatus == OperationalStatus.Up)
+                {
+                    if (nic.Name.Equals("lo"))
+                    {
+                        continue;
+                    }
+                    return nic.GetPhysicalAddress().GetAddressBytes();
+                }
+            }
+            return null;
+        }
+
+        public string Next()
+        {
+            byte[] data = new byte[18];
+            Array.Copy(version, 0, data, 0, 2);
+            Array.Copy(macAddress, 0, data, 2, 6);
+            Array.Copy(pidBytes, 2, data, 8, 2);
+            int second = SecondsSinceCustomEpoch();
+            if (second != currentSecond)
+            {
+                currentSecond = second;
+                Interlocked.Exchange(ref sequenceInSecond, 0);
+            }
+            byte[] secondBytes = ToArray(second);
+            Array.Copy(secondBytes, 0, data, 10, 4);
+            int sequence = Interlocked.Increment(ref sequenceInSecond);
+            byte[] sequenceBytes = ToArray(sequence);
+            Array.Copy(sequenceBytes, 0, data, 14, 4);
+            return BitConverter.ToString(data).Replace("-", ""); ;
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/tests/MessageTest.cs b/tests/MessageTest.cs
index eebdc46..3dd7f4b 100644
--- a/tests/MessageTest.cs
+++ b/tests/MessageTest.cs
@@ -26,6 +26,8 @@ namespace org.apache.rocketmq {
         [TestMethod]
         public void testCtor() {
             var msg1 = new Message();
+            Assert.IsNotNull(msg1.MessageId);
+            Assert.IsTrue(msg1.MessageId.StartsWith("01"));
             Assert.IsNull(msg1.Topic);
             Assert.IsNull(msg1.Body);
             Assert.IsNull(msg1.Tag);
diff --git a/tests/ProducerTest.cs b/tests/ProducerTest.cs
new file mode 100644
index 0000000..ebf045e
--- /dev/null
+++ b/tests/ProducerTest.cs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using System.Collections.Generic;
+using System;
+
+namespace org.apache.rocketmq
+{
+
+    [TestClass]
+    public class ProducerTest
+    {
+
+        [ClassInitialize]
+        public static void SetUp(TestContext context)
+        {
+            List<string> nameServerAddress = new List<string>();
+            nameServerAddress.Add(string.Format("https://{0}:{1}", host, port));
+            resolver = new StaticNameServerResolver(nameServerAddress);
+
+            credentialsProvider = new ConfigFileCredentialsProvider();
+        }
+
+        [ClassCleanup]
+        public static void TearDown()
+        {
+
+        }
+
+
+        [TestMethod]
+        public void testSendMessage()
+        {
+            var producer = new Producer(resolver, resourceNamespace);
+            producer.ResourceNamespace = resourceNamespace;
+            producer.CredentialsProvider = new ConfigFileCredentialsProvider();
+            producer.Region = "cn-hangzhou-pre";
+            producer.start();
+            byte[] body = new byte[1024];
+            Array.Fill(body, (byte)'x');
+            var msg = new Message(topic, body);
+            var sendResult = producer.send(msg).GetAwaiter().GetResult();
+            producer.shutdown();
+        }
+
+        private static string resourceNamespace = "MQ_INST_1080056302921134_BXuIbML7";
+
+        private static string topic = "cpp_sdk_standard";
+
+        private static string clientId = "C001";
+        private static string group = "GID_cpp_sdk_standard";
+
+        private static INameServerResolver resolver;
+        private static ICredentialsProvider credentialsProvider;
+        private static string host = "116.62.231.199";
+        private static int port = 80;
+    }
+
+}
\ No newline at end of file
diff --git a/tests/RpcClientTest.cs b/tests/RpcClientTest.cs
index dd5b322..5425973 100644
--- a/tests/RpcClientTest.cs
+++ b/tests/RpcClientTest.cs
@@ -117,7 +117,7 @@ namespace org.apache.rocketmq
             request.Message.SystemAttribute = new rmq::SystemAttribute();
             request.Message.SystemAttribute.Tag = "TagA";
             request.Message.SystemAttribute.Keys.Add("key1");
-            request.Message.SystemAttribute.MessageId = "message-id-1";
+            request.Message.SystemAttribute.MessageId = SequenceGenerator.Instance.Next();
 
             var metadata = new grpc::Metadata();
             Signature.sign(clientConfig, metadata);
diff --git a/tests/SequenceGeneratorTest.cs b/tests/SequenceGeneratorTest.cs
new file mode 100644
index 0000000..fc0ceb0
--- /dev/null
+++ b/tests/SequenceGeneratorTest.cs
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+using System;
+using System.Collections.Generic;
+
+namespace org.apache.rocketmq
+{
+    [TestClass]
+    public class SequenceGeneratorTest
+    {
+
+        [ClassInitialize]
+        public static void SetUp(TestContext context)
+        {
+        }
+
+        [TestMethod]
+        public void testNext()
+        {
+            var set = new HashSet<string>();
+            for (int i = 0; i < 500000; i++)
+            {
+                var nextId = SequenceGenerator.Instance.Next();
+                if (set.Contains(nextId))
+                {
+                    Assert.Fail("SequenceGenerator violates uniqueness");
+                }
+                set.Add(nextId);
+            }
+        }
+    }
+}
+