You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/01 10:37:54 UTC

[rocketmq] 03/11: 优化发送、消费的解码速度

This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit f5e0151fc4ce725317834525fc5a91a310c9aafb
Author: huangli <ar...@gmail.com>
AuthorDate: Thu Nov 4 16:46:12 2021 +0800

    优化发送、消费的解码速度 (Optimize header decoding speed for message send and consume)
---
 .../processor/AbstractSendMessageProcessor.java    | 77 +---------------------
 .../protocol/header/PullMessageRequestHeader.java  | 48 +++++++++++++-
 .../protocol/header/PullMessageResponseHeader.java | 24 ++++++-
 .../header/SendMessageRequestHeaderV2.java         | 66 ++++++++++++++++++-
 .../protocol/header/SendMessageResponseHeader.java | 23 ++++++-
 .../protocol/header/FastCodesHeaderTest.java       | 28 +++++---
 .../remoting/protocol/FastCodesHeader.java         | 34 ++++++++++
 .../remoting/protocol/RemotingCommand.java         |  5 ++
 8 files changed, 219 insertions(+), 86 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 3303d70..66480ad 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -288,7 +288,9 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
         switch (request.getCode()) {
             case RequestCode.SEND_BATCH_MESSAGE:
             case RequestCode.SEND_MESSAGE_V2:
-                requestHeaderV2 = decodeSendMessageHeaderV2(request);
+                requestHeaderV2 =
+                        (SendMessageRequestHeaderV2) request
+                                .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
             case RequestCode.SEND_MESSAGE:
                 if (null == requestHeaderV2) {
                     requestHeader =
@@ -303,79 +305,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
         return requestHeader;
     }
 
-    static SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(RemotingCommand request)
-            throws RemotingCommandException {
-        SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2();
-        HashMap<String, String> fields = request.getExtFields();
-        if (fields == null) {
-            throw new RemotingCommandException("the ext fields is null");
-        }
-
-        String s = fields.get("a");
-        checkNotNull(s, "the custom field <a> is null");
-        r.setA(s);
-
-        s = fields.get("b");
-        checkNotNull(s, "the custom field <b> is null");
-        r.setB(s);
-
-        s = fields.get("c");
-        checkNotNull(s, "the custom field <c> is null");
-        r.setC(s);
-
-        s = fields.get("d");
-        checkNotNull(s, "the custom field <d> is null");
-        r.setD(Integer.parseInt(s));
-
-        s = fields.get("e");
-        checkNotNull(s, "the custom field <e> is null");
-        r.setE(Integer.parseInt(s));
-
-        s = fields.get("f");
-        checkNotNull(s, "the custom field <f> is null");
-        r.setF(Integer.parseInt(s));
-
-        s = fields.get("g");
-        checkNotNull(s, "the custom field <g> is null");
-        r.setG(Long.parseLong(s));
-
-        s = fields.get("h");
-        checkNotNull(s, "the custom field <h> is null");
-        r.setH(Integer.parseInt(s));
-
-        s = fields.get("i");
-        if (s != null) {
-            r.setI(s);
-        }
-
-        s = fields.get("j");
-        if (s != null) {
-            r.setJ(Integer.parseInt(s));
-        }
-
-        s = fields.get("k");
-        if (s != null) {
-            r.setK(Boolean.parseBoolean(s));
-        }
-
-        s = fields.get("l");
-        if (s != null) {
-            r.setL(Integer.parseInt(s));
-        }
-
-        s = fields.get("m");
-        if (s != null) {
-            r.setM(Boolean.parseBoolean(s));
-        }
-        return r;
-    }
-
-    private static void checkNotNull(String s, String msg) throws RemotingCommandException {
-        if (s == null) {
-            throw new RemotingCommandException(msg);
-        }
-    }
-
     public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {
         if (hasSendMessageHook()) {
             for (SendMessageHook hook : this.sendMessageHookList) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 106e89e..adc32df 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,12 +20,15 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import java.util.HashMap;
+
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
-public class PullMessageRequestHeader implements CommandCustomHeader {
+public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private String consumerGroup;
     @CFNotNull
@@ -52,6 +55,49 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
+    public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+        String str = fields.get("consumerGroup");
+        checkNotNull(str, "the custom field <consumerGroup> is null");
+        this.consumerGroup = str;
+
+        str = fields.get("topic");
+        checkNotNull(str, "the custom field <topic> is null");
+        this.topic = str;
+
+        str = fields.get("queueId");
+        checkNotNull(str, "the custom field <queueId> is null");
+        this.queueId = Integer.parseInt(str);
+
+        str = fields.get("queueOffset");
+        checkNotNull(str, "the custom field <queueOffset> is null");
+        this.queueOffset = Long.parseLong(str);
+
+        str = fields.get("maxMsgNums");
+        checkNotNull(str, "the custom field <maxMsgNums> is null");
+        this.maxMsgNums = Integer.parseInt(str);
+
+        str = fields.get("sysFlag");
+        checkNotNull(str, "the custom field <sysFlag> is null");
+        this.sysFlag = Integer.parseInt(str);
+
+        str = fields.get("commitOffset");
+        checkNotNull(str, "the custom field <commitOffset> is null");
+        this.commitOffset = Long.parseLong(str);
+
+        str = fields.get("suspendTimeoutMillis");
+        checkNotNull(str, "the custom field <suspendTimeoutMillis> is null");
+        this.suspendTimeoutMillis = Long.parseLong(str);
+
+        this.subscription = fields.get("subscription");
+
+        str = fields.get("subVersion");
+        checkNotNull(str, "the custom field <subVersion> is null");
+        this.subVersion = Long.parseLong(str);
+
+        this.expressionType = fields.get("expressionType");
+    }
+
     public String getConsumerGroup() {
         return consumerGroup;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index 0112f7d..db7f24b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -20,11 +20,14 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import java.util.HashMap;
+
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
-public class PullMessageResponseHeader implements CommandCustomHeader {
+public class PullMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private Long suggestWhichBrokerId;
     @CFNotNull
@@ -38,6 +41,25 @@ public class PullMessageResponseHeader implements CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
+    public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+        String str = fields.get("suggestWhichBrokerId");
+        checkNotNull(str, "the custom field <suggestWhichBrokerId> is null");
+        this.suggestWhichBrokerId = Long.parseLong(str);
+
+        str = fields.get("nextBeginOffset");
+        checkNotNull(str, "the custom field <nextBeginOffset> is null");
+        this.nextBeginOffset = Long.parseLong(str);
+
+        str = fields.get("minOffset");
+        checkNotNull(str, "the custom field <minOffset> is null");
+        this.minOffset = Long.parseLong(str);
+
+        str = fields.get("maxOffset");
+        checkNotNull(str, "the custom field <maxOffset> is null");
+        this.maxOffset = Long.parseLong(str);
+    }
+
     public Long getNextBeginOffset() {
         return nextBeginOffset;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 4e0098b..498a7fa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
+import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
@@ -25,7 +28,7 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 /**
  * Use short variable name to speed up FastJson deserialization process.
  */
-public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
+public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private String a; // producerGroup;
     @CFNotNull
@@ -94,6 +97,67 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
+    public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+
+        String str = fields.get("a");
+        checkNotNull(str, "the custom field <a> is null");
+        a = str;
+
+        str = fields.get("b");
+        checkNotNull(str, "the custom field <b> is null");
+        b = str;
+
+        str = fields.get("c");
+        checkNotNull(str, "the custom field <c> is null");
+        c = str;
+
+        str = fields.get("d");
+        checkNotNull(str, "the custom field <d> is null");
+        d = Integer.parseInt(str);
+
+        str = fields.get("e");
+        checkNotNull(str, "the custom field <e> is null");
+        e = Integer.parseInt(str);
+
+        str = fields.get("f");
+        checkNotNull(str, "the custom field <f> is null");
+        f = Integer.parseInt(str);
+
+        str = fields.get("g");
+        checkNotNull(str, "the custom field <g> is null");
+        g = Long.parseLong(str);
+
+        str = fields.get("h");
+        checkNotNull(str, "the custom field <h> is null");
+        h = Integer.parseInt(str);
+
+        str = fields.get("i");
+        if (str != null) {
+            i = str;
+        }
+
+        str = fields.get("j");
+        if (str != null) {
+            j = Integer.parseInt(str);
+        }
+
+        str = fields.get("k");
+        if (str != null) {
+            k = Boolean.parseBoolean(str);
+        }
+
+        str = fields.get("l");
+        if (str != null) {
+            l = Integer.parseInt(str);
+        }
+
+        str = fields.get("m");
+        if (str != null) {
+            m = Boolean.parseBoolean(str);
+        }
+    }
+
     public String getA() {
         return a;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
index 6834881..9d8786f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
@@ -20,11 +20,14 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import java.util.HashMap;
+
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
-public class SendMessageResponseHeader implements CommandCustomHeader {
+public class SendMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private String msgId;
     @CFNotNull
@@ -37,6 +40,24 @@ public class SendMessageResponseHeader implements CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
+    public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+        String str = fields.get("msgId");
+        checkNotNull(str, "the custom field <msgId> is null");
+        this.msgId = str;
+
+        str = fields.get("queueId");
+        checkNotNull(str, "the custom field <queueId> is null");
+        this.queueId = Integer.parseInt(str);
+
+        str = fields.get("queueOffset");
+        checkNotNull(str, "the custom field <queueOffset> is null");
+        this.queueOffset = Long.parseLong(str);
+
+        str = fields.get("transactionId");
+        this.transactionId = str;
+    }
+
     public String getMsgId() {
         return msgId;
     }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java
similarity index 73%
rename from broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java
rename to common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java
index da2611b..9e28aa9 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.processor;
+package org.apache.rocketmq.common.protocol.header;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -24,14 +24,24 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class AbstractSendMessageProcessorTest {
+public class FastCodesHeaderTest {
+
     @Test
-    public void testDecodeSendMessageHeaderV2() throws Exception {
-        Field[] declaredFields = SendMessageRequestHeaderV2.class.getDeclaredFields();
+    public void testFastDecode() throws Exception {
+        testFastDecode(SendMessageRequestHeaderV2.class);
+        testFastDecode(SendMessageResponseHeader.class);
+        testFastDecode(PullMessageRequestHeader.class);
+        testFastDecode(PullMessageResponseHeader.class);
+    }
+
+    private void testFastDecode(Class<? extends CommandCustomHeader> classHeader) throws Exception {
+        Field[] declaredFields = classHeader.getDeclaredFields();
         List<Field> declaredFieldsList = new ArrayList<>();
         for (Field f : declaredFields) {
             if (f.getName().startsWith("$")) {
@@ -43,7 +53,7 @@ public class AbstractSendMessageProcessorTest {
         RemotingCommand command = RemotingCommand.createRequestCommand(0, null);
         HashMap<String, String> m = buildExtFields(declaredFieldsList);
         command.setExtFields(m);
-        check(command, declaredFieldsList);
+        check(command, declaredFieldsList, classHeader);
     }
 
     private HashMap<String, String> buildExtFields(List<Field> fields) {
@@ -65,9 +75,11 @@ public class AbstractSendMessageProcessorTest {
         return extFields;
     }
 
-    private void check(RemotingCommand command, List<Field> fields) throws Exception {
-        SendMessageRequestHeaderV2 o1 = (SendMessageRequestHeaderV2) command.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
-        SendMessageRequestHeaderV2 o2 = AbstractSendMessageProcessor.decodeSendMessageHeaderV2(command);
+    private void check(RemotingCommand command, List<Field> fields,
+            Class<? extends CommandCustomHeader> classHeader) throws Exception {
+        CommandCustomHeader o1 = command.decodeCommandCustomHeader(classHeader);
+        CommandCustomHeader o2 = classHeader.newInstance();
+        ((FastCodesHeader)o2).decode(command.getExtFields());
         for (Field f : fields) {
             Object value1 = f.get(o1);
             Object value2 = f.get(o2);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
new file mode 100644
index 0000000..4604ae1
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol;
+
+import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public interface FastCodesHeader {
+    default void checkNotNull(String s, String msg) throws RemotingCommandException {
+        if (s == null) {
+            throw new RemotingCommandException(msg);
+        }
+    }
+
+    void decode(HashMap<String, String> fields) throws RemotingCommandException;
+
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 51b6194..912eea5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -243,6 +243,11 @@ public class RemotingCommand {
         }
 
         if (this.extFields != null) {
+            if (objectHeader instanceof FastCodesHeader) {
+                ((FastCodesHeader) objectHeader).decode(this.extFields);
+                objectHeader.checkFields();
+                return objectHeader;
+            }
 
             Field[] fields = getClazzFields(classHeader);
             for (Field field : fields) {