You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/02/20 13:03:05 UTC
[rocketmq-client-csharp] branch develop updated: Implement MessageIdGenerator (#4)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-csharp.git
The following commit(s) were added to refs/heads/develop by this push:
new 6b3a3b0 Implement MessageIdGenerator (#4)
6b3a3b0 is described below
commit 6b3a3b06b08381dc998b9e01d195582a1935979e
Author: aaron ai <ya...@gmail.com>
AuthorDate: Sun Feb 20 21:02:58 2022 +0800
Implement MessageIdGenerator (#4)
---
.gitignore | 5 +-
rocketmq-client-csharp/MessageIdGenerator.cs | 104 +++++++++++++++++++++++++++
rocketmq-client-csharp/Utilities.cs | 53 ++++++++++++++
tests/MessageIdGeneratorTest.cs | 42 +++++++++++
4 files changed, 203 insertions(+), 1 deletion(-)
diff --git a/.gitignore b/.gitignore
index 4f19896..678b6cc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
bin
obj
-.vscode
\ No newline at end of file
+.vscode/
+.idea
+*.user
+*DS_Store
\ No newline at end of file
diff --git a/rocketmq-client-csharp/MessageIdGenerator.cs b/rocketmq-client-csharp/MessageIdGenerator.cs
new file mode 100644
index 0000000..8af1fda
--- /dev/null
+++ b/rocketmq-client-csharp/MessageIdGenerator.cs
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.IO;
+using System.Threading;
+
+namespace org.apache.rocketmq
+{
+ /// <summary>
+ /// Process-wide generator of message identifiers.
+ /// MessageId generation rules: https://yuque.antfin-inc.com/aone709911/ca1edg/af2t6o
+ /// </summary>
+ /// <remarks>
+ /// An identifier is the <see cref="version"/> string followed by the hex encoding (via
+ /// <c>Utilities.ByteArrayToHexString</c>) of 16 bytes: 6 MAC-address bytes, the low 16 bits of
+ /// the process id, the low 32 bits of the seconds elapsed since 2021-01-01T00:00:00Z, and a
+ /// 32-bit sequence number. Multi-byte values are written big-endian.
+ /// </remarks>
+ public class MessageIdGenerator
+ {
+     public static readonly string version = "01";
+
+     /// <summary>Number of MAC-address bytes embedded in every identifier.</summary>
+     private const int MacAddressLength = 6;
+
+     // Keep this below `version`: static initializers run in textual order and the
+     // constructor reads `version` while building the prefix.
+     private static readonly MessageIdGenerator Instance = new();
+
+     /// <summary>Version + MAC + process-id part, computed once per process.</summary>
+     private readonly string _prefix;
+
+     /// <summary>Seconds between the custom epoch and the moment this instance was created.</summary>
+     private readonly long _secondsSinceCustomEpoch;
+
+     /// <summary>Measures the time elapsed since this instance was created.</summary>
+     private readonly Stopwatch _stopwatch;
+
+     private int _sequence;
+
+     private MessageIdGenerator()
+     {
+         byte[] prefixBytes = new byte[MacAddressLength + 2];
+         Array.Copy(ResolveMacAddress(), prefixBytes, MacAddressLength);
+
+         // Only the low 16 bits of the process id are kept, high byte first.
+         int processId = Utilities.GetProcessId();
+         prefixBytes[MacAddressLength] = (byte)((processId >> 8) & 0xFF);
+         prefixBytes[MacAddressLength + 1] = (byte)(processId & 0xFF);
+
+         _prefix = version + Utilities.ByteArrayToHexString(prefixBytes);
+
+         DateTime epoch = new DateTime(2021, 1, 1,
+             0, 0, 0, 0, DateTimeKind.Utc);
+
+         _secondsSinceCustomEpoch = Convert.ToInt64(DateTime.UtcNow.Subtract(epoch).TotalSeconds);
+         _stopwatch = Stopwatch.StartNew();
+
+         _sequence = 0;
+     }
+
+     /// <summary>
+     /// Produces the next identifier. The sequence number is advanced with
+     /// <see cref="Interlocked.Increment(ref int)"/>.
+     /// </summary>
+     /// <returns>The shared prefix followed by 16 hex characters (seconds, then sequence).</returns>
+     public String Next()
+     {
+         long deltaSeconds = _secondsSinceCustomEpoch + _stopwatch.ElapsedMilliseconds / 1_000;
+         int sequence = Interlocked.Increment(ref _sequence);
+
+         byte[] suffixBytes = new byte[8];
+         // Truncation to 32 bits is intentional: only the low four bytes are part of the id.
+         WriteBigEndian(suffixBytes, 0, unchecked((uint)deltaSeconds));
+         WriteBigEndian(suffixBytes, 4, unchecked((uint)sequence));
+
+         return _prefix + Utilities.ByteArrayToHexString(suffixBytes);
+     }
+
+     /// <summary>Returns the single shared generator instance.</summary>
+     public static MessageIdGenerator GetInstance()
+     {
+         return Instance;
+     }
+
+     /// <summary>
+     /// Returns an array with at least <see cref="MacAddressLength"/> bytes: the address reported by
+     /// <c>Utilities.GetMacAddress</c> when it is usable, otherwise random bytes. Without the
+     /// fallback a null or too-short address would throw from the static initializer and make
+     /// the whole type unusable.
+     /// </summary>
+     private static byte[] ResolveMacAddress()
+     {
+         byte[] macAddress = Utilities.GetMacAddress();
+         if (macAddress != null && macAddress.Length >= MacAddressLength)
+         {
+             return macAddress;
+         }
+
+         byte[] fallback = new byte[MacAddressLength];
+         using (var random = System.Security.Cryptography.RandomNumberGenerator.Create())
+         {
+             random.GetBytes(fallback);
+         }
+
+         return fallback;
+     }
+
+     /// <summary>Writes <paramref name="value"/> into four bytes of <paramref name="buffer"/>, most significant byte first.</summary>
+     private static void WriteBigEndian(byte[] buffer, int offset, uint value)
+     {
+         buffer[offset] = (byte)((value >> 24) & 0xFF);
+         buffer[offset + 1] = (byte)((value >> 16) & 0xFF);
+         buffer[offset + 2] = (byte)((value >> 8) & 0xFF);
+         buffer[offset + 3] = (byte)(value & 0xFF);
+     }
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-client-csharp/Utilities.cs b/rocketmq-client-csharp/Utilities.cs
new file mode 100644
index 0000000..1834a77
--- /dev/null
+++ b/rocketmq-client-csharp/Utilities.cs
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Diagnostics;
+using System.Linq;
+using System.Net.NetworkInformation;
+using System.Text;
+
+namespace org.apache.rocketmq
+{
+ /// <summary>
+ /// Small host/process helpers: MAC address lookup, current process id, and hex encoding.
+ /// </summary>
+ public static class Utilities
+ {
+     /// <summary>Minimum number of bytes an address must have to be returned by <see cref="GetMacAddress"/>.</summary>
+     private const int MacAddressLength = 6;
+
+     private const string HexAlphabet = "0123456789ABCDEF";
+
+     // Generated once so that every call in this process reports the same address
+     // when no network interface qualifies.
+     private static readonly byte[] FallbackMacAddress = CreateRandomMacAddress();
+
+     /// <summary>
+     /// Returns the physical address of the first network interface that is up, is not a
+     /// loopback interface, and reports at least six address bytes.
+     /// </summary>
+     /// <returns>
+     /// The address bytes of that interface, or a copy of a process-wide random six-byte
+     /// address when no interface qualifies. Never null.
+     /// </returns>
+     public static byte[] GetMacAddress()
+     {
+         byte[] address = NetworkInterface.GetAllNetworkInterfaces()
+             .Where(nic => nic.OperationalStatus == OperationalStatus.Up &&
+                           nic.NetworkInterfaceType != NetworkInterfaceType.Loopback)
+             .Select(nic => nic.GetPhysicalAddress().GetAddressBytes())
+             // Some interfaces that are "Up" report an empty physical address; skip them.
+             .FirstOrDefault(bytes => bytes.Length >= MacAddressLength);
+
+         // Clone so callers cannot mutate the cached fallback.
+         return address ?? (byte[])FallbackMacAddress.Clone();
+     }
+
+     /// <summary>Returns the id of the current process.</summary>
+     public static int GetProcessId()
+     {
+         // Process is IDisposable; release it as soon as the id has been read.
+         using (Process current = Process.GetCurrentProcess())
+         {
+             return current.Id;
+         }
+     }
+
+     /// <summary>
+     /// Encodes <paramref name="bytes"/> as upper-case hexadecimal, two characters per byte,
+     /// with no separators.
+     /// </summary>
+     /// <param name="bytes">Bytes to encode; an empty array yields an empty string.</param>
+     /// <returns>The hexadecimal representation of <paramref name="bytes"/>.</returns>
+     /// <exception cref="System.ArgumentNullException"><paramref name="bytes"/> is null.</exception>
+     public static string ByteArrayToHexString(byte[] bytes)
+     {
+         if (bytes == null)
+         {
+             throw new System.ArgumentNullException(nameof(bytes));
+         }
+
+         StringBuilder result = new StringBuilder(bytes.Length * 2);
+         foreach (byte b in bytes)
+         {
+             result.Append(HexAlphabet[b >> 4]);
+             result.Append(HexAlphabet[b & 0xF]);
+         }
+
+         return result.ToString();
+     }
+
+     /// <summary>Creates six random bytes to stand in for a MAC address.</summary>
+     private static byte[] CreateRandomMacAddress()
+     {
+         byte[] bytes = new byte[MacAddressLength];
+         using (var generator = System.Security.Cryptography.RandomNumberGenerator.Create())
+         {
+             generator.GetBytes(bytes);
+         }
+
+         return bytes;
+     }
+ }
+}
\ No newline at end of file
diff --git a/tests/MessageIdGeneratorTest.cs b/tests/MessageIdGeneratorTest.cs
new file mode 100644
index 0000000..6ed34d6
--- /dev/null
+++ b/tests/MessageIdGeneratorTest.cs
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using org.apache.rocketmq;
+
+namespace tests
+{
+ /// <summary>Checks the shape of identifiers produced by <see cref="MessageIdGenerator"/>.</summary>
+ [TestClass]
+ public class MessageIdGeneratorTest
+ {
+     /// <summary>
+     /// Two consecutive identifiers must each be 34 characters long and start with the
+     /// generator version, must differ from each other, and must share their first 24 characters.
+     /// </summary>
+     [TestMethod]
+     public void TestNext()
+     {
+         MessageIdGenerator generator = MessageIdGenerator.GetInstance();
+
+         string earlier = NextVerified(generator);
+         string later = NextVerified(generator);
+
+         Assert.AreNotEqual(earlier, later);
+         Assert.AreEqual(earlier.Substring(0, 24), later.Substring(0, 24));
+     }
+
+     /// <summary>Requests one identifier and asserts its length and version prefix before returning it.</summary>
+     private static string NextVerified(MessageIdGenerator source)
+     {
+         string id = source.Next();
+         Assert.AreEqual(34, id.Length);
+         Assert.AreEqual(MessageIdGenerator.version, id.Substring(0, 2));
+         return id;
+     }
+ }
+}
\ No newline at end of file