You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/04/14 15:06:21 UTC

[rocketmq] branch develop updated: [ISSUE #4167] Optimize the logic of MessageDecoder#decodeMessageId method (#4082)

This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f120645a1 [ISSUE #4167] Optimize the logic of MessageDecoder#decodeMessageId method (#4082)
f120645a1 is described below

commit f120645a11abc1ecd2b6a6f1a07a8d339cc1ea11
Author: 灼华 <43...@users.noreply.github.com>
AuthorDate: Thu Apr 14 23:06:09 2022 +0800

    [ISSUE #4167] Optimize the logic of MessageDecoder#decodeMessageId method (#4082)
    
    Co-authored-by: 灼华 <ge...@didiglobal.com>
---
 .../rocketmq/common/message/MessageDecoder.java    | 19 +++++++---------
 .../common/message/MessageDecoderTest.java         | 26 ++++++++++++++++++++++
 .../java/org/apache/rocketmq/store/MappedFile.java |  2 +-
 3 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 929912772..b127ac6cb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -82,20 +82,17 @@ public class MessageDecoder {
     }
 
     public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
-        SocketAddress address;
-        long offset;
-        int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
+        byte[] bytes = UtilAll.string2bytes(msgId);
+        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
 
-        byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
-        byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
-        ByteBuffer bb = ByteBuffer.wrap(port);
-        int portInt = bb.getInt(0);
-        address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
+        // address(ip+port)
+        byte[] ip = new byte[msgId.length() == 32 ? 4 : 16];
+        byteBuffer.get(ip);
+        int port = byteBuffer.getInt();
+        SocketAddress address = new InetSocketAddress(InetAddress.getByAddress(ip), port);
 
         // offset
-        byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
-        bb = ByteBuffer.wrap(data);
-        offset = bb.getLong(0);
+        long offset = byteBuffer.getLong();
 
         return new MessageId(address, offset);
     }
diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
index b27f24669..51ea5971b 100644
--- a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.common.message;
 
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -28,6 +29,7 @@ import java.util.Map;
 import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
 import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
 import static org.apache.rocketmq.common.message.MessageDecoder.createMessageId;
+import static org.apache.rocketmq.common.message.MessageDecoder.decodeMessageId;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class MessageDecoderTest {
@@ -373,4 +375,28 @@ public class MessageDecoderTest {
         assertThat(m.get("1")).isEqualTo("1");
     }
 
+    @Test
+    public void testMessageId() throws Exception{
+        // ipv4 messageId test
+        MessageExt msgExt = new MessageExt();
+        msgExt.setStoreHost(new InetSocketAddress("127.0.0.1", 9103));
+        msgExt.setCommitLogOffset(123456);
+        verifyMessageId(msgExt);
+
+        // ipv6 messageId test
+        msgExt.setStoreHostAddressV6Flag();
+        msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByName("::1"), 0));
+        verifyMessageId(msgExt);
+    }
+
+    private void verifyMessageId(MessageExt msgExt) throws UnknownHostException {
+        int storehostIPLength = (msgExt.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;
+        int msgIDLength = storehostIPLength + 4 + 8;
+        ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);
+        String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
+
+        MessageId messageId = decodeMessageId(msgId);
+        assertThat(messageId.getAddress()).isEqualTo(msgExt.getStoreHost());
+        assertThat(messageId.getOffset()).isEqualTo(msgExt.getCommitLogOffset());
+    }
 }
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index b46e7cad1..f07031499 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -262,7 +262,7 @@ public class MappedFile extends ReferenceResource {
     }
 
     /**
-     * Content of data from offset to offset + length will be wrote to file.
+     * Content of data from offset to offset + length will be written to file.
      *
      * @param offset The offset of the subarray to be used.
      * @param length The length of the subarray to be used.