You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/01 10:37:51 UTC

[rocketmq] branch 4.9.2_dev_community created (now e70d8e4)

This is an automated email from the ASF dual-hosted git repository.

huangli pushed a change to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


      at e70d8e4  优化Topic和Group检查的性能,改变算法消除正则表达式匹配

This branch includes the following new commits:

     new 44a9172  change fixed 500ms timeout for "putMessage not in lock" log message.
     new 38e00c4  消除反向DNS解析
     new f5e0151  优化发送、消费的解码速度
     new f9eb920  混合 topic 的 Batch 生产,一次batch可以发送到多个topic和queue,用于外挂式延迟消息服务
     new 1042880  优化rocketmq编解码实现零拷贝,配合前面的header编解码优化,和fastjson版本相比,在生产者(client)的火焰图中,encode的占比从7.37%下降到1.78%,decode的占比从3.84降低到1.64%
     new f05f416  对parseChannelRemoteAddr的结果进行缓存,这个方法在生产者(client)的火焰图中占比4.84%,优化后几乎消失
     new c7fe273  优化createUniqID使其在生产者(client)火焰图中的占比从2.41%下降到0.42%
     new 2b1955c  优化消除无意义的getNamespace()调用
     new 0bc5c1e  [Issue #3476] Fix last separator of properties string is missing when using batch send.
     new 50c1da3  使slave状态不影响事务消息发送
     new e70d8e4  优化Topic和Group检查的性能,改变算法消除正则表达式匹配

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[rocketmq] 11/11: 优化Topic和Group检查的性能,改变算法消除正则表达式匹配

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit e70d8e46e159c8dc98c92ca8cb1bf0b5d53af08e
Author: sunshuangcheng <sh...@163.com>
AuthorDate: Mon Nov 22 11:25:11 2021 +0800

    优化Topic和Group检查的性能,改变算法消除正则表达式匹配
---
 .../org/apache/rocketmq/client/Validators.java     | 52 ++++++----------------
 .../org/apache/rocketmq/client/ValidatorsTest.java |  2 +-
 .../rocketmq/common/topic/TopicValidator.java      | 45 ++++++++++++++-----
 3 files changed, 47 insertions(+), 52 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index cf5f078..0db55cc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -17,8 +17,8 @@
 
 package org.apache.rocketmq.client;
 
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import static org.apache.rocketmq.common.topic.TopicValidator.isTopicOrGroupIllegal;
+
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.UtilAll;
@@ -31,24 +31,10 @@ import org.apache.rocketmq.common.topic.TopicValidator;
  * Common Validator
  */
 public class Validators {
-    public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
-    public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
     public static final int CHARACTER_MAX_LENGTH = 255;
     public static final int TOPIC_MAX_LENGTH = 127;
 
     /**
-     * @return The resulting {@code String}
-     */
-    public static String getGroupWithRegularExpression(String origin, String patternStr) {
-        Pattern pattern = Pattern.compile(patternStr);
-        Matcher matcher = pattern.matcher(origin);
-        while (matcher.find()) {
-            return matcher.group(0);
-        }
-        return null;
-    }
-
-    /**
      * Validate group
      */
     public static void checkGroup(String group) throws MQClientException {
@@ -60,27 +46,15 @@ public class Validators {
             throw new MQClientException("the specified group is longer than group max length 255.", null);
         }
 
-        if (!regularExpressionMatcher(group, PATTERN)) {
-            throw new MQClientException(String.format(
-                "the specified group[%s] contains illegal characters, allowing only %s", group,
-                VALID_PATTERN_STR), null);
-        }
-
-    }
 
-    /**
-     * @return <tt>true</tt> if, and only if, the entire origin sequence matches this matcher's pattern
-     */
-    public static boolean regularExpressionMatcher(String origin, Pattern pattern) {
-        if (pattern == null) {
-            return true;
+        if (isTopicOrGroupIllegal(group)) {
+            throw new MQClientException(String.format(
+                    "the specified group[%s] contains illegal characters, allowing only %s", group,
+                    "^[%|a-zA-Z0-9_-]+$"), null);
         }
-        Matcher matcher = pattern.matcher(origin);
-        return matcher.matches();
     }
 
-    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
-        throws MQClientException {
+    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
         if (null == msg) {
             throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
         }
@@ -112,16 +86,16 @@ public class Validators {
             throw new MQClientException("The specified topic is blank", null);
         }
 
-        if (!regularExpressionMatcher(topic, PATTERN)) {
-            throw new MQClientException(String.format(
-                "The specified topic[%s] contains illegal characters, allowing only %s", topic,
-                VALID_PATTERN_STR), null);
-        }
-
         if (topic.length() > TOPIC_MAX_LENGTH) {
             throw new MQClientException(
                 String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);
         }
+
+        if (isTopicOrGroupIllegal(topic)) {
+            throw new MQClientException(String.format(
+                    "The specified topic[%s] contains illegal characters, allowing only %s", topic,
+                    "^[%|a-zA-Z0-9_-]+$"), null);
+        }
     }
 
     public static void isSystemTopic(String topic) throws MQClientException {
diff --git a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
index 343fe4b..aa448dc 100644
--- a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
@@ -44,7 +44,7 @@ public class ValidatorsTest {
             Validators.checkTopic(illegalTopic);
             failBecauseExceptionWasNotThrown(MQClientException.class);
         } catch (MQClientException e) {
-            assertThat(e).hasMessageStartingWith(String.format("The specified topic[%s] contains illegal characters, allowing only %s", illegalTopic, Validators.VALID_PATTERN_STR));
+            assertThat(e).hasMessageStartingWith(String.format("The specified topic[%s] contains illegal characters, allowing only %s", illegalTopic, "^[%|a-zA-Z0-9_-]+$"));
         }
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
index 7b0a839..5e67dc5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/topic/TopicValidator.java
@@ -22,8 +22,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 import java.util.HashSet;
 import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 public class TopicValidator {
 
@@ -38,9 +36,7 @@ public class TopicValidator {
     public static final String RMQ_SYS_OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";
 
     public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
-
-    private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
-    private static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
+    public static final boolean[] VALID_CHAR_BIT_MAP = new boolean[128];
     private static final int TOPIC_MAX_LENGTH = 127;
 
     private static final Set<String> SYSTEM_TOPIC_SET = new HashSet<String>();
@@ -62,14 +58,39 @@ public class TopicValidator {
         SYSTEM_TOPIC_SET.add(RMQ_SYS_OFFSET_MOVED_EVENT);
 
         NOT_ALLOWED_SEND_TOPIC_SET.add(RMQ_SYS_SCHEDULE_TOPIC);
+
+        // regex:^[%|a-zA-Z0-9_-]+$
+        // %
+        VALID_CHAR_BIT_MAP['%'] = true;
+        // -
+        VALID_CHAR_BIT_MAP['-'] = true;
+        // _
+        VALID_CHAR_BIT_MAP['_'] = true;
+        for (int i = 0; i < VALID_CHAR_BIT_MAP.length; i++) {
+            if (i >= '0' && i <= '9') {
+                // 0-9
+                VALID_CHAR_BIT_MAP[i] = true;
+            } else if (i >= 'A' && i <= 'Z') {
+                // A-Z
+                VALID_CHAR_BIT_MAP[i] = true;
+            } else if (i >= 'a' && i <= 'z') {
+                // a-z
+                VALID_CHAR_BIT_MAP[i] = true;
+            }
+        }
     }
 
-    private static boolean regularExpressionMatcher(String origin, Pattern pattern) {
-        if (pattern == null) {
-            return true;
+    public static boolean isTopicOrGroupIllegal(String str) {
+        int strLen = str.length();
+        int len = VALID_CHAR_BIT_MAP.length;
+        boolean[] bitMap = VALID_CHAR_BIT_MAP;
+        for (int i = 0; i < strLen; i++) {
+            char ch = str.charAt(i);
+            if (ch >= len || !bitMap[ch]) {
+                return true;
+            }
         }
-        Matcher matcher = pattern.matcher(origin);
-        return matcher.matches();
+        return false;
     }
 
     public static boolean validateTopic(String topic, RemotingCommand response) {
@@ -80,9 +101,9 @@ public class TopicValidator {
             return false;
         }
 
-        if (!regularExpressionMatcher(topic, PATTERN)) {
+        if (isTopicOrGroupIllegal(topic)) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("The specified topic contains illegal characters, allowing only " + VALID_PATTERN_STR);
+            response.setRemark("The specified topic contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$");
             return false;
         }
 

[rocketmq] 08/11: 优化消除无意义的getNamespace()调用

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 2b1955c63be7a83885c534b992b5ca92bde504b0
Author: colin <39...@qq.com>
AuthorDate: Mon Oct 25 16:24:57 2021 +0800

    优化消除无意义的getNamespace()调用
---
 .../src/main/java/org/apache/rocketmq/client/ClientConfig.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 8d7f5a1..4452bbd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -39,6 +39,7 @@ public class ClientConfig {
     private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
     private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
     protected String namespace;
+    private boolean namespaceInitialized = false;
     protected AccessChannel accessChannel = AccessChannel.LOCAL;
 
     /**
@@ -195,6 +196,7 @@ public class ClientConfig {
      */
     public void setNamesrvAddr(String namesrvAddr) {
         this.namesrvAddr = namesrvAddr;
+        this.namespaceInitialized = false;
     }
 
     public int getClientCallbackExecutorThreads() {
@@ -278,20 +280,26 @@ public class ClientConfig {
     }
 
     public String getNamespace() {
+        if (namespaceInitialized) {
+            return namespace;
+        }
+
         if (StringUtils.isNotEmpty(namespace)) {
             return namespace;
         }
 
         if (StringUtils.isNotEmpty(this.namesrvAddr)) {
             if (NameServerAddressUtils.validateInstanceEndpoint(namesrvAddr)) {
-                return NameServerAddressUtils.parseInstanceIdFromEndpoint(namesrvAddr);
+                namespace = NameServerAddressUtils.parseInstanceIdFromEndpoint(namesrvAddr);
             }
         }
+        namespaceInitialized = true;
         return namespace;
     }
 
     public void setNamespace(String namespace) {
         this.namespace = namespace;
+        this.namespaceInitialized = true;
     }
 
     public AccessChannel getAccessChannel() {

[rocketmq] 10/11: 使slave状态不影响事务消息发送

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 50c1da38720b1132d45bfde67d8aa6a05d47bac5
Author: sunshuangcheng <sh...@163.com>
AuthorDate: Mon Nov 22 12:53:46 2021 +0800

    使slave状态不影响事务消息发送
---
 .../apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 117a8c6..cdb6836 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -1351,6 +1351,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
         Throwable localException = null;
         switch (sendResult.getSendStatus()) {
+            case FLUSH_SLAVE_TIMEOUT:
+            case SLAVE_NOT_AVAILABLE:
             case SEND_OK: {
                 try {
                     if (sendResult.getTransactionId() != null) {
@@ -1380,10 +1382,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                     localException = e;
                 }
             }
-            break;
+                break;
             case FLUSH_DISK_TIMEOUT:
-            case FLUSH_SLAVE_TIMEOUT:
-            case SLAVE_NOT_AVAILABLE:
                 localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                 break;
             default:

[rocketmq] 04/11: 混合 topic 的 Batch 生产,一次batch可以发送到多个topic和queue,用于外挂式延迟消息服务

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit f9eb9207f1398fbb25275fa81cb6330761b46464
Author: coding <za...@qq.com>
AuthorDate: Fri Nov 5 11:49:11 2021 +0800

    混合 topic 的 Batch 生产,一次batch可以发送到多个topic和queue,用于外挂式延迟消息服务
---
 .reviewboardrc                                     |   4 +
 .../processor/AbstractSendMessageProcessor.java    |  48 +++++++-
 .../broker/processor/SendMessageProcessor.java     |  32 +++++-
 .../org/apache/rocketmq/client/Validators.java     |   9 +-
 .../rocketmq/client/common/ClientErrorCode.java    |   1 +
 .../client/impl/factory/MQClientInstance.java      |   2 +
 .../impl/producer/DefaultMQProducerImpl.java       | 104 ++++++++++++++++-
 .../client/impl/producer/TopicPublishInfo.java     |  33 +++++-
 .../client/producer/DefaultMQProducer.java         |  16 ++-
 .../rocketmq/client/producer/MQProducer.java       |   4 +
 .../java/org/apache/rocketmq/common/MixAll.java    |   2 +
 .../rocketmq/common/message/MessageBatch.java      |  74 +++++++++++-
 .../rocketmq/common/message/MessageDecoder.java    |  24 +++-
 .../rocketmq/common/message/MessageExtBatch.java   |  30 +++++
 .../protocol/header/SendMessageRequestHeader.java  |  20 ++++
 .../header/SendMessageRequestHeaderV2.java         |  36 ++++++
 .../apache/rocketmq/common/MessageBatchTest.java   |  29 ++++-
 .../rocketmq/common/MessageEncodeDecodeTest.java   |   2 +-
 .../java/org/apache/rocketmq/store/CommitLog.java  | 125 +++++++++++++++++++--
 .../apache/rocketmq/store/DefaultMessageStore.java |  15 ++-
 20 files changed, 570 insertions(+), 40 deletions(-)

diff --git a/.reviewboardrc b/.reviewboardrc
new file mode 100644
index 0000000..16bd045
--- /dev/null
+++ b/.reviewboardrc
@@ -0,0 +1,4 @@
+REVIEWBOARD_URL = "http://rb.corp.kuaishou.com/reviewboard/"
+REPOSITORY = "git@git.corp.kuaishou.com:infra/rocketmq.git" 
+BRANCH = "master"
+LAND_DEST_BRANCH = "master"
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 66480ad..85cb705 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -16,10 +16,8 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import io.netty.channel.ChannelHandlerContext;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -27,7 +25,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
 import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
-import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
@@ -46,6 +43,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.ChannelUtil;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -56,6 +54,8 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 
+import io.netty.channel.ChannelHandlerContext;
+
 public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
     protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -163,8 +163,50 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
         return response;
     }
 
+    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, RemotingCommand response, String[] topics, int[] queueIds) {
+        for (int i = 0; i < topics.length; i++) {
+            String topic = topics[i];
+            int queueId = queueIds[i];
+            if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
+                    && this.brokerController.getTopicConfigManager().isOrderTopic(topic)) {
+                response.setCode(ResponseCode.NO_PERMISSION);
+                response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
+                return response;
+            }
+
+            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+
+            if (null == topicConfig) {
+                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+                response.setRemark("topic[" + topic + "] not exist, apply first please!"
+                        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+                return response;
+            }
+
+            int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
+            if (queueId >= idValid) {
+                String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
+                        queueId,
+                        topicConfig,
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+                log.warn(errorInfo);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark(errorInfo);
+
+                return response;
+            }
+        }
+
+        return response;
+    }
+
     protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
         final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
+        if (requestHeader.isMultiTopic()) {
+            return response;
+        }
+
         if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
             && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
             response.setCode(ResponseCode.NO_PERMISSION);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b31c71e..994d596 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,13 +17,13 @@
 package org.apache.rocketmq.broker.processor;
 
 import java.net.SocketAddress;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 
-import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -62,6 +62,8 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
+import io.netty.channel.ChannelHandlerContext;
+
 public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
 
     private List<ConsumeMessageHook> consumeMessageHookList;
@@ -584,6 +586,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             return CompletableFuture.completedFuture(response);
         }
 
+        String[] topics = null;
+        int[] queueIds = null;
+        if (requestHeader.isMultiTopic()) {
+            topics = requestHeader.getTopic().split(MixAll.BATCH_TOPIC_SPLITTER); // decode topics
+            if (requestHeader.getQueueIds() == null) {
+                queueIds = new int[topics.length];
+                for (int i = 0; i < topics.length; i++) {
+                    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topics[i]);
+                    queueIds[i] = randomQueueId(topicConfig.getWriteQueueNums());
+                }
+            } else {
+                // decode queueIds
+                queueIds = Arrays.stream(requestHeader.getQueueIds().split(MixAll.BATCH_QUEUE_ID_SPLITTER)).mapToInt(Integer::parseInt).toArray();
+            }
+            msgCheck(ctx, response, topics, queueIds);
+            if (response.getCode() != -1) {
+                return CompletableFuture.completedFuture(response);
+            }
+            requestHeader.setTopic(topics[0]);
+        }
+
         int queueIdInt = requestHeader.getQueueId();
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
 
@@ -600,6 +623,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         MessageExtBatch messageExtBatch = new MessageExtBatch();
         messageExtBatch.setTopic(requestHeader.getTopic());
         messageExtBatch.setQueueId(queueIdInt);
+        if (requestHeader.isMultiTopic()) {
+            messageExtBatch.setMultiTopic(true);
+            messageExtBatch.setTopics(topics);
+            messageExtBatch.setQueueIds(queueIds);
+        }
 
         int sysFlag = requestHeader.getSysFlag();
         if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
@@ -621,8 +649,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
     }
 
-
-
     public boolean hasConsumeMessageHook() {
         return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index e712e2f..cf5f078 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.topic.TopicValidator;
 
@@ -84,8 +85,12 @@ public class Validators {
             throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
         }
         // topic
-        Validators.checkTopic(msg.getTopic());
-        Validators.isNotAllowedSendTopic(msg.getTopic());
+        if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) {
+            // ignore check
+        } else {
+            Validators.checkTopic(msg.getTopic());
+            Validators.isNotAllowedSendTopic(msg.getTopic());
+        }
 
         // body
         if (null == msg.getBody()) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
index bc03b14..8aa4856 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
@@ -25,4 +25,5 @@ public class ClientErrorCode {
     public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
     public static final int REQUEST_TIMEOUT_EXCEPTION = 10006;
     public static final int CREATE_REPLY_MESSAGE_EXCEPTION = 10007;
+    public static final int NOT_FOUND_MULTI_TOPIC_EXCEPTION = 10008;
 }
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9651943..7db4ccf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -168,6 +168,7 @@ public class MQClientInstance {
                 for (int i = 0; i < nums; i++) {
                     MessageQueue mq = new MessageQueue(topic, item[0], i);
                     info.getMessageQueueList().add(mq);
+                    info.getBrokers().add(mq.getBrokerName());
                 }
             }
 
@@ -196,6 +197,7 @@ public class MQClientInstance {
                     for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                         MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                         info.getMessageQueueList().add(mq);
+                        info.getBrokers().add(mq.getBrokerName());
                     }
                 }
             }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 2a784b5..117a8c6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,12 +16,15 @@
  */
 package org.apache.rocketmq.client.impl.producer;
 
+
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -36,9 +39,11 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.common.ClientErrorCode;
+import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
@@ -98,6 +103,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 public class DefaultMQProducerImpl implements MQProducerInner {
     private final InternalLogger log = ClientLogger.getLog();
     private final Random random = new Random();
+    private final ThreadLocalIndex sendWhichBroker = new ThreadLocalIndex();
     private final DefaultMQProducer defaultMQProducer;
     private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
         new ConcurrentHashMap<String, TopicPublishInfo>();
@@ -547,6 +553,52 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
     }
 
+    public MessageQueue selectOneMessageQueue(MessageBatch message, final Map<String, TopicPublishInfo> tpInfoMap, final String lastBrokerName)
+            throws MQClientException {
+        // select intersection brokerName first.
+        Set<String> sharedBrokers = null;
+        for (TopicPublishInfo tpi : tpInfoMap.values()) {
+            Set<String> brokers = tpi.getBrokers();
+            if (sharedBrokers == null) {
+                sharedBrokers = brokers;
+            } else {
+                sharedBrokers.retainAll(brokers);
+            }
+        }
+
+        if (sharedBrokers == null || sharedBrokers.isEmpty()) {
+            throw new MQClientException(ClientErrorCode.NOT_FOUND_MULTI_TOPIC_EXCEPTION, "multi topic batch route not found, sharedBroker empty");
+        }
+
+        List<String> brokers = new ArrayList<>(sharedBrokers);
+        int index = Math.abs(sendWhichBroker.incrementAndGet()) % brokers.size();
+        if (index < 0) index = 0;
+        String brokerName = brokers.get(index);
+        if (lastBrokerName != null && brokers.size() != 1 && brokerName.equals(lastBrokerName)) {
+            index++;
+            if (index == brokers.size()) {
+                index = 0;
+            }
+            brokerName = brokers.get(index);
+        }
+
+        Map<String, Integer> queueIdMap = new HashMap<>(tpInfoMap.size());
+        String firstTopic = null;
+        for (Map.Entry<String, TopicPublishInfo> entry : tpInfoMap.entrySet()) {
+            MessageQueue mq = entry.getValue().selectOneMessageQueueByBrokerName(brokerName);
+            if (mq == null) {
+                throw new MQClientException(ClientErrorCode.NOT_FOUND_MULTI_TOPIC_EXCEPTION, "multi topic batch route not found");
+            }
+            queueIdMap.put(entry.getKey(), mq.getQueueId());
+            if (firstTopic == null) {
+                firstTopic = entry.getKey();
+            }
+        }
+
+        message.setQueueIdMap(queueIdMap);
+        return new MessageQueue(firstTopic, brokerName, 0); // only brokerName matters.
+    }
+
     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
         return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
     }
@@ -576,8 +628,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         long beginTimestampFirst = System.currentTimeMillis();
         long beginTimestampPrev = beginTimestampFirst;
         long endTimestamp = beginTimestampFirst;
-        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
-        if (topicPublishInfo != null && topicPublishInfo.ok()) {
+
+        Map<String, TopicPublishInfo> topicPublishInfoMap = null;
+        TopicPublishInfo topicPublishInfo = null;
+        boolean multiTopic = false;
+        if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) {
+            multiTopic = true;
+            MessageBatch messageBatch = (MessageBatch) msg;
+            topicPublishInfoMap = this.tryToFindTopicPublishInfo(messageBatch.getTopicIndexMap().keySet());
+        } else {
+            topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
+        }
+
+        if (multiTopic || (topicPublishInfo != null && topicPublishInfo.ok())) {
             boolean callTimeout = false;
             MessageQueue mq = null;
             Exception exception = null;
@@ -587,7 +650,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             String[] brokersSent = new String[timesTotal];
             for (; times < timesTotal; times++) {
                 String lastBrokerName = null == mq ? null : mq.getBrokerName();
-                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+                MessageQueue mqSelected;
+                if (multiTopic) {
+                    mqSelected = this.selectOneMessageQueue((MessageBatch) msg, topicPublishInfoMap, lastBrokerName);
+                } else {
+                    mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+                }
                 if (mqSelected != null) {
                     mq = mqSelected;
                     brokersSent[times] = mq.getBrokerName();
@@ -695,6 +763,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
     }
 
+    public Map<String, TopicPublishInfo> tryToFindTopicPublishInfo(final Set<String> topics) throws MQClientException {
+        Map<String, TopicPublishInfo> topicPublishInfoMap = new HashMap<>(topics.size());
+        for (String topic : topics) {
+            TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(topic);
+            if (topicPublishInfo == null || !topicPublishInfo.ok()) {
+                throw new MQClientException("No route info of this topic: " + topic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
+                        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
+            }
+            topicPublishInfoMap.put(topic, topicPublishInfo);
+        }
+        return topicPublishInfoMap;
+    }
+
     private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
         TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
         if (null == topicPublishInfo || !topicPublishInfo.ok()) {
@@ -800,6 +881,23 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 requestHeader.setReconsumeTimes(0);
                 requestHeader.setUnitMode(this.isUnitMode());
                 requestHeader.setBatch(msg instanceof MessageBatch);
+
+                if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) {
+                    Map<String, Integer> queueIdMap = ((MessageBatch) msg).getQueueIdMap();
+                    String topic = msg.getTopic();
+                    StringBuilder sb = new StringBuilder();
+                    int idx = 0;
+                    for (String s : topic.split(MixAll.BATCH_TOPIC_SPLITTER)) {
+                        if (idx != 0) {
+                            sb.append(MixAll.BATCH_QUEUE_ID_SPLITTER);
+                        }
+                        sb.append(queueIdMap.get(s));
+                        idx++;
+                    }
+                    requestHeader.setMultiTopic(true);
+                    requestHeader.setQueueIds(sb.toString());
+                }
+
                 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                     if (reconsumeTimes != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 2f8337e..29c2b12 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -17,7 +17,11 @@
 package org.apache.rocketmq.client.impl.producer;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -27,6 +31,7 @@ public class TopicPublishInfo {
     private boolean orderTopic = false;
     private boolean haveTopicRouterInfo = false;
     private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
+    private Set<String> brokers = new HashSet<>();
     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
     private TopicRouteData topicRouteData;
 
@@ -42,6 +47,14 @@ public class TopicPublishInfo {
         return null != this.messageQueueList && !this.messageQueueList.isEmpty();
     }
 
+    public Set<String> getBrokers() {
+        return brokers;
+    }
+
+    public void setBrokers(Set<String> brokers) {
+        this.brokers = brokers;
+    }
+
     public List<MessageQueue> getMessageQueueList() {
         return messageQueueList;
     }
@@ -84,6 +97,24 @@ public class TopicPublishInfo {
         }
     }
 
+    public MessageQueue selectOneMessageQueueByBrokerName(final String brokerName) {
+        if (this.messageQueueList == null) {
+            return null;
+        }
+        List<MessageQueue> messageQueues = this.messageQueueList.stream()
+                .filter(mq -> mq.getBrokerName().equals(brokerName))
+                .collect(Collectors.toList());
+        if (messageQueues.isEmpty()) {
+            return null;
+        }
+
+        int index = this.sendWhichQueue.incrementAndGet();
+        int pos = Math.abs(index) % messageQueues.size();
+        if (pos < 0)
+            pos = 0;
+        return messageQueues.get(pos);
+    }
+
     public MessageQueue selectOneMessageQueue() {
         int index = this.sendWhichQueue.incrementAndGet();
         int pos = Math.abs(index) % this.messageQueueList.size();
@@ -106,7 +137,7 @@ public class TopicPublishInfo {
     @Override
     public String toString() {
         return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
-            + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
+            + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + ", brokers=" +  brokers + "]";
     }
 
     public TopicRouteData getTopicRouteData() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 9f91b41..92e5a41 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
+
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
@@ -910,6 +911,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     @Override
+    public SendResult sendMultiTopicBatch(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(multiTopicBatch(msgs));
+    }
+
+    @Override
     public SendResult send(Collection<Message> msgs,
         long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQProducerImpl.send(batch(msgs), timeout);
@@ -980,9 +986,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
+        return batch(msgs, false);
+    }
+
+    private MessageBatch batch(Collection<Message> msgs, boolean allowMultiTopic) throws MQClientException {
         MessageBatch msgBatch;
         try {
-            msgBatch = MessageBatch.generateFromList(msgs);
+            msgBatch = MessageBatch.generateFromList(msgs, allowMultiTopic);
             for (Message message : msgBatch) {
                 Validators.checkMessage(message, this);
                 MessageClientIDSetter.setUniqID(message);
@@ -996,6 +1006,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         return msgBatch;
     }
 
+    private MessageBatch multiTopicBatch(Collection<Message> msgs) throws MQClientException {
+        return batch(msgs, true);
+    }
+
     public String getProducerGroup() {
         return producerGroup;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index f70ddb2..9325a62 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -91,6 +91,10 @@ public interface MQProducer extends MQAdmin {
     SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
         InterruptedException;
 
+    // for batch msgs with multi topics.
+    SendResult sendMultiTopicBatch(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
+            InterruptedException;
+
     SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
         RemotingException, MQBrokerException, InterruptedException;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index ec1e1f0..1ab67cb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -83,6 +83,8 @@ public class MixAll {
     public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
     public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
     public static final String REPLY_MESSAGE_FLAG = "reply";
+    public static final String BATCH_TOPIC_SPLITTER = "%%";
+    public static final String BATCH_QUEUE_ID_SPLITTER = ",";
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
     public static String getWSAddr() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
index a6b801e..9d1e340 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
@@ -18,8 +18,13 @@ package org.apache.rocketmq.common.message;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.rocketmq.common.MixAll;
 
 public class MessageBatch extends Message implements Iterable<Message> {
@@ -27,23 +32,59 @@ public class MessageBatch extends Message implements Iterable<Message> {
     private static final long serialVersionUID = 621335151046335557L;
     private final List<Message> messages;
 
+    private boolean multiTopic = false;
+
+    private Map<String/*topic*/, Integer/*index*/> topicIndexMap;
+    private Map<String/*topic*/, Integer/*queueId*/> queueIdMap;
+
     private MessageBatch(List<Message> messages) {
         this.messages = messages;
     }
 
+    private MessageBatch(List<Message> messages, boolean multiTopic) {
+        this.messages = messages;
+        this.multiTopic = multiTopic;
+    }
+
+    public boolean isMultiTopic() {
+        return multiTopic;
+    }
+
+    public void setTopicIndexMap(Map<String, Integer> topicIndexMap) {
+        this.topicIndexMap = topicIndexMap;
+    }
+
+    public Map<String, Integer> getTopicIndexMap() {
+        return topicIndexMap;
+    }
+
+    public Map<String, Integer> getQueueIdMap() {
+        return queueIdMap;
+    }
+
+    public void setQueueIdMap(Map<String, Integer> queueIdMap) {
+        this.queueIdMap = queueIdMap;
+    }
+
     public byte[] encode() {
-        return MessageDecoder.encodeMessages(messages);
+        if (multiTopic) {
+            return MessageDecoder.encodeMultiTopicMessages(messages, topicIndexMap);
+        } else {
+            return MessageDecoder.encodeMessages(messages);
+        }
     }
 
     public Iterator<Message> iterator() {
         return messages.iterator();
     }
 
-    public static MessageBatch generateFromList(Collection<Message> messages) {
+    public static MessageBatch generateFromList(Collection<Message> messages, boolean allowMultiTopic) {
         assert messages != null;
         assert messages.size() > 0;
         List<Message> messageList = new ArrayList<Message>(messages.size());
         Message first = null;
+        boolean multiTopic = false;
+        Set<String> topics = new HashSet<>();
         for (Message message : messages) {
             if (message.getDelayTimeLevel() > 0) {
                 throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
@@ -55,19 +96,40 @@ public class MessageBatch extends Message implements Iterable<Message> {
                 first = message;
             } else {
                 if (!first.getTopic().equals(message.getTopic())) {
-                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
+                    if (!allowMultiTopic) {
+                        throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
+                    }
+                    multiTopic = true;
                 }
                 if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                     throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
                 }
             }
             messageList.add(message);
+            topics.add(message.getTopic());
         }
-        MessageBatch messageBatch = new MessageBatch(messageList);
 
-        messageBatch.setTopic(first.getTopic());
+        MessageBatch messageBatch = new MessageBatch(messageList, multiTopic);
+
+        if (multiTopic) {
+            Map<String, Integer> topicIndexMap = new HashMap<>(topics.size());
+            int index = 0;
+            StringBuilder sb = new StringBuilder();
+            for (String topic : topics) {
+                if (index != 0) {
+                    sb.append(MixAll.BATCH_TOPIC_SPLITTER);
+                }
+                sb.append(topic);
+                topicIndexMap.put(topic, index);
+                index++;
+            }
+            messageBatch.setTopic(sb.toString());
+            messageBatch.setTopicIndexMap(topicIndexMap);
+        } else {
+            messageBatch.setTopic(first.getTopic());
+        }
         messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
+
         return messageBatch;
     }
-
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index c94700e..f6a9bf9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 
@@ -471,7 +472,7 @@ public class MessageDecoder {
         return map;
     }
 
-    public static byte[] encodeMessage(Message message) {
+    public static byte[] encodeMessage(Message message, boolean multiTopic, Map<String, Integer> topicIndex) {
         //only need flag, body, properties
         byte[] body = message.getBody();
         int bodyLen = body.length;
@@ -484,8 +485,9 @@ public class MessageDecoder {
             + 4 // 2 MAGICCOD
             + 4 // 3 BODYCRC
             + 4 // 4 FLAG
-            + 4 + bodyLen // 4 BODY
-            + 2 + propertiesLength;
+            + 4 + bodyLen // 5 BODY
+            + 2 + propertiesLength // 6 PROPERTY
+            + (multiTopic ? 4 : 0); // 7 TOPIC_INDEX
         ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
         // 1 TOTALSIZE
         byteBuffer.putInt(storeSize);
@@ -508,6 +510,10 @@ public class MessageDecoder {
         byteBuffer.putShort(propertiesLength);
         byteBuffer.put(propertiesBytes);
 
+        if (multiTopic) {
+            // 7. topic_index.
+            byteBuffer.putInt(topicIndex.get(message.getTopic()));
+        }
         return byteBuffer.array();
     }
 
@@ -542,12 +548,16 @@ public class MessageDecoder {
         return message;
     }
 
-    public static byte[] encodeMessages(List<Message> messages) {
+    public static byte[] encodeMultiTopicMessages(List<Message> messages, Map<String, Integer> map) {
+        return doEncodeMessages(messages, true, map);
+    }
+
+    private static byte[] doEncodeMessages(List<Message> messages, boolean multiTopic, Map<String, Integer> topicIndexMap) {
         //TO DO refactor, accumulate in one buffer, avoid copies
         List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
         int allSize = 0;
         for (Message message : messages) {
-            byte[] tmp = encodeMessage(message);
+            byte[] tmp = encodeMessage(message, multiTopic, topicIndexMap);
             encodedMessages.add(tmp);
             allSize += tmp.length;
         }
@@ -560,6 +570,10 @@ public class MessageDecoder {
         return allBytes;
     }
 
+    public static byte[] encodeMessages(List<Message> messages) {
+        return doEncodeMessages(messages, false, null);
+    }
+
     public static List<Message> decodeMessages(ByteBuffer byteBuffer) throws Exception {
         //TO DO add a callback for processing,  avoid creating lists
         List<Message> msgs = new ArrayList<Message>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
index a2713cb..21fa505 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
@@ -30,6 +30,12 @@ public class MessageExtBatch extends MessageExt {
 
     private ByteBuffer encodedBuff;
 
+    private boolean multiTopic;
+
+    private String[] topics;
+
+    private int[] queueIds;
+
     public ByteBuffer getEncodedBuff() {
         return encodedBuff;
     }
@@ -37,4 +43,28 @@ public class MessageExtBatch extends MessageExt {
     public void setEncodedBuff(ByteBuffer encodedBuff) {
         this.encodedBuff = encodedBuff;
     }
+
+    public boolean isMultiTopic() {
+        return multiTopic;
+    }
+
+    public void setMultiTopic(boolean multiTopic) {
+        this.multiTopic = multiTopic;
+    }
+
+    public String[] getTopics() {
+        return topics;
+    }
+
+    public void setTopics(String[] topics) {
+        this.topics = topics;
+    }
+
+    public int[] getQueueIds() {
+        return queueIds;
+    }
+
+    public void setQueueIds(int[] queueIds) {
+        this.queueIds = queueIds;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 2df31e6..fc828d9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -50,6 +50,10 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
     private boolean unitMode = false;
     @CFNullable
     private boolean batch = false;
+    @CFNullable
+    private boolean multiTopic = false;
+    @CFNullable
+    private String queueIds;
     private Integer maxReconsumeTimes;
 
     @Override
@@ -159,4 +163,20 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
     public void setBatch(boolean batch) {
         this.batch = batch;
     }
+
+    public boolean isMultiTopic() {
+        return multiTopic;
+    }
+
+    public void setMultiTopic(boolean multiTopic) {
+        this.multiTopic = multiTopic;
+    }
+
+    public String getQueueIds() {
+        return queueIds;
+    }
+
+    public void setQueueIds(String queueIds) {
+        this.queueIds = queueIds;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 498a7fa..de26947 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -57,6 +57,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
     @CFNullable
     private boolean m; //batch
 
+    @CFNullable
+    private boolean n; //multi topic
+
+    @CFNullable
+    private String o; // queueIds
+
     public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
         SendMessageRequestHeader v1 = new SendMessageRequestHeader();
         v1.setProducerGroup(v2.a);
@@ -72,6 +78,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
         v1.setUnitMode(v2.k);
         v1.setMaxReconsumeTimes(v2.l);
         v1.setBatch(v2.m);
+        v1.setMultiTopic(v2.n);
+        v1.setQueueIds(v2.o);
         return v1;
     }
 
@@ -90,6 +98,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
         v2.k = v1.isUnitMode();
         v2.l = v1.getMaxReconsumeTimes();
         v2.m = v1.isBatch();
+        v2.n = v1.isMultiTopic();
+        v2.o = v1.getQueueIds();
         return v2;
     }
 
@@ -156,6 +166,16 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
         if (str != null) {
             m = Boolean.parseBoolean(str);
         }
+
+        str = fields.get("n");
+        if (str != null) {
+            n = Boolean.parseBoolean(str);
+        }
+
+        str = fields.get("o");
+        if (str != null) {
+            o = str;
+        }
     }
 
     public String getA() {
@@ -261,4 +281,20 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
     public void setM(boolean m) {
         this.m = m;
     }
+
+    public boolean isN() {
+        return n;
+    }
+
+    public void setN(boolean n) {
+        this.n = n;
+    }
+
+    public String getO() {
+        return o;
+    }
+
+    public void setO(String o) {
+        this.o = o;
+    }
 }
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
index f264420..363f576 100644
--- a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
@@ -18,9 +18,11 @@
 package org.apache.rocketmq.common;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class MessageBatchTest {
@@ -38,34 +40,51 @@ public class MessageBatchTest {
     @Test
     public void testGenerate_OK() throws Exception {
         List<Message> messages = generateMessages();
-        MessageBatch.generateFromList(messages);
+        MessageBatch.generateFromList(messages, false);
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testGenerate_DiffTopic() throws Exception {
         List<Message> messages = generateMessages();
         messages.get(1).setTopic("topic2");
-        MessageBatch.generateFromList(messages);
+        MessageBatch.generateFromList(messages, false);
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testGenerate_DiffWaitOK() throws Exception {
         List<Message> messages = generateMessages();
         messages.get(1).setWaitStoreMsgOK(false);
-        MessageBatch.generateFromList(messages);
+        MessageBatch.generateFromList(messages, false);
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testGenerate_Delay() throws Exception {
         List<Message> messages = generateMessages();
         messages.get(1).setDelayTimeLevel(1);
-        MessageBatch.generateFromList(messages);
+        MessageBatch.generateFromList(messages, false);
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testGenerate_Retry() throws Exception {
         List<Message> messages = generateMessages();
         messages.get(1).setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + "topic");
-        MessageBatch.generateFromList(messages);
+        MessageBatch.generateFromList(messages, false);
+    }
+
+    @Test
+    public void testGenerate_MultiTopic() {
+        List<Message> messages = Arrays.asList(
+                new Message("topicA", "bodyA1".getBytes()),
+                new Message("topicB", "bodyB1".getBytes()),
+                new Message("topicA", "bodyA2".getBytes()),
+                new Message("topicB", "bodyB2".getBytes())
+                );
+
+        MessageBatch messageBatch = MessageBatch.generateFromList(messages, true);
+        Assert.assertEquals(messageBatch.getTopic(), "topicA%%topicB");
+        String[] topics = messageBatch.getTopic().split(MixAll.BATCH_TOPIC_SPLITTER);
+        for (int i = 0; i < topics.length; i++) {
+            Assert.assertEquals((int)messageBatch.getTopicIndexMap().get(topics[i]), i);
+        }
     }
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
index 42d3909..fe7a939 100644
--- a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
@@ -33,7 +33,7 @@ public class MessageEncodeDecodeTest {
         Message message = new Message("topic", "body".getBytes());
         message.setFlag(12);
         message.putUserProperty("key", "value");
-        byte[] bytes = MessageDecoder.encodeMessage(message);
+        byte[] bytes = MessageDecoder.encodeMessage(message, false, null);
         ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
         buffer.put(bytes);
         buffer.flip();
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 36db2f5..49ad725 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1259,16 +1259,13 @@ public class CommitLog {
     class DefaultAppendMessageCallback implements AppendMessageCallback {
         // File at the end of the minimum fixed length empty
         private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
-        private final ByteBuffer msgIdMemory;
-        private final ByteBuffer msgIdV6Memory;
         // Store the message content
         private final ByteBuffer msgStoreItemMemory;
         // The maximum length of the message
         private final int maxMessageSize;
+        private final StringBuilder keyBuilder = new StringBuilder();
 
         DefaultAppendMessageCallback(final int size) {
-            this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
-            this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
             this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
             this.maxMessageSize = size;
         }
@@ -1370,6 +1367,9 @@ public class CommitLog {
 
         public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
             final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
+            if (messageExtBatch.isMultiTopic()) {
+                return doAppendMultiTopic(fileFromOffset, byteBuffer, maxBlank, messageExtBatch, putMessageContext);
+            }
             byteBuffer.mark();
             //physical offset
             long wroteOffset = fileFromOffset + byteBuffer.position();
@@ -1468,6 +1468,105 @@ public class CommitLog {
             return result;
         }
 
+        private AppendMessageResult doAppendMultiTopic(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
+                                                       final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
+            //physical offset
+            long wroteOffset = fileFromOffset + byteBuffer.position();
+
+            int totalMsgLen = 0;
+            int msgNum = 0;
+            byteBuffer.mark();
+
+            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
+            ByteBuffer messageByteBuff = messageExtBatch.getEncodedBuff();
+            messageByteBuff.mark();
+
+            int sysFlag = messageExtBatch.getSysFlag();
+            int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+            int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+            Supplier<String> msgIdSupplier = () -> {
+                int msgIdLen = storeHostLength + 8;
+                int batchCount = putMessageContext.getBatchSize();
+                long[] phyPosArray = putMessageContext.getPhyPos();
+                ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
+                MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer);
+                msgIdBuffer.clear();
+                StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1);
+                for (int i = 0; i < phyPosArray.length; i++) {
+                    msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
+                    String msgId = UtilAll.bytes2string(msgIdBuffer.array());
+                    if (i != 0) {
+                        buffer.append(',');
+                    }
+                    buffer.append(msgId);
+                }
+                return buffer.toString();
+            };
+
+            if (messageExtBatch.getStoreSize() + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
+                this.msgStoreItemMemory.clear();
+                this.msgStoreItemMemory.putInt(maxBlank);
+                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
+                messageByteBuff.reset();
+                byteBuffer.reset();
+                byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
+                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(),
+                        0, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+            }
+
+            int index = 0;
+            while (messageByteBuff.hasRemaining()) {
+                final int msgPos = messageByteBuff.position();
+                final int msgLen = messageByteBuff.getInt();
+                totalMsgLen += msgLen;
+
+                messageByteBuff.position(msgPos + 12); // move to queueId
+                int queueId = messageByteBuff.getInt();
+                messageByteBuff.position(msgPos + 20); // move to topic_index(queueOffset)
+                int topicIndex = (int)messageByteBuff.getLong();
+                String topic = messageExtBatch.getTopics()[topicIndex];
+
+                // move to add queue offset and commitlog offset
+                int pos = msgPos + 20;
+                messageByteBuff.putLong(pos, getQueueOffset(topic, queueId)); // update queueOffset
+                pos += 8;
+                messageByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen); // update commitLog offset
+                // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
+                // refresh store time stamp in lock
+                pos += 8 + 4 + 8 + bornHostLength;
+                messageByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); // update store timestamp
+
+                putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
+                msgNum++;
+                messageByteBuff.position(msgPos + msgLen);
+            }
+
+            messageByteBuff.position(0);
+            messageByteBuff.limit(totalMsgLen);
+            byteBuffer.put(messageByteBuff);
+            messageExtBatch.setEncodedBuff(null);
+            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen,
+                    msgIdSupplier, messageExtBatch.getStoreTimestamp(), 0,
+                    CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+            result.setMsgNum(msgNum);
+            return result;
+        }
+
+        private long getQueueOffset(String topic, int queueId) {
+            keyBuilder.setLength(0);
+            keyBuilder.append(topic);
+            keyBuilder.append('-');
+            keyBuilder.append(queueId);
+            String key = keyBuilder.toString();
+            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
+            if (null == queueOffset) {
+                queueOffset = 0L;
+            }
+
+            CommitLog.this.topicQueueTable.put(key, queueOffset + 1);
+            return queueOffset;
+        }
+
         private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
             byteBuffer.flip();
             byteBuffer.limit(limit);
@@ -1608,7 +1707,18 @@ public class CommitLog {
                 int propertiesPos = messagesByteBuff.position();
                 messagesByteBuff.position(propertiesPos + propertiesLen);
 
-                final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+                byte[] topicData;
+                int queueId;
+                int index = 0;
+                if (messageExtBatch.isMultiTopic()) {
+                    // 7. index
+                    index = messagesByteBuff.getInt();
+                    topicData = messageExtBatch.getTopics()[index].getBytes(MessageDecoder.CHARSET_UTF8);
+                    queueId = messageExtBatch.getQueueIds()[index];
+                } else {
+                    topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+                    queueId = messageExtBatch.getQueueId();
+                }
 
                 final int topicLength = topicData.length;
 
@@ -1635,11 +1745,11 @@ public class CommitLog {
                 // 3 BODYCRC
                 this.encoderBuffer.putInt(bodyCrc);
                 // 4 QUEUEID
-                this.encoderBuffer.putInt(messageExtBatch.getQueueId());
+                this.encoderBuffer.putInt(queueId);
                 // 5 FLAG
                 this.encoderBuffer.putInt(flag);
                 // 6 QUEUEOFFSET
-                this.encoderBuffer.putLong(0);
+                this.encoderBuffer.putLong(index);
                 // 7 PHYSICALOFFSET
                 this.encoderBuffer.putLong(0);
                 // 8 SYSFLAG
@@ -1677,6 +1787,7 @@ public class CommitLog {
             putMessageContext.setBatchSize(batchSize);
             putMessageContext.setPhyPos(new long[batchSize]);
             encoderBuffer.flip();
+            messageExtBatch.setStoreSize(maxMessageSize);
             return encoderBuffer;
         }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7f4fcc8..3b788f0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -375,9 +375,18 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) {
-        if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
-            log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length());
-            return PutMessageStatus.MESSAGE_ILLEGAL;
+        if (messageExtBatch.isMultiTopic()) {
+            for (String topic : messageExtBatch.getTopics()) {
+                if (topic.length() > Byte.MAX_VALUE) {
+                    log.warn("putMessage message topic length too long " + topic.length());
+                    return PutMessageStatus.MESSAGE_ILLEGAL;
+                }
+            }
+        } else {
+            if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
+                log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length());
+                return PutMessageStatus.MESSAGE_ILLEGAL;
+            }
         }
 
         if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {

[rocketmq] 03/11: 优化发送、消费的解码速度

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit f5e0151fc4ce725317834525fc5a91a310c9aafb
Author: huangli <ar...@gmail.com>
AuthorDate: Thu Nov 4 16:46:12 2021 +0800

    优化发送、消费的解码速度
---
 .../processor/AbstractSendMessageProcessor.java    | 77 +---------------------
 .../protocol/header/PullMessageRequestHeader.java  | 48 +++++++++++++-
 .../protocol/header/PullMessageResponseHeader.java | 24 ++++++-
 .../header/SendMessageRequestHeaderV2.java         | 66 ++++++++++++++++++-
 .../protocol/header/SendMessageResponseHeader.java | 23 ++++++-
 .../protocol/header/FastCodesHeaderTest.java       | 28 +++++---
 .../remoting/protocol/FastCodesHeader.java         | 34 ++++++++++
 .../remoting/protocol/RemotingCommand.java         |  5 ++
 8 files changed, 219 insertions(+), 86 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 3303d70..66480ad 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -288,7 +288,9 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
         switch (request.getCode()) {
             case RequestCode.SEND_BATCH_MESSAGE:
             case RequestCode.SEND_MESSAGE_V2:
-                requestHeaderV2 = decodeSendMessageHeaderV2(request);
+                requestHeaderV2 =
+                        (SendMessageRequestHeaderV2) request
+                                .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
             case RequestCode.SEND_MESSAGE:
                 if (null == requestHeaderV2) {
                     requestHeader =
@@ -303,79 +305,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
         return requestHeader;
     }
 
-    static SendMessageRequestHeaderV2 decodeSendMessageHeaderV2(RemotingCommand request)
-            throws RemotingCommandException {
-        SendMessageRequestHeaderV2 r = new SendMessageRequestHeaderV2();
-        HashMap<String, String> fields = request.getExtFields();
-        if (fields == null) {
-            throw new RemotingCommandException("the ext fields is null");
-        }
-
-        String s = fields.get("a");
-        checkNotNull(s, "the custom field <a> is null");
-        r.setA(s);
-
-        s = fields.get("b");
-        checkNotNull(s, "the custom field <b> is null");
-        r.setB(s);
-
-        s = fields.get("c");
-        checkNotNull(s, "the custom field <c> is null");
-        r.setC(s);
-
-        s = fields.get("d");
-        checkNotNull(s, "the custom field <d> is null");
-        r.setD(Integer.parseInt(s));
-
-        s = fields.get("e");
-        checkNotNull(s, "the custom field <e> is null");
-        r.setE(Integer.parseInt(s));
-
-        s = fields.get("f");
-        checkNotNull(s, "the custom field <f> is null");
-        r.setF(Integer.parseInt(s));
-
-        s = fields.get("g");
-        checkNotNull(s, "the custom field <g> is null");
-        r.setG(Long.parseLong(s));
-
-        s = fields.get("h");
-        checkNotNull(s, "the custom field <h> is null");
-        r.setH(Integer.parseInt(s));
-
-        s = fields.get("i");
-        if (s != null) {
-            r.setI(s);
-        }
-
-        s = fields.get("j");
-        if (s != null) {
-            r.setJ(Integer.parseInt(s));
-        }
-
-        s = fields.get("k");
-        if (s != null) {
-            r.setK(Boolean.parseBoolean(s));
-        }
-
-        s = fields.get("l");
-        if (s != null) {
-            r.setL(Integer.parseInt(s));
-        }
-
-        s = fields.get("m");
-        if (s != null) {
-            r.setM(Boolean.parseBoolean(s));
-        }
-        return r;
-    }
-
-    private static void checkNotNull(String s, String msg) throws RemotingCommandException {
-        if (s == null) {
-            throw new RemotingCommandException(msg);
-        }
-    }
-
     public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {
         if (hasSendMessageHook()) {
             for (SendMessageHook hook : this.sendMessageHookList) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 106e89e..adc32df 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,12 +20,15 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import java.util.HashMap;
+
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
-public class PullMessageRequestHeader implements CommandCustomHeader {
+public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private String consumerGroup;
     @CFNotNull
@@ -52,6 +55,49 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
+    public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+        String str = fields.get("consumerGroup");
+        checkNotNull(str, "the custom field <consumerGroup> is null");
+        this.consumerGroup = str;
+
+        str = fields.get("topic");
+        checkNotNull(str, "the custom field <topic> is null");
+        this.topic = str;
+
+        str = fields.get("queueId");
+        checkNotNull(str, "the custom field <queueId> is null");
+        this.queueId = Integer.parseInt(str);
+
+        str = fields.get("queueOffset");
+        checkNotNull(str, "the custom field <queueOffset> is null");
+        this.queueOffset = Long.parseLong(str);
+
+        str = fields.get("maxMsgNums");
+        checkNotNull(str, "the custom field <maxMsgNums> is null");
+        this.maxMsgNums = Integer.parseInt(str);
+
+        str = fields.get("sysFlag");
+        checkNotNull(str, "the custom field <sysFlag> is null");
+        this.sysFlag = Integer.parseInt(str);
+
+        str = fields.get("commitOffset");
+        checkNotNull(str, "the custom field <commitOffset> is null");
+        this.commitOffset = Long.parseLong(str);
+
+        str = fields.get("suspendTimeoutMillis");
+        checkNotNull(str, "the custom field <suspendTimeoutMillis> is null");
+        this.suspendTimeoutMillis = Long.parseLong(str);
+
+        this.subscription = fields.get("subscription");;
+
+        str = fields.get("subVersion");
+        checkNotNull(str, "the custom field <subVersion> is null");
+        this.subVersion = Long.parseLong(str);
+
+        this.expressionType = fields.get("expressionType");
+    }
+
     public String getConsumerGroup() {
         return consumerGroup;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index 0112f7d..db7f24b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -20,11 +20,14 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import java.util.HashMap;
+
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
-public class PullMessageResponseHeader implements CommandCustomHeader {
+public class PullMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private Long suggestWhichBrokerId;
     @CFNotNull
@@ -38,6 +41,25 @@ public class PullMessageResponseHeader implements CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
+    public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+        String str = fields.get("suggestWhichBrokerId");
+        checkNotNull(str, "the custom field <suggestWhichBrokerId> is null");
+        this.suggestWhichBrokerId = Long.parseLong(str);
+
+        str = fields.get("nextBeginOffset");
+        checkNotNull(str, "the custom field <nextBeginOffset> is null");
+        this.nextBeginOffset = Long.parseLong(str);
+
+        str = fields.get("minOffset");
+        checkNotNull(str, "the custom field <minOffset> is null");
+        this.minOffset = Long.parseLong(str);
+
+        str = fields.get("maxOffset");
+        checkNotNull(str, "the custom field <maxOffset> is null");
+        this.maxOffset = Long.parseLong(str);
+    }
+
     public Long getNextBeginOffset() {
         return nextBeginOffset;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 4e0098b..498a7fa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
+import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
@@ -25,7 +28,7 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 /**
  * Use short variable name to speed up FastJson deserialization process.
  */
-public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
+public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private String a; // producerGroup;
     @CFNotNull
@@ -94,6 +97,67 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
+    public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+
+        String str = fields.get("a");
+        checkNotNull(str, "the custom field <a> is null");
+        a = str;
+
+        str = fields.get("b");
+        checkNotNull(str, "the custom field <b> is null");
+        b = str;
+
+        str = fields.get("c");
+        checkNotNull(str, "the custom field <c> is null");
+        c = str;
+
+        str = fields.get("d");
+        checkNotNull(str, "the custom field <d> is null");
+        d = Integer.parseInt(str);
+
+        str = fields.get("e");
+        checkNotNull(str, "the custom field <e> is null");
+        e = Integer.parseInt(str);
+
+        str = fields.get("f");
+        checkNotNull(str, "the custom field <f> is null");
+        f = Integer.parseInt(str);
+
+        str = fields.get("g");
+        checkNotNull(str, "the custom field <g> is null");
+        g = Long.parseLong(str);
+
+        str = fields.get("h");
+        checkNotNull(str, "the custom field <h> is null");
+        h = Integer.parseInt(str);
+
+        str = fields.get("i");
+        if (str != null) {
+            i = str;
+        }
+
+        str = fields.get("j");
+        if (str != null) {
+            j = Integer.parseInt(str);
+        }
+
+        str = fields.get("k");
+        if (str != null) {
+            k = Boolean.parseBoolean(str);
+        }
+
+        str = fields.get("l");
+        if (str != null) {
+            l = Integer.parseInt(str);
+        }
+
+        str = fields.get("m");
+        if (str != null) {
+            m = Boolean.parseBoolean(str);
+        }
+    }
+
     public String getA() {
         return a;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
index 6834881..9d8786f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
@@ -20,11 +20,14 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
+import java.util.HashMap;
+
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
-public class SendMessageResponseHeader implements CommandCustomHeader {
+public class SendMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private String msgId;
     @CFNotNull
@@ -37,6 +40,24 @@ public class SendMessageResponseHeader implements CommandCustomHeader {
     public void checkFields() throws RemotingCommandException {
     }
 
+    @Override
+    public void decode(HashMap<String, String> fields) throws RemotingCommandException {
+        String str = fields.get("msgId");
+        checkNotNull(str, "the custom field <msgId> is null");
+        this.msgId = str;
+
+        str = fields.get("queueId");
+        checkNotNull(str, "the custom field <queueId> is null");
+        this.queueId = Integer.parseInt(str);
+
+        str = fields.get("queueOffset");
+        checkNotNull(str, "the custom field <queueOffset> is null");
+        this.queueOffset = Long.parseLong(str);
+
+        str = fields.get("transactionId");
+        this.transactionId = str;
+    }
+
     public String getMsgId() {
         return msgId;
     }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java
similarity index 73%
rename from broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java
rename to common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java
index da2611b..9e28aa9 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessorTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/header/FastCodesHeaderTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.processor;
+package org.apache.rocketmq.common.protocol.header;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -24,14 +24,24 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class AbstractSendMessageProcessorTest {
+public class FastCodesHeaderTest {
+
     @Test
-    public void testDecodeSendMessageHeaderV2() throws Exception {
-        Field[] declaredFields = SendMessageRequestHeaderV2.class.getDeclaredFields();
+    public void testFastDecode() throws Exception {
+        testFastDecode(SendMessageRequestHeaderV2.class);
+        testFastDecode(SendMessageResponseHeader.class);
+        testFastDecode(PullMessageRequestHeader.class);
+        testFastDecode(PullMessageResponseHeader.class);
+    }
+
+    private void testFastDecode(Class<? extends CommandCustomHeader> classHeader) throws Exception {
+        Field[] declaredFields = classHeader.getDeclaredFields();
         List<Field> declaredFieldsList = new ArrayList<>();
         for (Field f : declaredFields) {
             if (f.getName().startsWith("$")) {
@@ -43,7 +53,7 @@ public class AbstractSendMessageProcessorTest {
         RemotingCommand command = RemotingCommand.createRequestCommand(0, null);
         HashMap<String, String> m = buildExtFields(declaredFieldsList);
         command.setExtFields(m);
-        check(command, declaredFieldsList);
+        check(command, declaredFieldsList, classHeader);
     }
 
     private HashMap<String, String> buildExtFields(List<Field> fields) {
@@ -65,9 +75,11 @@ public class AbstractSendMessageProcessorTest {
         return extFields;
     }
 
-    private void check(RemotingCommand command, List<Field> fields) throws Exception {
-        SendMessageRequestHeaderV2 o1 = (SendMessageRequestHeaderV2) command.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
-        SendMessageRequestHeaderV2 o2 = AbstractSendMessageProcessor.decodeSendMessageHeaderV2(command);
+    private void check(RemotingCommand command, List<Field> fields,
+            Class<? extends CommandCustomHeader> classHeader) throws Exception {
+        CommandCustomHeader o1 = command.decodeCommandCustomHeader(classHeader);
+        CommandCustomHeader o2 = classHeader.newInstance();
+        ((FastCodesHeader)o2).decode(command.getExtFields());
         for (Field f : fields) {
             Object value1 = f.get(o1);
             Object value2 = f.get(o2);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
new file mode 100644
index 0000000..4604ae1
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol;
+
+import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public interface FastCodesHeader {
+    default void checkNotNull(String s, String msg) throws RemotingCommandException {
+        if (s == null) {
+            throw new RemotingCommandException(msg);
+        }
+    }
+
+    void decode(HashMap<String, String> fields) throws RemotingCommandException;
+
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 51b6194..912eea5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -243,6 +243,11 @@ public class RemotingCommand {
         }
 
         if (this.extFields != null) {
+            if (objectHeader instanceof FastCodesHeader) {
+                ((FastCodesHeader) objectHeader).decode(this.extFields);
+                objectHeader.checkFields();
+                return objectHeader;
+            }
 
             Field[] fields = getClazzFields(classHeader);
             for (Field field : fields) {

[rocketmq] 09/11: [Issue #3476] Fix last separator of properties string is missing when using batch send.

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 0bc5c1e8435e04b9a1807543c1e0c01e55029b31
Author: huangli <ar...@gmail.com>
AuthorDate: Wed Nov 10 20:05:50 2021 +0800

    [Issue #3476] Fix last separator of properties string is missing when using batch send.
    
    This problem introduced since 4.9.1, may cause tag incorrect.
---
 store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 12 +++++++++---
 .../java/org/apache/rocketmq/store/BatchPutMessageTest.java  |  2 +-
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 49ad725..c01282a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1706,6 +1706,8 @@ public class CommitLog {
                 short propertiesLen = messagesByteBuff.getShort();
                 int propertiesPos = messagesByteBuff.position();
                 messagesByteBuff.position(propertiesPos + propertiesLen);
+                boolean needAppendLastPropertySeparator = propertiesLen > 0 && batchPropLen > 0
+                            && messagesByteBuff.get(messagesByteBuff.position() - 1) != MessageDecoder.PROPERTY_SEPARATOR;
 
                 byte[] topicData;
                 int queueId;
@@ -1722,8 +1724,9 @@ public class CommitLog {
 
                 final int topicLength = topicData.length;
 
-                final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength,
-                        propertiesLen + batchPropLen);
+                int totalPropLen = needAppendLastPropertySeparator ? propertiesLen + batchPropLen + 1
+                                                                     : propertiesLen + batchPropLen;
+                final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen);
 
                 // Exceeds the maximum message
                 if (msgLen > this.maxMessageSize) {
@@ -1776,11 +1779,14 @@ public class CommitLog {
                 this.encoderBuffer.put((byte) topicLength);
                 this.encoderBuffer.put(topicData);
                 // 17 PROPERTIES
-                this.encoderBuffer.putShort((short) (propertiesLen + batchPropLen));
+                this.encoderBuffer.putShort((short) totalPropLen);
                 if (propertiesLen > 0) {
                     this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
                 }
                 if (batchPropLen > 0) {
+                    if (needAppendLastPropertySeparator) {
+                        this.encoderBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
+                    }
                     this.encoderBuffer.put(batchPropData, 0, batchPropLen);
                 }
             }
diff --git a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 2c1fd25..3bc52e3 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -105,7 +105,7 @@ public class BatchPutMessageTest {
             short propertiesLength = (short) propertiesBytes.length;
             final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
             final int topicLength = topicData.length;
-            msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength+batchPropLen) + msgLengthArr[j - 1];
+            msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength+batchPropLen+1) + msgLengthArr[j - 1];
             j++;
         }
         byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);

[rocketmq] 06/11: 对parseChannelRemoteAddr的结果进行缓存,这个方法在生产者(client)的火焰图中占比4.84%,优化后几乎消失

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit f05f416abda132d75a916504c8a3667908b2b6a3
Author: huangli <ar...@gmail.com>
AuthorDate: Mon Nov 15 00:32:59 2021 +0800

    对parseChannelRemoteAddr的结果进行缓存,这个方法在生产者(client)的火焰图中占比4.84%,优化后几乎消失
---
 .../apache/rocketmq/remoting/common/RemotingHelper.java    | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 7dacea9..34a6b36 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -17,6 +17,9 @@
 package org.apache.rocketmq.remoting.common;
 
 import io.netty.channel.Channel;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -34,6 +37,7 @@ public class RemotingHelper {
     public static final String DEFAULT_CHARSET = "UTF-8";
 
     private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_REMOTING);
+    private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
 
     public static String exceptionSimpleDesc(final Throwable e) {
         StringBuilder sb = new StringBuilder();
@@ -155,6 +159,16 @@ public class RemotingHelper {
         if (null == channel) {
             return "";
         }
+        Attribute<String> att = channel.attr(REMOTE_ADDR_KEY);
+        String addr = att.get();
+        if (addr == null) {
+            addr = parseChannelRemoteAddr0(channel);
+            att.set(addr);
+        }
+        return addr;
+    }
+
+    private static String parseChannelRemoteAddr0(final Channel channel) {
         SocketAddress remote = channel.remoteAddress();
         final String addr = remote != null ? remote.toString() : "";
 

[rocketmq] 01/11: change fixed 500ms timeout for "putMessage not in lock" log message.

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 44a9172803191a4ba42936eba2e3fea6d1f4feea
Author: huangli <ar...@gmail.com>
AuthorDate: Wed Oct 21 09:51:24 2020 +0800

    change fixed 500ms timeout for "putMessage not in lock" log message.
---
 store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index ed882aa..7f4fcc8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -436,7 +436,7 @@ public class DefaultMessageStore implements MessageStore {
 
         putResultFuture.thenAccept((result) -> {
             long elapsedTime = this.getSystemClock().now() - beginTime;
-            if (elapsedTime > 500) {
+            if (elapsedTime > brokerConfig.getWaitTimeMillsInSendQueue()) {
                 log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
             }
             this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

[rocketmq] 02/11: 消除反向DNS解析

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 38e00c41fc752e614c5638a6ee50ac4985dc3a50
Author: huangli <ar...@gmail.com>
AuthorDate: Tue May 25 14:41:47 2021 +0800

    消除反向DNS解析
---
 .../src/main/java/org/apache/rocketmq/common/message/MessageExt.java  | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
index 577c4f4..133cb93 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
@@ -153,6 +153,10 @@ public class MessageExt extends Message {
 
     public String getBornHostNameString() {
         if (null != this.bornHost) {
+            if (bornHost instanceof InetSocketAddress) {
+                // without reverse dns lookup
+                return ((InetSocketAddress) bornHost).getHostString();
+            }
             InetAddress inetAddress = ((InetSocketAddress) this.bornHost).getAddress();
 
             return null != inetAddress ? inetAddress.getHostName() : null;

[rocketmq] 05/11: 优化rocketmq编解码实现零拷贝,配合前面的header编解码优化,和fastjson版本相比,在生产者(client)的火焰图中,encode的占比从7.37%下降到1.78%,decode的占比从3.84降低到1.64%

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 10428801df75187ba914e553dd341adf8a1ae6cb
Author: huangli <ar...@gmail.com>
AuthorDate: Fri Nov 5 17:48:40 2021 +0800

    优化rocketmq编解码实现零拷贝,配合前面的header编解码优化,和fastjson版本相比,在生产者(client)的火焰图中,encode的占比从7.37%下降到1.78%,decode的占比从3.84降低到1.64%
---
 .../processor/AbstractSendMessageProcessor.java    |  13 ---
 .../broker/processor/SendMessageProcessor.java     |   4 +-
 .../protocol/header/PullMessageRequestHeader.java  |  17 +++
 .../protocol/header/PullMessageResponseHeader.java |  10 ++
 .../header/SendMessageRequestHeaderV2.java         |  21 ++++
 .../protocol/header/SendMessageResponseHeader.java |  10 ++
 .../rocketmq/example/benchmark/BatchProducer.java  |   3 +
 .../rocketmq/example/benchmark/Consumer.java       |   3 +
 .../rocketmq/example/benchmark/Producer.java       |   3 +
 .../example/benchmark/TransactionProducer.java     |   3 +
 .../rocketmq/remoting/netty/NettyDecoder.java      |   5 +-
 .../rocketmq/remoting/netty/NettyEncoder.java      |   3 +-
 .../remoting/netty/NettyRemotingAbstract.java      |   1 +
 .../remoting/protocol/FastCodesHeader.java         |  11 ++
 .../remoting/protocol/RemotingCommand.java         |  59 ++++++----
 .../remoting/protocol/RocketMQSerializable.java    | 119 ++++++++++++++-------
 .../remoting/protocol/RemotingCommandTest.java     |  15 ++-
 .../protocol/RocketMQSerializableTest.java         |  68 +++++++++++-
 18 files changed, 284 insertions(+), 84 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 85cb705..1f0744e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -279,19 +279,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
         this.sendMessageHookList = sendMessageHookList;
     }
 
-    protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
-        final RemotingCommand response) {
-        if (!request.isOnewayRPC()) {
-            try {
-                ctx.writeAndFlush(response);
-            } catch (Throwable e) {
-                log.error("SendMessageProcessor process request over, but response failed", e);
-                log.error(request.toString());
-                log.error(response.toString());
-            }
-        }
-    }
-
     public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
         SendMessageContext context) {
         if (hasSendMessageHook()) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 994d596..0499695 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -545,8 +545,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             responseHeader.setQueueId(queueIdInt);
             responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
 
-            doResponse(ctx, request, response);
-
             if (hasSendMessageHook()) {
                 sendMessageContext.setMsgId(responseHeader.getMsgId());
                 sendMessageContext.setQueueId(responseHeader.getQueueId());
@@ -561,7 +559,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 sendMessageContext.setCommercialSendSize(wroteSize);
                 sendMessageContext.setCommercialOwner(owner);
             }
-            return null;
+            return response;
         } else {
             if (hasSendMessageHook()) {
                 int wroteSize = request.getBody().length;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index adc32df..e351344 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -28,6 +28,8 @@ import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
+import io.netty.buffer.ByteBuf;
+
 public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private String consumerGroup;
@@ -56,6 +58,21 @@ public class PullMessageRequestHeader implements CommandCustomHeader, FastCodesH
     }
 
     @Override
+    public void encode(ByteBuf out) {
+        writeIfNotNull(out, "consumerGroup", consumerGroup);
+        writeIfNotNull(out, "topic", topic);
+        writeIfNotNull(out, "queueId", queueId);
+        writeIfNotNull(out, "queueOffset", queueOffset);
+        writeIfNotNull(out, "maxMsgNums", maxMsgNums);
+        writeIfNotNull(out, "sysFlag", sysFlag);
+        writeIfNotNull(out, "commitOffset", commitOffset);
+        writeIfNotNull(out, "suspendTimeoutMillis", suspendTimeoutMillis);
+        writeIfNotNull(out, "subscription", subscription);
+        writeIfNotNull(out, "subVersion", subVersion);
+        writeIfNotNull(out, "expressionType", expressionType);
+    }
+
+    @Override
     public void decode(HashMap<String, String> fields) throws RemotingCommandException {
         String str = fields.get("consumerGroup");
         checkNotNull(str, "the custom field <consumerGroup> is null");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index db7f24b..1ac5050 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -27,6 +27,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
+import io.netty.buffer.ByteBuf;
+
 public class PullMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private Long suggestWhichBrokerId;
@@ -42,6 +44,14 @@ public class PullMessageResponseHeader implements CommandCustomHeader, FastCodes
     }
 
     @Override
+    public void encode(ByteBuf out) {
+        writeIfNotNull(out, "suggestWhichBrokerId", suggestWhichBrokerId);
+        writeIfNotNull(out, "nextBeginOffset", nextBeginOffset);
+        writeIfNotNull(out, "minOffset", minOffset);
+        writeIfNotNull(out, "maxOffset", maxOffset);
+    }
+
+    @Override
     public void decode(HashMap<String, String> fields) throws RemotingCommandException {
         String str = fields.get("suggestWhichBrokerId");
         checkNotNull(str, "the custom field <suggestWhichBrokerId> is null");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index de26947..f0cd9e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -25,6 +25,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
+import io.netty.buffer.ByteBuf;
+
 /**
  * Use short variable name to speed up FastJson deserialization process.
  */
@@ -108,6 +110,25 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
     }
 
     @Override
+    public void encode(ByteBuf out) {
+        writeIfNotNull(out, "a", a);
+        writeIfNotNull(out, "b", b);
+        writeIfNotNull(out, "c", c);
+        writeIfNotNull(out, "d", d);
+        writeIfNotNull(out, "e", e);
+        writeIfNotNull(out, "f", f);
+        writeIfNotNull(out, "g", g);
+        writeIfNotNull(out, "h", h);
+        writeIfNotNull(out, "i", i);
+        writeIfNotNull(out, "j", j);
+        writeIfNotNull(out, "k", k);
+        writeIfNotNull(out, "l", l);
+        writeIfNotNull(out, "m", m);
+        writeIfNotNull(out, "n", n);
+        writeIfNotNull(out, "o", o);
+    }
+
+    @Override
     public void decode(HashMap<String, String> fields) throws RemotingCommandException {
 
         String str = fields.get("a");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
index 9d8786f..cc60e37 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageResponseHeader.java
@@ -27,6 +27,8 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
 
+import io.netty.buffer.ByteBuf;
+
 public class SendMessageResponseHeader implements CommandCustomHeader, FastCodesHeader {
     @CFNotNull
     private String msgId;
@@ -41,6 +43,14 @@ public class SendMessageResponseHeader implements CommandCustomHeader, FastCodes
     }
 
     @Override
+    public void encode(ByteBuf out) {
+        writeIfNotNull(out, "msgId", msgId);
+        writeIfNotNull(out, "queueId", queueId);
+        writeIfNotNull(out, "queueOffset", queueOffset);
+        writeIfNotNull(out, "transactionId", transactionId);
+    }
+
+    @Override
     public void decode(HashMap<String, String> fields) throws RemotingCommandException {
         String str = fields.get("msgId");
         checkNotNull(str, "the custom field <msgId> is null");
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
index cf207cd..eadb9c3 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
@@ -46,6 +46,8 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
 public class BatchProducer {
@@ -53,6 +55,7 @@ public class BatchProducer {
     private static byte[] msgBody;
 
     public static void main(String[] args) throws MQClientException {
+        System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkBatchProducer", args, buildCommandlineOptions(options), new PosixParser());
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index d08795d..c9e64f3 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -34,6 +34,8 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
 import java.io.IOException;
@@ -49,6 +51,7 @@ import java.util.concurrent.atomic.AtomicLong;
 public class Consumer {
 
     public static void main(String[] args) throws MQClientException, IOException {
+        System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser());
         if (null == commandLine) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index c32e00e..bdef16e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -32,6 +32,8 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
 import java.util.Arrays;
@@ -50,6 +52,7 @@ public class Producer {
     private static byte[] msgBody;
 
     public static void main(String[] args) throws MQClientException {
+        System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
 
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkProducer", args, buildCommandlineOptions(options), new PosixParser());
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 5e2f287..be5ccf2 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -32,6 +32,8 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.SerializeType;
 import org.apache.rocketmq.srvutil.ServerUtil;
 
 import java.io.UnsupportedEncodingException;
@@ -61,6 +63,7 @@ public class TransactionProducer {
     static final int MAX_CHECK_RESULT_IN_MSG = 20;
 
     public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
+        System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new PosixParser());
         TxSendConfig config = new TxSendConfig();
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
index f64ab2d..57ee601 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
@@ -44,10 +44,7 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder {
             if (null == frame) {
                 return null;
             }
-
-            ByteBuffer byteBuffer = frame.nioBuffer();
-
-            return RemotingCommand.decode(byteBuffer);
+            return RemotingCommand.decode(frame);
         } catch (Exception e) {
             log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
             RemotingUtil.closeChannel(ctx.channel());
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
index 4506e71..7463619 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
@@ -35,8 +35,7 @@ public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
     public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
         throws Exception {
         try {
-            ByteBuffer header = remotingCommand.encodeHeader();
-            out.writeBytes(header);
+            remotingCommand.fastEncodeHeader(out);
             byte[] body = remotingCommand.getBody();
             if (body != null) {
                 out.writeBytes(body);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index b2e7294..eaa2e0d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -209,6 +209,7 @@ public abstract class NettyRemotingAbstract {
                                     if (response != null) {
                                         response.setOpaque(opaque);
                                         response.markResponseType();
+                                        response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
                                         try {
                                             ctx.writeAndFlush(response);
                                         } catch (Throwable e) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
index 4604ae1..f313da2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
@@ -21,6 +21,8 @@ import java.util.HashMap;
 
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
+import io.netty.buffer.ByteBuf;
+
 public interface FastCodesHeader {
     default void checkNotNull(String s, String msg) throws RemotingCommandException {
         if (s == null) {
@@ -28,6 +30,15 @@ public interface FastCodesHeader {
         }
     }
 
+    default void writeIfNotNull(ByteBuf out, String key, Object value) {
+        if (value != null) {
+            RocketMQSerializable.writeStr(out, true, key);
+            RocketMQSerializable.writeStr(out, false, value.toString());
+        }
+    }
+
+    public void encode(ByteBuf out);
+
     void decode(HashMap<String, String> fields) throws RemotingCommandException;
 
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 912eea5..d469d10 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -31,6 +31,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 public class RemotingCommand {
     public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
     public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
@@ -142,20 +145,21 @@ public class RemotingCommand {
     }
 
     public static RemotingCommand decode(final ByteBuffer byteBuffer) {
-        int length = byteBuffer.limit();
-        int oriHeaderLen = byteBuffer.getInt();
-        int headerLength = getHeaderLength(oriHeaderLen);
+        return decode(Unpooled.wrappedBuffer(byteBuffer));
+    }
 
-        byte[] headerData = new byte[headerLength];
-        byteBuffer.get(headerData);
+    public static RemotingCommand decode(final ByteBuf byteBuffer) {
+        int length = byteBuffer.readableBytes();
+        int oriHeaderLen = byteBuffer.readInt();
+        int headerLength = getHeaderLength(oriHeaderLen);
 
-        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
+        RemotingCommand cmd = headerDecode(byteBuffer, headerLength, getProtocolType(oriHeaderLen));
 
         int bodyLength = length - 4 - headerLength;
         byte[] bodyData = null;
         if (bodyLength > 0) {
             bodyData = new byte[bodyLength];
-            byteBuffer.get(bodyData);
+            byteBuffer.readBytes(bodyData);
         }
         cmd.body = bodyData;
 
@@ -166,14 +170,16 @@ public class RemotingCommand {
         return length & 0xFFFFFF;
     }
 
-    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
+    private static RemotingCommand headerDecode(ByteBuf byteBuffer, int len, SerializeType type) {
         switch (type) {
             case JSON:
+                byte[] headerData = new byte[len];
+                byteBuffer.readBytes(headerData);
                 RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
                 resultJson.setSerializeTypeCurrentRPC(type);
                 return resultJson;
             case ROCKETMQ:
-                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
+                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(byteBuffer);
                 resultRMQ.setSerializeTypeCurrentRPC(type);
                 return resultRMQ;
             default:
@@ -208,14 +214,8 @@ public class RemotingCommand {
         return true;
     }
 
-    public static byte[] markProtocolType(int source, SerializeType type) {
-        byte[] result = new byte[4];
-
-        result[0] = type.getCode();
-        result[1] = (byte) ((source >> 16) & 0xFF);
-        result[2] = (byte) ((source >> 8) & 0xFF);
-        result[3] = (byte) (source & 0xFF);
-        return result;
+    public static int markProtocolType(int source, SerializeType type) {
+        return (type.getCode() << 24) | (source & 0x00FFFFFF);
     }
 
     public void markResponseType() {
@@ -349,7 +349,7 @@ public class RemotingCommand {
         result.putInt(length);
 
         // header length
-        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
+        result.putInt(markProtocolType(headerData.length, serializeTypeCurrentRPC));
 
         // header data
         result.put(headerData);
@@ -401,6 +401,27 @@ public class RemotingCommand {
         }
     }
 
+    public void fastEncodeHeader(ByteBuf out) {
+        int bodySize = this.body != null ? this.body.length : 0;
+        int beginIndex = out.writerIndex();
+        // skip 8 bytes
+        out.writeLong(0);
+        int headerSize;
+        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
+            if (customHeader != null && !(customHeader instanceof FastCodesHeader)) {
+                this.makeCustomHeaderToNet();
+            }
+            headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, out);
+        } else {
+            this.makeCustomHeaderToNet();
+            byte[] header = RemotingSerializable.encode(this);
+            headerSize = header.length;
+            out.writeBytes(header);
+        }
+        out.setInt(beginIndex, 4 + headerSize + bodySize);
+        out.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC));
+    }
+
     public ByteBuffer encodeHeader() {
         return encodeHeader(this.body != null ? this.body.length : 0);
     }
@@ -424,7 +445,7 @@ public class RemotingCommand {
         result.putInt(length);
 
         // header length
-        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
+        result.putInt(markProtocolType(headerData.length, serializeTypeCurrentRPC));
 
         // header data
         result.put(headerData);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
index 66119e0..ed8a28f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
@@ -18,12 +18,77 @@ package org.apache.rocketmq.remoting.protocol;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import io.netty.buffer.ByteBuf;
+
 public class RocketMQSerializable {
-    private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+    private static final Charset CHARSET_UTF8 = StandardCharsets.UTF_8;
+
+    public static void writeStr(ByteBuf buf, boolean useShortLength, String str) {
+        int lenIndex = buf.writerIndex();
+        if (useShortLength) {
+            buf.writeShort(0);
+        } else {
+            buf.writeInt(0);
+        }
+        int len = buf.writeCharSequence(str, StandardCharsets.UTF_8);
+        if (useShortLength) {
+            buf.setShort(lenIndex, len);
+        } else {
+            buf.setInt(lenIndex, len);
+        }
+    }
+
+    public static String readStr(ByteBuf buf, boolean useShortLength) {
+        int len = useShortLength ? buf.readShort() : buf.readInt();
+        if (len == 0) {
+            return null;
+        }
+        CharSequence cs = buf.readCharSequence(len, StandardCharsets.UTF_8);
+        return cs == null ? null : cs.toString();
+    }
+
+    public static int rocketMQProtocolEncode(RemotingCommand cmd, ByteBuf out) {
+        int beginIndex = out.writerIndex();
+        // int code(~32767)
+        out.writeShort(cmd.getCode());
+        // LanguageCode language
+        out.writeByte(cmd.getLanguage().getCode());
+        // int version(~32767)
+        out.writeShort(cmd.getVersion());
+        // int opaque
+        out.writeInt(cmd.getOpaque());
+        // int flag
+        out.writeInt(cmd.getFlag());
+        // String remark
+        String remark = cmd.getRemark();
+        if (remark != null && !remark.isEmpty()) {
+            writeStr(out, false, remark);
+        } else {
+            out.writeInt(0);
+        }
+
+        int mapLenIndex = out.writerIndex();
+        out.writeInt(0);
+        if (cmd.readCustomHeader() instanceof FastCodesHeader) {
+            ((FastCodesHeader) cmd.readCustomHeader()).encode(out);
+        }
+        HashMap<String, String> map = cmd.getExtFields();
+        if (map != null && !map.isEmpty()) {
+            map.forEach((k, v) -> {
+                if (k != null && v != null) {
+                    writeStr(out, true, k);
+                    writeStr(out, false, v);
+                }
+            });
+        }
+        out.setInt(mapLenIndex, out.writerIndex() - mapLenIndex - 4);
+        return out.writerIndex() - beginIndex;
+    }
 
     public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
         // String remark
@@ -133,58 +198,38 @@ public class RocketMQSerializable {
         return length;
     }
 
-    public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
+    public static RemotingCommand rocketMQProtocolDecode(final ByteBuf headerBuffer) {
         RemotingCommand cmd = new RemotingCommand();
-        ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
         // int code(~32767)
-        cmd.setCode(headerBuffer.getShort());
+        cmd.setCode(headerBuffer.readShort());
         // LanguageCode language
-        cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
+        cmd.setLanguage(LanguageCode.valueOf(headerBuffer.readByte()));
         // int version(~32767)
-        cmd.setVersion(headerBuffer.getShort());
+        cmd.setVersion(headerBuffer.readShort());
         // int opaque
-        cmd.setOpaque(headerBuffer.getInt());
+        cmd.setOpaque(headerBuffer.readInt());
         // int flag
-        cmd.setFlag(headerBuffer.getInt());
+        cmd.setFlag(headerBuffer.readInt());
         // String remark
-        int remarkLength = headerBuffer.getInt();
-        if (remarkLength > 0) {
-            byte[] remarkContent = new byte[remarkLength];
-            headerBuffer.get(remarkContent);
-            cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
-        }
+        cmd.setRemark(readStr(headerBuffer, false));
 
         // HashMap<String, String> extFields
-        int extFieldsLength = headerBuffer.getInt();
+        int extFieldsLength = headerBuffer.readInt();
         if (extFieldsLength > 0) {
-            byte[] extFieldsBytes = new byte[extFieldsLength];
-            headerBuffer.get(extFieldsBytes);
-            cmd.setExtFields(mapDeserialize(extFieldsBytes));
+            cmd.setExtFields(mapDeserialize(headerBuffer, extFieldsLength));
         }
         return cmd;
     }
 
-    public static HashMap<String, String> mapDeserialize(byte[] bytes) {
-        if (bytes == null || bytes.length <= 0)
-            return null;
-
-        HashMap<String, String> map = new HashMap<String, String>();
-        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-
-        short keySize;
-        byte[] keyContent;
-        int valSize;
-        byte[] valContent;
-        while (byteBuffer.hasRemaining()) {
-            keySize = byteBuffer.getShort();
-            keyContent = new byte[keySize];
-            byteBuffer.get(keyContent);
+    public static HashMap<String, String> mapDeserialize(ByteBuf byteBuffer, int len) {
 
-            valSize = byteBuffer.getInt();
-            valContent = new byte[valSize];
-            byteBuffer.get(valContent);
+        HashMap<String, String> map = new HashMap<>();
+        int endIndex = byteBuffer.readerIndex() + len;
 
-            map.put(new String(keyContent, CHARSET_UTF8), new String(valContent, CHARSET_UTF8));
+        while (byteBuffer.readerIndex() < endIndex) {
+            String k = readStr(byteBuffer, true);
+            String v = readStr(byteBuffer, false);
+            map.put(k, v);
         }
         return map;
     }
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index 2bd41ce..a5d1993 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -31,7 +31,13 @@ public class RemotingCommandTest {
     public void testMarkProtocolType_JSONProtocolType() {
         int source = 261;
         SerializeType type = SerializeType.JSON;
-        byte[] result = RemotingCommand.markProtocolType(source, type);
+
+        byte[] result = new byte[4];
+        int x = RemotingCommand.markProtocolType(source, type);
+        result[0] = (byte) (x >> 24);
+        result[1] = (byte) (x >> 16);
+        result[2] = (byte) (x >> 8);
+        result[3] = (byte) x;
         assertThat(result).isEqualTo(new byte[] {0, 0, 1, 5});
     }
 
@@ -39,7 +45,12 @@ public class RemotingCommandTest {
     public void testMarkProtocolType_ROCKETMQProtocolType() {
         int source = 16777215;
         SerializeType type = SerializeType.ROCKETMQ;
-        byte[] result = RemotingCommand.markProtocolType(source, type);
+        byte[] result = new byte[4];
+        int x = RemotingCommand.markProtocolType(source, type);
+        result[0] = (byte) (x >> 24);
+        result[1] = (byte) (x >> 16);
+        result[2] = (byte) (x >> 8);
+        result[3] = (byte) x;
         assertThat(result).isEqualTo(new byte[] {1, -1, -1, -1});
     }
 
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
index f1db54f..83e3cae 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
@@ -17,10 +17,17 @@
 package org.apache.rocketmq.remoting.protocol;
 
 import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
 public class RocketMQSerializableTest {
     @Test
     public void testRocketMQProtocolEncodeAndDecode_WithoutRemarkWithoutExtFields() {
@@ -42,7 +49,7 @@ public class RocketMQSerializableTest {
         assertThat(parseToInt(result, 13)).isEqualTo(0); //empty remark
         assertThat(parseToInt(result, 17)).isEqualTo(0); //empty extFields
 
-        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result));
 
         assertThat(decodedCommand.getCode()).isEqualTo(code);
         assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
@@ -80,7 +87,7 @@ public class RocketMQSerializableTest {
 
         assertThat(parseToInt(result, 30)).isEqualTo(0); //empty extFields
 
-        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result));
 
         assertThat(decodedCommand.getCode()).isEqualTo(code);
         assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
@@ -115,10 +122,11 @@ public class RocketMQSerializableTest {
 
         byte[] extFieldsArray = new byte[14];
         System.arraycopy(result, 21, extFieldsArray, 0, 14);
-        HashMap<String, String> extFields = RocketMQSerializable.mapDeserialize(extFieldsArray);
+        HashMap<String, String> extFields =
+                RocketMQSerializable.mapDeserialize(Unpooled.wrappedBuffer(extFieldsArray), extFieldsArray.length);
         assertThat(extFields).contains(new HashMap.SimpleEntry("key", "value"));
 
-        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(result);
+        RemotingCommand decodedCommand = RocketMQSerializable.rocketMQProtocolDecode(Unpooled.wrappedBuffer(result));
 
         assertThat(decodedCommand.getCode()).isEqualTo(code);
         assertThat(decodedCommand.getLanguage()).isEqualTo(LanguageCode.JAVA);
@@ -150,4 +158,56 @@ public class RocketMQSerializableTest {
         return array[index] * 16777216 + array[++index] * 65536 + array[++index] * 256
             + array[++index];
     }
+
+    public static class MyHeader1 implements CommandCustomHeader {
+        private String str;
+        private int num;
+
+        @Override
+        public void checkFields() throws RemotingCommandException {
+        }
+
+        public String getStr() {
+            return str;
+        }
+
+        public void setStr(String str) {
+            this.str = str;
+        }
+
+        public int getNum() {
+            return num;
+        }
+
+        public void setNum(int num) {
+            this.num = num;
+        }
+    }
+
+    @Test
+    public void testFastEncode() throws Exception {
+        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(16);
+        MyHeader1 header1 = new MyHeader1();
+        header1.setStr("s1");
+        header1.setNum(100);
+        RemotingCommand cmd = RemotingCommand.createRequestCommand(1, header1);
+        cmd.setRemark("remark");
+        cmd.setOpaque(1001);
+        cmd.setVersion(99);
+        cmd.setLanguage(LanguageCode.JAVA);
+        cmd.setFlag(3);
+        cmd.makeCustomHeaderToNet();
+        RocketMQSerializable.rocketMQProtocolEncode(cmd, buf);
+        RemotingCommand cmd2 = RocketMQSerializable.rocketMQProtocolDecode(buf);
+        assertThat(cmd2.getRemark()).isEqualTo("remark");
+        assertThat(cmd2.getCode()).isEqualTo(1);
+        assertThat(cmd2.getOpaque()).isEqualTo(1001);
+        assertThat(cmd2.getVersion()).isEqualTo(99);
+        assertThat(cmd2.getLanguage()).isEqualTo(LanguageCode.JAVA);
+        assertThat(cmd2.getFlag()).isEqualTo(3);
+
+        MyHeader1 h2 = (MyHeader1) cmd2.decodeCommandCustomHeader(MyHeader1.class);
+        assertThat(h2.getStr()).isEqualTo("s1");
+        assertThat(h2.getNum()).isEqualTo(100);
+    }
 }
\ No newline at end of file

[rocketmq] 07/11: 优化createUniqID使其在生产者(client)火焰图中的占比从2.41%下降到0.42%

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit c7fe273c5f3ff6b60d138badebedec51fa4dbcfd
Author: huangli <ar...@gmail.com>
AuthorDate: Tue Nov 16 00:16:21 2021 +0800

    优化createUniqID使其在生产者(client)火焰图中的占比从2.41%下降到0.42%
---
 .../java/org/apache/rocketmq/common/UtilAll.java   | 14 +++++++++++
 .../common/message/MessageClientIDSetter.java      | 27 +++++++++++-----------
 .../common/message/MessageClientIDSetterTest.java  | 22 ++++++++++++++++++
 3 files changed, 50 insertions(+), 13 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index ea22aa7..a15b4fa 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -262,6 +262,20 @@ public class UtilAll {
         return new String(hexChars);
     }
 
+    public static void writeInt(char[] buffer, int pos, int value) {
+        char[] hexArray = HEX_ARRAY;
+        for (int moveBits = 28; moveBits >= 0; moveBits -= 4) {
+            buffer[pos++] = hexArray[(value >>> moveBits) & 0x0F];
+        }
+    }
+
+    public static void writeShort(char[] buffer, int pos, int value) {
+        char[] hexArray = HEX_ARRAY;
+        for (int moveBits = 12; moveBits >= 0; moveBits -= 4) {
+            buffer[pos++] = hexArray[(value >>> moveBits) & 0x0F];
+        }
+    }
+
     public static byte[] string2bytes(String hexString) {
         if (hexString == null || hexString.equals("")) {
             return null;
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java
index 041bf6b..57090c1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java
@@ -25,7 +25,7 @@ import org.apache.rocketmq.common.UtilAll;
 public class MessageClientIDSetter {
     private static final String TOPIC_KEY_SPLITTER = "#";
     private static final int LEN;
-    private static final String FIX_STRING;
+    private static final char[] FIX_STRING;
     private static final AtomicInteger COUNTER;
     private static long startTime;
     private static long nextStartTime;
@@ -42,7 +42,7 @@ public class MessageClientIDSetter {
         tempBuffer.put(ip);
         tempBuffer.putShort((short) UtilAll.getPid());
         tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
-        FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
+        FIX_STRING = UtilAll.bytes2string(tempBuffer.array()).toCharArray();
         setStartTime(System.currentTimeMillis());
         COUNTER = new AtomicInteger(0);
     }
@@ -112,21 +112,22 @@ public class MessageClientIDSetter {
     }
 
     public static String createUniqID() {
-        StringBuilder sb = new StringBuilder(LEN * 2);
-        sb.append(FIX_STRING);
-        sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
-        return sb.toString();
-    }
-
-    private static byte[] createUniqIDBuffer() {
-        ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
+        char[] sb = new char[LEN * 2];
+        System.arraycopy(FIX_STRING, 0, sb, 0, FIX_STRING.length);
         long current = System.currentTimeMillis();
         if (current >= nextStartTime) {
             setStartTime(current);
         }
-        buffer.putInt((int) (System.currentTimeMillis() - startTime));
-        buffer.putShort((short) COUNTER.getAndIncrement());
-        return buffer.array();
+        int diff = (int)(current - startTime);
+        if (diff < 0 && diff > -1000_000) {
+            // may cause by NTP
+            diff = 0;
+        }
+        int pos = FIX_STRING.length;
+        UtilAll.writeInt(sb, pos, diff);
+        pos += 8;
+        UtilAll.writeShort(sb, pos, COUNTER.getAndIncrement());
+        return new String(sb);
     }
 
     public static void setUniqID(final Message msg) {
diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java
index 0a17c36..1734cbd 100644
--- a/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java
@@ -22,9 +22,31 @@ import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.nio.charset.StandardCharsets;
+
 public class MessageClientIDSetterTest {
 
     @Test
+    public void testGetTimeFromID() {
+        long t = System.currentTimeMillis();
+        String uniqID = MessageClientIDSetter.createUniqID();
+        long t2 = MessageClientIDSetter.getNearlyTimeFromID(uniqID).getTime();
+        assertThat(t2 - t < 20);
+    }
+
+    @Test
+    public void testGetCountFromID() {
+        String uniqID = MessageClientIDSetter.createUniqID();
+        String uniqID2 = MessageClientIDSetter.createUniqID();
+        String idHex = uniqID.substring(uniqID.length() - 4);
+        String idHex2 = uniqID2.substring(uniqID2.length() - 4);
+        int s1 = Integer.parseInt(idHex, 16);
+        int s2 = Integer.parseInt(idHex2, 16);
+        assertThat(s1 == s2 - 1);
+    }
+
+
+    @Test
     public void testGetIPStrFromID() {
         byte[] ip = UtilAll.getIP();
         String ipStr = (4 == ip.length) ? UtilAll.ipToIPv4Str(ip) : UtilAll.ipToIPv6Str(ip);