You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/01 10:37:54 UTC
[rocketmq] 03/11: 优化发送、消费的解码速度
This is an automated email from the ASF dual-hosted git repository.
huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f5e0151fc4ce725317834525fc5a91a310c9aafb
Author: huangli <ar...@gmail.com>
AuthorDate: Thu Nov 4 16:46:12 2021 +0800
优化发送、消费的解码速度
---
.../processor/AbstractSendMessageProcessor.java | 77 +---------------------
.../protocol/header/PullMessageRequestHeader.java | 48 +++++++++++++-
.../protocol/header/PullMessageResponseHeader.java | 24 ++++++-
.../header/SendMessageRequestHeaderV2.java | 66 ++++++++++++++++++-
.../protocol/header/SendMessageResponseHeader.java | 23 ++++++-
.../protocol/header/FastCodesHeaderTest.java | 28 +++++---
.../remoting/protocol/FastCodesHeader.java | 34 ++++++++++
.../remoting/protocol/RemotingCommand.java | 5 ++
8 files changed, 219 insertions(+), 86 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 3303d70..66480ad 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -288,7 +288,9 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
switch (request.getCode()) {
case RequestCode.SEND_BATCH_MESSAGE:
case RequestCode.SEND_MESSAGE_V2:
- requestHeaderV2 = decodeSendMessageHeaderV2(request);
+ requestHeaderV2 =
+ (SendMessageRequestHeaderV2) request
+ .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
case RequestCode.SEND_MESSAGE:
if (null == requestHeaderV2) {
requestHeader =
@@ -303,79 +305,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
return requestHeader;
}
- static SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(RemotingCommand request)
- throws RemotingCommandException {
- SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2();
- HashMap<String, String> fields = request.getExtFields();
- if (fields == null) {
- throw new RemotingCommandException("the ext fields is null");
- }
-
- String s = fields.get("a");
- checkNotNull(s, "the custom field <a> is null");
- r.setA(s);
-
- s = fields.get("b");
- checkNotNull(s, "the custom field <b> is null");
- r.setB(s);
-
- s = fields.get("c");
- checkNotNull(s, "the custom field <c> is null");
- r.setC(s);
-
- s = fields.get("d");
- checkNotNull(s, "the custom field <d> is null");
- r.setD(Integer.parseInt(s));
-
- s = fields.get("e");
- checkNotNull(s, "the custom field <e> is null");
- r.setE(Integer.parseInt(s));
-
- s = fields.get("f");
- checkNotNull(s, "the custom field <f> is null");
- r.setF(Integer.parseInt(s));
-
- s = fields.get("g");
- checkNotNull(s, "the custom field <g> is null");
- r.setG(Long.parseLong(s));
-
- s = fields.get("h");
- checkNotNull(s, "the custom field <h> is null");
- r.setH(Integer.parseInt(s));
-
- s = fields.get("i");
- if (s != null) {
- r.setI(s);
- }
-
- s = fields.get("j");
- if (s != null) {
- r.setJ(Integer.parseInt(s));
- }
-
- s = fields.get("k");
- if (s != null) {
- r.setK(Boolean.parseBoolean(s));
- }
-
- s = fields.get("l");
- if (s != null) {
- r.setL(Integer.parseInt(s));
- }
-
- s = fields.get("m");
- if (s != null) {
- r.setM(Boolean.parseBoolean(s));
- }
- return r;
- }
-
- private static void checkNotNull(String s, String msg) throws RemotingCommandException {
- if (s == null) {
- throw new RemotingCommandException(msg);
- }
- }
-
public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {
if (hasSendMessageHook()) {
for (SendMessageHook hook : this.sendMessageHookList) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 106e89e..adc32df 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,12 +20,15 @@
*/
package org.apache.rocketmq.common.protocol.header;
+import java.util.HashMap;
+
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
-public class PullMessageRequestHeader implements CommandCustomHeader {
+public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
@@ -52,6 +55,49 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
public void checkFields() throws RemotingCommandException {
}
+ /**
+ * Populates this header directly from the ext-fields map, using one explicit lookup per field.
+ * <p>
+ * subscription and expressionType are stored as returned by the map and may therefore be null;
+ * every other field read here must have a non-null value. Numeric fields are parsed with
+ * Integer.parseInt / Long.parseLong, so a malformed number surfaces as an unchecked
+ * NumberFormatException.
+ *
+ * @param fields header field names mapped to their string values
+ * @throws RemotingCommandException if the value of a required field is null
+ */
+ @Override
+ public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+ String str = fields.get("consumerGroup");
+ checkNotNull(str, "the custom field <consumerGroup> is null");
+ this.consumerGroup = str;
+
+ str = fields.get("topic");
+ checkNotNull(str, "the custom field <topic> is null");
+ this.topic = str;
+
+ str = fields.get("queueId");
+ checkNotNull(str, "the custom field <queueId> is null");
+ this.queueId = Integer.parseInt(str);
+
+ str = fields.get("queueOffset");
+ checkNotNull(str, "the custom field <queueOffset> is null");
+ this.queueOffset = Long.parseLong(str);
+
+ str = fields.get("maxMsgNums");
+ checkNotNull(str, "the custom field <maxMsgNums> is null");
+ this.maxMsgNums = Integer.parseInt(str);
+
+ str = fields.get("sysFlag");
+ checkNotNull(str, "the custom field <sysFlag> is null");
+ this.sysFlag = Integer.parseInt(str);
+
+ str = fields.get("commitOffset");
+ checkNotNull(str, "the custom field <commitOffset> is null");
+ this.commitOffset = Long.parseLong(str);
+
+ str = fields.get("suspendTimeoutMillis");
+ checkNotNull(str, "the custom field <suspendTimeoutMillis> is null");
+ this.suspendTimeoutMillis = Long.parseLong(str);
+
+ // Optional field: assigned without a null check.
+ this.subscription = fields.get("subscription");
+
+ str = fields.get("subVersion");
+ checkNotNull(str, "the custom field <subVersion> is null");
+ this.subVersion = Long.parseLong(str);
+
+ // Optional field: assigned without a null check.
+ this.expressionType = fields.get("expressionType");
+ }
+
public String getConsumerGroup() {
return consumerGroup;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index 0112f7d..db7f24b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -20,11 +20,14 @@
*/
package org.apache.rocketmq.common.protocol.header;
+import java.util.HashMap;
+
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
-public class PullMessageResponseHeader implements CommandCustomHeader {
+public class PullMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
@CFNotNull
private Long suggestWhichBrokerId;
@CFNotNull
@@ -38,6 +41,25 @@ public class PullMessageResponseHeader implements CommandCustomHeader {
public void checkFields() throws RemotingCommandException {
}
+ /**
+ * Populates this header directly from the ext-fields map, using one explicit lookup per field.
+ * Each of the four fields read here must have a non-null value and is parsed with Long.parseLong.
+ *
+ * @param fields header field names mapped to their string values
+ * @throws RemotingCommandException if the value of any field read here is null
+ */
+ @Override
+ public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+ final String brokerIdText = fields.get("suggestWhichBrokerId");
+ checkNotNull(brokerIdText, "the custom field <suggestWhichBrokerId> is null");
+ this.suggestWhichBrokerId = Long.parseLong(brokerIdText);
+
+ final String nextBeginText = fields.get("nextBeginOffset");
+ checkNotNull(nextBeginText, "the custom field <nextBeginOffset> is null");
+ this.nextBeginOffset = Long.parseLong(nextBeginText);
+
+ final String minOffsetText = fields.get("minOffset");
+ checkNotNull(minOffsetText, "the custom field <minOffset> is null");
+ this.minOffset = Long.parseLong(minOffsetText);
+
+ final String maxOffsetText = fields.get("maxOffset");
+ checkNotNull(maxOffsetText, "the custom field <maxOffset> is null");
+ this.maxOffset = Long.parseLong(maxOffsetText);
+ }
+
public Long getNextBeginOffset() {
return nextBeginOffset;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 4e0098b..498a7fa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.common.protocol.header;
+import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
@@ -25,7 +28,7 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
/**
* Use short variable name to speed up FastJson deserialization process.
*/
-public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
+public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCodesHeader {
@CFNotNull
private String a; // producerGroup;
@CFNotNull
@@ -94,6 +97,67 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
public void checkFields() throws RemotingCommandException {
}
+ /**
+ * Reads the short-named fields a..m from the ext-fields map, using one explicit lookup per field.
+ * <p>
+ * a through h must have a non-null value. i through m are assigned only when a value is
+ * present; otherwise the field keeps whatever value it already had.
+ *
+ * @param fields header field names mapped to their string values
+ * @throws RemotingCommandException if the value of any of a..h is null
+ */
+ @Override
+ public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+ final String rawA = fields.get("a");
+ checkNotNull(rawA, "the custom field <a> is null");
+ this.a = rawA;
+
+ final String rawB = fields.get("b");
+ checkNotNull(rawB, "the custom field <b> is null");
+ this.b = rawB;
+
+ final String rawC = fields.get("c");
+ checkNotNull(rawC, "the custom field <c> is null");
+ this.c = rawC;
+
+ final String rawD = fields.get("d");
+ checkNotNull(rawD, "the custom field <d> is null");
+ this.d = Integer.parseInt(rawD);
+
+ final String rawE = fields.get("e");
+ checkNotNull(rawE, "the custom field <e> is null");
+ this.e = Integer.parseInt(rawE);
+
+ final String rawF = fields.get("f");
+ checkNotNull(rawF, "the custom field <f> is null");
+ this.f = Integer.parseInt(rawF);
+
+ final String rawG = fields.get("g");
+ checkNotNull(rawG, "the custom field <g> is null");
+ this.g = Long.parseLong(rawG);
+
+ final String rawH = fields.get("h");
+ checkNotNull(rawH, "the custom field <h> is null");
+ this.h = Integer.parseInt(rawH);
+
+ // From here on the fields are optional: absent values leave the field untouched.
+ final String rawI = fields.get("i");
+ if (rawI != null) {
+ this.i = rawI;
+ }
+
+ final String rawJ = fields.get("j");
+ if (rawJ != null) {
+ this.j = Integer.parseInt(rawJ);
+ }
+
+ final String rawK = fields.get("k");
+ if (rawK != null) {
+ this.k = Boolean.parseBoolean(rawK);
+ }
+
+ final String rawL = fields.get("l");
+ if (rawL != null) {
+ this.l = Integer.parseInt(rawL);
+ }
+
+ final String rawM = fields.get("m");
+ if (rawM != null) {
+ this.m = Boolean.parseBoolean(rawM);
+ }
+ }
+
public String getA() {
return a;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
index 6834881..9d8786f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
@@ -20,11 +20,14 @@
*/
package org.apache.rocketmq.common.protocol.header;
+import java.util.HashMap;
+
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
-public class SendMessageResponseHeader implements CommandCustomHeader {
+public class SendMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
@CFNotNull
private String msgId;
@CFNotNull
@@ -37,6 +40,24 @@ public class SendMessageResponseHeader implements CommandCustomHeader {
public void checkFields() throws RemotingCommandException {
}
+ /**
+ * Populates this header directly from the ext-fields map, using one explicit lookup per field.
+ * msgId, queueId and queueOffset must have a non-null value; transactionId is stored as
+ * returned by the map and may be null.
+ *
+ * @param fields header field names mapped to their string values
+ * @throws RemotingCommandException if the value of msgId, queueId or queueOffset is null
+ */
+ @Override
+ public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+ final String msgIdText = fields.get("msgId");
+ checkNotNull(msgIdText, "the custom field <msgId> is null");
+ this.msgId = msgIdText;
+
+ final String queueIdText = fields.get("queueId");
+ checkNotNull(queueIdText, "the custom field <queueId> is null");
+ this.queueId = Integer.parseInt(queueIdText);
+
+ final String queueOffsetText = fields.get("queueOffset");
+ checkNotNull(queueOffsetText, "the custom field <queueOffset> is null");
+ this.queueOffset = Long.parseLong(queueOffsetText);
+
+ // Optional field: assigned without a null check.
+ this.transactionId = fields.get("transactionId");
+ }
+
public String getMsgId() {
return msgId;
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java
similarity index 73%
rename from broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java
rename to common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java
index da2611b..9e28aa9 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.processor;
+package org.apache.rocketmq.common.protocol.header;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -24,14 +24,24 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Test;
-public class AbstractSendMessageProcessorTest {
+public class FastCodesHeaderTest {
+
@Test
- public void testDecodeSendMessageHeaderV2() throws Exception {
- Field[] declaredFields = SendMessageRequestHeaderV2.class.getDeclaredFields();
+ // Runs the per-class decode comparison once for each of the four header classes listed below.
+ // NOTE(review): check() obtains its reference object through
+ // RemotingCommand.decodeCommandCustomHeader(classHeader). If that method also dispatches
+ // FastCodesHeader instances to decode(), both compared objects come from the same code
+ // path -- confirm the reflective decoder is still being exercised by this test.
+ public void testFastDecode() throws Exception {
+ testFastDecode(SendMessageRequestHeaderV2.class);
+ testFastDecode(SendMessageResponseHeader.class);
+ testFastDecode(PullMessageRequestHeader.class);
+ testFastDecode(PullMessageResponseHeader.class);
+ }
+
+ private void testFastDecode(Class<? extends CommandCustomHeader> classHeader) throws Exception {
+ Field[] declaredFields = classHeader.getDeclaredFields();
List<Field> declaredFieldsList = new ArrayList<>();
for (Field f : declaredFields) {
if (f.getName().startsWith("$")) {
@@ -43,7 +53,7 @@ public class AbstractSendMessageProcessorTest {
RemotingCommand command = RemotingCommand.createRequestCommand(0, null);
HashMap<String, String> m = buildExtFields(declaredFieldsList);
command.setExtFields(m);
- check(command, declaredFieldsList);
+ check(command, declaredFieldsList, classHeader);
}
private HashMap<String, String> buildExtFields(List<Field> fields) {
@@ -65,9 +75,11 @@ public class AbstractSendMessageProcessorTest {
return extFields;
}
- private void check(RemotingCommand command, List<Field> fields) throws Exception {
- SendMessageRequestHeaderV2 o1 = (SendMessageRequestHeaderV2) command.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
- SendMessageRequestHeaderV2 o2 = AbstractSendMessageProcessor.decodeSendMessageHeaderV2(command);
+ private void check(RemotingCommand command, List<Field> fields,
+ Class<? extends CommandCustomHeader> classHeader) throws Exception {
+ CommandCustomHeader o1 = command.decodeCommandCustomHeader(classHeader);
+ CommandCustomHeader o2 = classHeader.newInstance();
+ ((FastCodesHeader)o2).decode(command.getExtFields());
for (Field f : fields) {
Object value1 = f.get(o1);
Object value2 = f.get(o2);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
new file mode 100644
index 0000000..4604ae1
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol;
+
+import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+/**
+ * Contract for custom headers that can populate themselves from a field map through
+ * hand-written lookups.
+ */
+public interface FastCodesHeader {
+ /**
+ * Fills the implementing header from the given field map.
+ *
+ * @param fields header field names mapped to their string values
+ * @throws RemotingCommandException declared so implementations can reject an undecodable map
+ */
+ void decode(HashMap<String, String> fields) throws RemotingCommandException;
+
+ /**
+ * Guard for required fields: does nothing when {@code s} is non-null.
+ *
+ * @param s the value to test
+ * @param msg the message carried by the exception
+ * @throws RemotingCommandException if {@code s} is null
+ */
+ default void checkNotNull(String s, String msg) throws RemotingCommandException {
+ if (null == s) {
+ throw new RemotingCommandException(msg);
+ }
+ }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 51b6194..912eea5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -243,6 +243,11 @@ public class RemotingCommand {
}
if (this.extFields != null) {
+ if (objectHeader instanceof FastCodesHeader) {
+ ((FastCodesHeader) objectHeader).decode(this.extFields);
+ objectHeader.checkFields();
+ return objectHeader;
+ }
Field[] fields = getClazzFields(classHeader);
for (Field field : fields) {