Posted to commits@rocketmq.apache.org by "yanchaomei (via GitHub)" <gi...@apache.org> on 2023/05/31 16:39:58 UTC

[GitHub] [rocketmq-clients] yanchaomei opened a new pull request, #530: add MessageIdCodec class

yanchaomei opened a new pull request, #530:
URL: https://github.com/apache/rocketmq-clients/pull/530

   
   ### Which Issue(s) This PR Fixes
   
   
   Fixes #529 
   
   ### Brief Description
   
   Implement message id generator for Python client
   
   ### How Did You Test This Change?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-clients] aaron-ai commented on a diff in pull request #530: add MessageIdCodec class

Posted by "aaron-ai (via GitHub)" <gi...@apache.org>.
aaron-ai commented on code in PR #530:
URL: https://github.com/apache/rocketmq-clients/pull/530#discussion_r1212741408


##########
python/client/message/message_id_codec.py:
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import threading
+import time
+import uuid
+from datetime import datetime, timezone
+
+
+class MessageIdCodec:
+    MESSAGE_ID_LENGTH_FOR_V1_OR_LATER = 34
+    MESSAGE_ID_VERSION_V0 = "00"
+    MESSAGE_ID_VERSION_V1 = "01"
+
+    _instance = None
+    _lock = threading.Lock()
+
+    def __new__(cls, *args, **kwargs):
+        if not cls._instance:
+            with cls._lock:
+                if not cls._instance:
+                    cls._instance = super(MessageIdCodec, cls).__new__(cls)
+        return cls._instance
+
+    def __init__(self):
+        self.processFixedStringV1 = self._get_process_fixed_string()
+        self.secondsSinceCustomEpoch = self._get_seconds_since_custom_epoch()
+        self.secondsStartTimestamp = int(time.time())
+        self.seconds = self._delta_seconds()
+        self.sequence = 0
+
+    def _get_process_fixed_string(self):
+        mac = uuid.getnode()
+        mac = format(mac, '012x')
+        mac_bytes = bytes.fromhex(mac[-12:])
+        pid = os.getpid()
+        pid_bytes = pid.to_bytes(2, 'big')

Review Comment:
   ![image](https://github.com/apache/rocketmq-clients/assets/19537356/118a2020-b696-4573-bbd1-7cc891894173)
   





[GitHub] [rocketmq-clients] aaron-ai merged pull request #530: add MessageIdCodec class

Posted by "aaron-ai (via GitHub)" <gi...@apache.org>.
aaron-ai merged PR #530:
URL: https://github.com/apache/rocketmq-clients/pull/530




[GitHub] [rocketmq-clients] aaron-ai commented on a diff in pull request #530: add MessageIdCodec class

Posted by "aaron-ai (via GitHub)" <gi...@apache.org>.
aaron-ai commented on code in PR #530:
URL: https://github.com/apache/rocketmq-clients/pull/530#discussion_r1213877119


##########
python/client/message/message_id_codec.py:
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import threading
+import time
+import uuid
+from datetime import datetime, timezone
+
+
+class MessageIdCodec:
+    MESSAGE_ID_LENGTH_FOR_V1_OR_LATER = 34
+    MESSAGE_ID_VERSION_V0 = "00"
+    MESSAGE_ID_VERSION_V1 = "01"
+
+    _instance = None
+    _lock = threading.Lock()
+
+    def __new__(cls, *args, **kwargs):
+        if not cls._instance:
+            with cls._lock:
+                if not cls._instance:
+                    cls._instance = super(MessageIdCodec, cls).__new__(cls)
+        return cls._instance
+
+    def __init__(self):
+        self.processFixedStringV1 = self._get_process_fixed_string()
+        self.secondsSinceCustomEpoch = self._get_seconds_since_custom_epoch()
+        self.secondsStartTimestamp = int(time.time())
+        self.seconds = self._delta_seconds()
+        self.sequence = 0
+
+    def _get_process_fixed_string(self):
+        mac = uuid.getnode()
+        mac = format(mac, '012x')
+        mac_bytes = bytes.fromhex(mac[-12:])
+        pid = os.getpid()
+        pid_bytes = pid.to_bytes(2, 'big')

Review Comment:
   > I didn't encounter this issue on my machine (macOS). It seems the process id is too large to fit into 2 bytes, which causes an overflow. Allocating more bytes would resolve it, but the process id in the message id is constrained to two bytes. If we set aside the risk of duplication, we can instead make the following change:
   > 
   > pid = os.getpid() % 65536
   > pid_bytes = pid.to_bytes(2, 'big')
   
   Nice! Actually, we pick the lower 2 bytes rather than the whole process id, so it makes sense.
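
As a rough illustration of the "lower 2 bytes" point, the sketch below shows that reducing modulo 65536, masking with `0xFFFF`, and slicing the last two bytes of a wider encoding all yield the same result, and that two process ids differing by 65536 then share the same two-byte field. The helper name `low_two_bytes` and the sample pids are hypothetical and not part of the PR.

```python
def low_two_bytes(pid: int) -> bytes:
    """Return the low 16 bits of pid as two big-endian bytes (hypothetical helper)."""
    return (pid & 0xFFFF).to_bytes(2, "big")


# Hypothetical process ids that differ by exactly 65536.
pid_a = 4464
pid_b = pid_a + 65536

# Masking and modulo agree for non-negative integers.
mask_equals_modulo = low_two_bytes(pid_b) == (pid_b % 65536).to_bytes(2, "big")

# Slicing the last two bytes of a wider big-endian encoding gives the same field.
slice_equals_mask = pid_b.to_bytes(4, "big")[-2:] == low_two_bytes(pid_b)

# The duplication caveat: both pids map to the same two-byte value.
collides = low_two_bytes(pid_a) == low_two_bytes(pid_b)

print(mask_equals_modulo, slice_equals_mask, collides)
```

The collision only matters when two processes on the same host have pids that agree in their low 16 bits; the MAC address, timestamp, and sequence fields still have to match for two full message ids to coincide.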





[GitHub] [rocketmq-clients] aaron-ai commented on a diff in pull request #530: add MessageIdCodec class

Posted by "aaron-ai (via GitHub)" <gi...@apache.org>.
aaron-ai commented on code in PR #530:
URL: https://github.com/apache/rocketmq-clients/pull/530#discussion_r1214057192


##########
python/client/message/message_id_codec.py:
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import threading
+import time
+import uuid
+from datetime import datetime, timezone
+
+
+class MessageIdCodec:
+    MESSAGE_ID_LENGTH_FOR_V1_OR_LATER = 34
+    MESSAGE_ID_VERSION_V0 = "00"
+    MESSAGE_ID_VERSION_V1 = "01"
+
+    _instance = None
+    _lock = threading.Lock()
+
+    def __new__(cls, *args, **kwargs):
+        if not cls._instance:
+            with cls._lock:
+                if not cls._instance:
+                    cls._instance = super(MessageIdCodec, cls).__new__(cls)
+        return cls._instance
+
+    def __init__(self):
+        self.processFixedStringV1 = self._get_process_fixed_string()
+        self.secondsSinceCustomEpoch = self._get_seconds_since_custom_epoch()
+        self.secondsStartTimestamp = int(time.time())
+        self.seconds = self._delta_seconds()
+        self.sequence = 0
+
+    def _get_process_fixed_string(self):
+        mac = uuid.getnode()
+        mac = format(mac, '012x')
+        mac_bytes = bytes.fromhex(mac[-12:])
+        pid = os.getpid() % 65536
+        pid_bytes = pid.to_bytes(2, 'big')
+        return mac_bytes.hex() + pid_bytes.hex()
+
+    def _get_seconds_since_custom_epoch(self):
+        custom_epoch = datetime(2021, 1, 1, tzinfo=timezone.utc)
+        now = datetime.now(timezone.utc)
+        return int((now - custom_epoch).total_seconds())
+
+    def _delta_seconds(self):
+        return int(time.time()) - self.secondsStartTimestamp + self.secondsSinceCustomEpoch
+
+    def next_message_id(self):
+        self.sequence += 1
+        self.seconds = self._delta_seconds()
+        seconds_bytes = self.seconds.to_bytes(4, 'big')
+        sequence_bytes = self.sequence.to_bytes(4, 'big')
+        return self.MESSAGE_ID_VERSION_V1 + self.processFixedStringV1 + seconds_bytes.hex() + sequence_bytes.hex()
+
+    def decode(self, message_id):
+        if len(message_id) != self.MESSAGE_ID_LENGTH_FOR_V1_OR_LATER:
+            return self.MESSAGE_ID_VERSION_V0, message_id
+        return message_id[:2], message_id[2:]
+
+
+if __name__ == "__main__":
+    codec = MessageIdCodec()
+    next_id = codec.next_message_id()
+    print(next_id)
+    print(codec.decode(next_id + '123'))

Review Comment:
   ```suggestion
   import math
   import os
   import threading
   import time
   import uuid
   from datetime import datetime, timezone
   
   
   class MessageIdCodec:
       __MESSAGE_ID_VERSION_V1 = "01"
   
       @staticmethod
       def __get_process_fixed_string():
           mac = uuid.getnode()
           mac = format(mac, "012x")
           mac_bytes = bytes.fromhex(mac[-12:])
           pid = os.getpid() % 65536
           pid_bytes = pid.to_bytes(2, "big")
           return mac_bytes.hex().upper() + pid_bytes.hex().upper()
   
       @staticmethod
       def __get_seconds_since_custom_epoch():
           custom_epoch = datetime(2021, 1, 1, tzinfo=timezone.utc)
           now = datetime.now(timezone.utc)
           return int((now - custom_epoch).total_seconds())
   
       __PROCESS_FIXED_STRING_V1 = __get_process_fixed_string()
       __SECONDS_SINCE_CUSTOM_EPOCH = __get_seconds_since_custom_epoch()
       __SECONDS_START_TIMESTAMP = int(time.time())
   
       @staticmethod
       def __delta_seconds():
           return (
               int(time.time())
               - MessageIdCodec.__SECONDS_START_TIMESTAMP
               + MessageIdCodec.__SECONDS_SINCE_CUSTOM_EPOCH
           )
   
       @staticmethod
       def __int_to_bytes_with_big_endian(number: int, min_bytes: int):
           num_bytes = max(math.ceil(number.bit_length() / 8), min_bytes)
           return number.to_bytes(num_bytes, "big")
   
       __SEQUENCE = 0
       __SEQUENCE_LOCK = threading.Lock()
   
       @staticmethod
       def __get_and_increment_sequence():
           with MessageIdCodec.__SEQUENCE_LOCK:
               temp = MessageIdCodec.__SEQUENCE
               MessageIdCodec.__SEQUENCE += 1
               return temp
   
       @staticmethod
       def next_message_id():
           seconds = MessageIdCodec.__delta_seconds()
           seconds_bytes = MessageIdCodec.__int_to_bytes_with_big_endian(seconds, 4)[-4:]
           sequence_bytes = MessageIdCodec.__int_to_bytes_with_big_endian(
               MessageIdCodec.__get_and_increment_sequence(), 4
           )[-4:]
           return (
               MessageIdCodec.__MESSAGE_ID_VERSION_V1
               + MessageIdCodec.__PROCESS_FIXED_STRING_V1
               + seconds_bytes.hex().upper()
               + sequence_bytes.hex().upper()
           )
   
   ```
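
The suggestion above builds a 34-character hexadecimal id from a 2-character version, a 12-character MAC field, a 4-character process-id field, an 8-character seconds field, and an 8-character sequence field. A minimal sketch of splitting such an id back into its parts might look like the following; `MessageIdFields`, `split_message_id`, and the sample id are hypothetical illustrations, not part of the PR.

```python
from typing import NamedTuple


class MessageIdFields(NamedTuple):
    """Hypothetical container for the parts of a V1 message id."""
    version: str
    mac: str
    pid: str
    seconds: int
    sequence: int


def split_message_id(message_id: str) -> MessageIdFields:
    """Split a 34-character V1 message id into its fixed-width hex fields."""
    if len(message_id) != 34:
        raise ValueError("expected a 34-character message id")
    return MessageIdFields(
        version=message_id[0:2],               # 1 byte
        mac=message_id[2:14],                  # 6 bytes
        pid=message_id[14:18],                 # 2 bytes (low 16 bits of the pid)
        seconds=int(message_id[18:26], 16),    # 4 bytes, seconds since 2021-01-01 UTC
        sequence=int(message_id[26:34], 16),   # 4 bytes, per-process counter
    )


# Made-up sample id assembled field by field for readability.
sample = "01" + "AABBCCDDEEFF" + "1170" + "0485A3C0" + "0000002A"
fields = split_message_id(sample)
print(fields)
```

The field widths are read off the suggested `next_message_id`; they add up to 2 + 12 + 4 + 8 + 8 = 34 characters, which matches `MESSAGE_ID_LENGTH_FOR_V1_OR_LATER` in the original diff.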





[GitHub] [rocketmq-clients] yanchaomei commented on a diff in pull request #530: add MessageIdCodec class

Posted by "yanchaomei (via GitHub)" <gi...@apache.org>.
yanchaomei commented on code in PR #530:
URL: https://github.com/apache/rocketmq-clients/pull/530#discussion_r1213841433


##########
python/client/message/message_id_codec.py:
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import threading
+import time
+import uuid
+from datetime import datetime, timezone
+
+
+class MessageIdCodec:
+    MESSAGE_ID_LENGTH_FOR_V1_OR_LATER = 34
+    MESSAGE_ID_VERSION_V0 = "00"
+    MESSAGE_ID_VERSION_V1 = "01"
+
+    _instance = None
+    _lock = threading.Lock()
+
+    def __new__(cls, *args, **kwargs):
+        if not cls._instance:
+            with cls._lock:
+                if not cls._instance:
+                    cls._instance = super(MessageIdCodec, cls).__new__(cls)
+        return cls._instance
+
+    def __init__(self):
+        self.processFixedStringV1 = self._get_process_fixed_string()
+        self.secondsSinceCustomEpoch = self._get_seconds_since_custom_epoch()
+        self.secondsStartTimestamp = int(time.time())
+        self.seconds = self._delta_seconds()
+        self.sequence = 0
+
+    def _get_process_fixed_string(self):
+        mac = uuid.getnode()
+        mac = format(mac, '012x')
+        mac_bytes = bytes.fromhex(mac[-12:])
+        pid = os.getpid()
+        pid_bytes = pid.to_bytes(2, 'big')

Review Comment:
   I didn't encounter this issue on my machine (macOS). It seems the process id is too large to fit into 2 bytes, which causes an overflow. Allocating more bytes would resolve it, but the process id in the message id is constrained to two bytes. If we set aside the risk of duplication, we can instead make the following change:
   
   pid = os.getpid() % 65536
   pid_bytes = pid.to_bytes(2, 'big')
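
To make the failure mode concrete, here is a small sketch of what happens when a process id above 65535 is passed to `int.to_bytes(2, 'big')`, and how the modulo reduction avoids it. The pid value 70000 and the helper `encode_pid` are made up for illustration; the real value depends on the operating system.

```python
def encode_pid(pid: int) -> bytes:
    """Encode the low 16 bits of a process id as two big-endian bytes (hypothetical helper)."""
    return (pid % 65536).to_bytes(2, "big")


large_pid = 70000  # hypothetical pid that does not fit in two bytes

# Without the reduction, int.to_bytes raises OverflowError.
try:
    large_pid.to_bytes(2, "big")
    overflowed = False
except OverflowError:
    overflowed = True

print(overflowed)                    # True
print(encode_pid(large_pid).hex())   # 1170, since 70000 % 65536 == 4464 == 0x1170
```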


