You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/04/14 15:06:21 UTC
[rocketmq] branch develop updated: [ISSUE #4167] Optimize the logic of MessageDecoder#decodeMessageId method (#4082)
This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f120645a1 [ISSUE #4167] Optimize the logic of MessageDecoder#decodeMessageId method (#4082)
f120645a1 is described below
commit f120645a11abc1ecd2b6a6f1a07a8d339cc1ea11
Author: 灼华 <43...@users.noreply.github.com>
AuthorDate: Thu Apr 14 23:06:09 2022 +0800
[ISSUE #4167] Optimize the logic of MessageDecoder#decodeMessageId method (#4082)
Co-authored-by: 灼华 <ge...@didiglobal.com>
---
.../rocketmq/common/message/MessageDecoder.java | 19 +++++++---------
.../common/message/MessageDecoderTest.java | 26 ++++++++++++++++++++++
.../java/org/apache/rocketmq/store/MappedFile.java | 2 +-
3 files changed, 35 insertions(+), 12 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 929912772..b127ac6cb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -82,20 +82,17 @@ public class MessageDecoder {
}
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
- SocketAddress address;
- long offset;
- int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
+ byte[] bytes = UtilAll.string2bytes(msgId);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
- byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
- byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
- ByteBuffer bb = ByteBuffer.wrap(port);
- int portInt = bb.getInt(0);
- address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
+ // address(ip+port)
+ byte[] ip = new byte[msgId.length() == 32 ? 4 : 16];
+ byteBuffer.get(ip);
+ int port = byteBuffer.getInt();
+ SocketAddress address = new InetSocketAddress(InetAddress.getByAddress(ip), port);
// offset
- byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
- bb = ByteBuffer.wrap(data);
- offset = bb.getLong(0);
+ long offset = byteBuffer.getLong();
return new MessageId(address, offset);
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
index b27f24669..51ea5971b 100644
--- a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.common.message;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.junit.Test;
import java.net.InetAddress;
@@ -28,6 +29,7 @@ import java.util.Map;
import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
import static org.apache.rocketmq.common.message.MessageDecoder.createMessageId;
+import static org.apache.rocketmq.common.message.MessageDecoder.decodeMessageId;
import static org.assertj.core.api.Assertions.assertThat;
public class MessageDecoderTest {
@@ -373,4 +375,28 @@ public class MessageDecoderTest {
assertThat(m.get("1")).isEqualTo("1");
}
+ @Test
+ public void testMessageId() throws Exception{
+ // ipv4 messageId test
+ MessageExt msgExt = new MessageExt();
+ msgExt.setStoreHost(new InetSocketAddress("127.0.0.1", 9103));
+ msgExt.setCommitLogOffset(123456);
+ verifyMessageId(msgExt);
+
+ // ipv6 messageId test
+ msgExt.setStoreHostAddressV6Flag();
+ msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByName("::1"), 0));
+ verifyMessageId(msgExt);
+ }
+
+ private void verifyMessageId(MessageExt msgExt) throws UnknownHostException {
+ int storehostIPLength = (msgExt.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;
+ int msgIDLength = storehostIPLength + 4 + 8;
+ ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);
+ String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
+
+ MessageId messageId = decodeMessageId(msgId);
+ assertThat(messageId.getAddress()).isEqualTo(msgExt.getStoreHost());
+ assertThat(messageId.getOffset()).isEqualTo(msgExt.getCommitLogOffset());
+ }
}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index b46e7cad1..f07031499 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -262,7 +262,7 @@ public class MappedFile extends ReferenceResource {
}
/**
- * Content of data from offset to offset + length will be wrote to file.
+ * Content of data from offset to offset + length will be written to file.
*
* @param offset The offset of the subarray to be used.
* @param length The length of the subarray to be used.