You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/01 10:37:55 UTC

[rocketmq] 04/11: 混合 topic 的 Batch 生产,一次batch可以发送到多个topic和queue,用于外挂式延迟消息服务

This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit f9eb9207f1398fbb25275fa81cb6330761b46464
Author: coding <za...@qq.com>
AuthorDate: Fri Nov 5 11:49:11 2021 +0800

    混合 topic 的 Batch 生产,一次batch可以发送到多个topic和queue,用于外挂式延迟消息服务
---
 .reviewboardrc                                     |   4 +
 .../processor/AbstractSendMessageProcessor.java    |  48 +++++++-
 .../broker/processor/SendMessageProcessor.java     |  32 +++++-
 .../org/apache/rocketmq/client/Validators.java     |   9 +-
 .../rocketmq/client/common/ClientErrorCode.java    |   1 +
 .../client/impl/factory/MQClientInstance.java      |   2 +
 .../impl/producer/DefaultMQProducerImpl.java       | 104 ++++++++++++++++-
 .../client/impl/producer/TopicPublishInfo.java     |  33 +++++-
 .../client/producer/DefaultMQProducer.java         |  16 ++-
 .../rocketmq/client/producer/MQProducer.java       |   4 +
 .../java/org/apache/rocketmq/common/MixAll.java    |   2 +
 .../rocketmq/common/message/MessageBatch.java      |  74 +++++++++++-
 .../rocketmq/common/message/MessageDecoder.java    |  24 +++-
 .../rocketmq/common/message/MessageExtBatch.java   |  30 +++++
 .../protocol/header/SendMessageRequestHeader.java  |  20 ++++
 .../header/SendMessageRequestHeaderV2.java         |  36 ++++++
 .../apache/rocketmq/common/MessageBatchTest.java   |  29 ++++-
 .../rocketmq/common/MessageEncodeDecodeTest.java   |   2 +-
 .../java/org/apache/rocketmq/store/CommitLog.java  | 125 +++++++++++++++++++--
 .../apache/rocketmq/store/DefaultMessageStore.java |  15 ++-
 20 files changed, 570 insertions(+), 40 deletions(-)

diff --git a/.reviewboardrc b/.reviewboardrc
new file mode 100644
index 0000000..16bd045
--- /dev/null
+++ b/.reviewboardrc
@@ -0,0 +1,4 @@
+REVIEWBOARD_URL = "http://rb.corp.kuaishou.com/reviewboard/"
+REPOSITORY = "git@git.corp.kuaishou.com:infra/rocketmq.git" 
+BRANCH = "master"
+LAND_DEST_BRANCH = "master"
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 66480ad..85cb705 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -16,10 +16,8 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import io.netty.channel.ChannelHandlerContext;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -27,7 +25,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
 import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
-import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
@@ -46,6 +43,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.ChannelUtil;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -56,6 +54,8 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 
+import io.netty.channel.ChannelHandlerContext;
+
 public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
     protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -163,8 +163,50 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
         return response;
     }
 
+    /**
+     * Validates every (topic, queueId) pair of a multi-topic batch. queueIds may be parsed from the request
+     * header, so a length mismatch is rejected too. The first failure sets code and remark on {@code response}.
+     */
+    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, RemotingCommand response, String[] topics, int[] queueIds) {
+        if (queueIds == null || queueIds.length != topics.length) {
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+            response.setRemark("the number of queueIds does not match the number of topics in the multi topic batch");
+            return response;
+        }
+        for (int i = 0; i < topics.length; i++) {
+            String topic = topics[i];
+            int queueId = queueIds[i];
+            if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
+                    && this.brokerController.getTopicConfigManager().isOrderTopic(topic)) {
+                response.setCode(ResponseCode.NO_PERMISSION);
+                response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
+                return response;
+            }
+            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+            if (null == topicConfig) {
+                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+                response.setRemark("topic[" + topic + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+                return response;
+            }
+            int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
+            if (queueId < 0 || queueId >= idValid) { // an explicit per-topic queue id must lie in [0, idValid)
+                String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
+                        queueId, topicConfig, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                log.warn(errorInfo);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark(errorInfo);
+                return response;
+            }
+        }
+        return response;
+    }
+
     protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
         final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
+        if (requestHeader.isMultiTopic()) {
+            return response;
+        }
+
         if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
             && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
             response.setCode(ResponseCode.NO_PERMISSION);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b31c71e..994d596 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,13 +17,13 @@
 package org.apache.rocketmq.broker.processor;
 
 import java.net.SocketAddress;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 
-import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -62,6 +62,8 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
+import io.netty.channel.ChannelHandlerContext;
+
 public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
 
     private List<ConsumeMessageHook> consumeMessageHookList;
@@ -584,6 +586,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             return CompletableFuture.completedFuture(response);
         }
 
+        String[] topics = null;
+        int[] queueIds = null;
+        if (requestHeader.isMultiTopic()) {
+            topics = requestHeader.getTopic().split(MixAll.BATCH_TOPIC_SPLITTER); // decode topics
+            if (requestHeader.getQueueIds() == null) {
+                queueIds = new int[topics.length];
+                for (int i = 0; i < topics.length; i++) {
+                    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topics[i]);
+                    queueIds[i] = randomQueueId(topicConfig.getWriteQueueNums());
+                }
+            } else {
+                // decode queueIds
+                queueIds = Arrays.stream(requestHeader.getQueueIds().split(MixAll.BATCH_QUEUE_ID_SPLITTER)).mapToInt(Integer::parseInt).toArray();
+            }
+            msgCheck(ctx, response, topics, queueIds);
+            if (response.getCode() != -1) {
+                return CompletableFuture.completedFuture(response);
+            }
+            requestHeader.setTopic(topics[0]);
+        }
+
         int queueIdInt = requestHeader.getQueueId();
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
 
@@ -600,6 +623,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         MessageExtBatch messageExtBatch = new MessageExtBatch();
         messageExtBatch.setTopic(requestHeader.getTopic());
         messageExtBatch.setQueueId(queueIdInt);
+        if (requestHeader.isMultiTopic()) {
+            messageExtBatch.setMultiTopic(true);
+            messageExtBatch.setTopics(topics);
+            messageExtBatch.setQueueIds(queueIds);
+        }
 
         int sysFlag = requestHeader.getSysFlag();
         if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
@@ -621,8 +649,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
     }
 
-
-
     public boolean hasConsumeMessageHook() {
         return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index e712e2f..cf5f078 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.topic.TopicValidator;
 
@@ -84,8 +85,12 @@ public class Validators {
             throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
         }
         // topic
-        Validators.checkTopic(msg.getTopic());
-        Validators.isNotAllowedSendTopic(msg.getTopic());
+        if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) {
+            // ignore check
+        } else {
+            Validators.checkTopic(msg.getTopic());
+            Validators.isNotAllowedSendTopic(msg.getTopic());
+        }
 
         // body
         if (null == msg.getBody()) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
index bc03b14..8aa4856 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
@@ -25,4 +25,5 @@ public class ClientErrorCode {
     public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
     public static final int REQUEST_TIMEOUT_EXCEPTION = 10006;
     public static final int CREATE_REPLY_MESSAGE_EXCEPTION = 10007;
+    public static final int NOT_FOUND_MULTI_TOPIC_EXCEPTION = 10008;
 }
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9651943..7db4ccf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -168,6 +168,7 @@ public class MQClientInstance {
                 for (int i = 0; i < nums; i++) {
                     MessageQueue mq = new MessageQueue(topic, item[0], i);
                     info.getMessageQueueList().add(mq);
+                    info.getBrokers().add(mq.getBrokerName());
                 }
             }
 
@@ -196,6 +197,7 @@ public class MQClientInstance {
                     for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                         MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                         info.getMessageQueueList().add(mq);
+                        info.getBrokers().add(mq.getBrokerName());
                     }
                 }
             }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 2a784b5..117a8c6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,12 +16,15 @@
  */
 package org.apache.rocketmq.client.impl.producer;
 
+
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -36,9 +39,11 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.common.ClientErrorCode;
+import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
@@ -98,6 +103,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 public class DefaultMQProducerImpl implements MQProducerInner {
     private final InternalLogger log = ClientLogger.getLog();
     private final Random random = new Random();
+    private final ThreadLocalIndex sendWhichBroker = new ThreadLocalIndex();
     private final DefaultMQProducer defaultMQProducer;
     private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
         new ConcurrentHashMap<String, TopicPublishInfo>();
@@ -547,6 +553,52 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
     }
 
+    /**
+     * Chooses one broker that serves every topic of a multi-topic batch and one queue per topic on it.
+     * The per-topic queue ids are stored on {@code message}; of the returned queue only brokerName matters.
+     */
+    public MessageQueue selectOneMessageQueue(MessageBatch message, final Map<String, TopicPublishInfo> tpInfoMap, final String lastBrokerName)
+            throws MQClientException {
+        // Intersect into a private copy: retainAll on the set returned by getBrokers() would shrink that TopicPublishInfo.
+        Set<String> sharedBrokers = null;
+        for (TopicPublishInfo tpi : tpInfoMap.values()) {
+            if (sharedBrokers == null) {
+                sharedBrokers = new HashSet<>(tpi.getBrokers());
+            } else {
+                sharedBrokers.retainAll(tpi.getBrokers());
+            }
+        }
+
+        if (sharedBrokers == null || sharedBrokers.isEmpty()) {
+            throw new MQClientException(ClientErrorCode.NOT_FOUND_MULTI_TOPIC_EXCEPTION, "multi topic batch route not found, sharedBroker empty");
+        }
+
+        List<String> brokers = new ArrayList<>(sharedBrokers);
+        int index = Math.abs(sendWhichBroker.incrementAndGet()) % brokers.size();
+        if (index < 0) index = 0; // Math.abs(Integer.MIN_VALUE) is still negative
+        String brokerName = brokers.get(index);
+        if (lastBrokerName != null && brokers.size() != 1 && brokerName.equals(lastBrokerName)) {
+            // Do not pick the same broker as lastBrokerName: step to the next one, wrapping around.
+            brokerName = brokers.get((index + 1) % brokers.size());
+        }
+
+        Map<String, Integer> queueIdMap = new HashMap<>(tpInfoMap.size());
+        String firstTopic = null;
+        for (Map.Entry<String, TopicPublishInfo> entry : tpInfoMap.entrySet()) {
+            MessageQueue mq = entry.getValue().selectOneMessageQueueByBrokerName(brokerName);
+            if (mq == null) {
+                throw new MQClientException(ClientErrorCode.NOT_FOUND_MULTI_TOPIC_EXCEPTION, "multi topic batch route not found");
+            }
+            queueIdMap.put(entry.getKey(), mq.getQueueId());
+            if (firstTopic == null) {
+                firstTopic = entry.getKey();
+            }
+        }
+
+        message.setQueueIdMap(queueIdMap);
+        return new MessageQueue(firstTopic, brokerName, 0); // only brokerName matters.
+    }
+
     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
         return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
     }
@@ -576,8 +628,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         long beginTimestampFirst = System.currentTimeMillis();
         long beginTimestampPrev = beginTimestampFirst;
         long endTimestamp = beginTimestampFirst;
-        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
-        if (topicPublishInfo != null && topicPublishInfo.ok()) {
+
+        Map<String, TopicPublishInfo> topicPublishInfoMap = null;
+        TopicPublishInfo topicPublishInfo = null;
+        boolean multiTopic = false;
+        if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) {
+            multiTopic = true;
+            MessageBatch messageBatch = (MessageBatch) msg;
+            topicPublishInfoMap = this.tryToFindTopicPublishInfo(messageBatch.getTopicIndexMap().keySet());
+        } else {
+            topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
+        }
+
+        if (multiTopic || (topicPublishInfo != null && topicPublishInfo.ok())) {
             boolean callTimeout = false;
             MessageQueue mq = null;
             Exception exception = null;
@@ -587,7 +650,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             String[] brokersSent = new String[timesTotal];
             for (; times < timesTotal; times++) {
                 String lastBrokerName = null == mq ? null : mq.getBrokerName();
-                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+                MessageQueue mqSelected;
+                if (multiTopic) {
+                    mqSelected = this.selectOneMessageQueue((MessageBatch) msg, topicPublishInfoMap, lastBrokerName);
+                } else {
+                    mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+                }
                 if (mqSelected != null) {
                     mq = mqSelected;
                     brokersSent[times] = mq.getBrokerName();
@@ -695,6 +763,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
     }
 
+    /** Looks up the publish info of every topic; throws as soon as one lookup yields null or a result that is not ok(). */
+    public Map<String, TopicPublishInfo> tryToFindTopicPublishInfo(final Set<String> topics) throws MQClientException {
+        final Map<String, TopicPublishInfo> routes = new HashMap<>(topics.size());
+        for (final String topic : topics) {
+            final TopicPublishInfo route = tryToFindTopicPublishInfo(topic);
+            if (!(route != null && route.ok())) {
+                throw new MQClientException("No route info of this topic: " + topic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
+            }
+            routes.put(topic, route);
+        }
+        return routes;
+    }
+
     private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
         TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
         if (null == topicPublishInfo || !topicPublishInfo.ok()) {
@@ -800,6 +881,23 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 requestHeader.setReconsumeTimes(0);
                 requestHeader.setUnitMode(this.isUnitMode());
                 requestHeader.setBatch(msg instanceof MessageBatch);
+
+                if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) {
+                    Map<String, Integer> queueIdMap = ((MessageBatch) msg).getQueueIdMap();
+                    String topic = msg.getTopic();
+                    StringBuilder sb = new StringBuilder();
+                    int idx = 0;
+                    for (String s : topic.split(MixAll.BATCH_TOPIC_SPLITTER)) {
+                        if (idx != 0) {
+                            sb.append(MixAll.BATCH_QUEUE_ID_SPLITTER);
+                        }
+                        sb.append(queueIdMap.get(s));
+                        idx++;
+                    }
+                    requestHeader.setMultiTopic(true);
+                    requestHeader.setQueueIds(sb.toString());
+                }
+
                 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                     if (reconsumeTimes != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 2f8337e..29c2b12 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -17,7 +17,11 @@
 package org.apache.rocketmq.client.impl.producer;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -27,6 +31,7 @@ public class TopicPublishInfo {
     private boolean orderTopic = false;
     private boolean haveTopicRouterInfo = false;
     private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
+    private Set<String> brokers = new HashSet<>();
     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
     private TopicRouteData topicRouteData;
 
@@ -42,6 +47,14 @@ public class TopicPublishInfo {
         return null != this.messageQueueList && !this.messageQueueList.isEmpty();
     }
 
+    public Set<String> getBrokers() {
+        return brokers;
+    }
+
+    public void setBrokers(Set<String> brokers) {
+        this.brokers = brokers;
+    }
+
     public List<MessageQueue> getMessageQueueList() {
         return messageQueueList;
     }
@@ -84,6 +97,24 @@ public class TopicPublishInfo {
         }
     }
 
+    /**
+     * Selects one queue among those whose broker name equals {@code brokerName}, indexed by the
+     * sendWhichQueue counter; returns null when the queue list is null or no queue matches.
+     */
+    public MessageQueue selectOneMessageQueueByBrokerName(final String brokerName) {
+        if (this.messageQueueList != null) {
+            final List<MessageQueue> candidates = this.messageQueueList.stream()
+                    .filter(queue -> queue.getBrokerName().equals(brokerName))
+                    .collect(Collectors.toList());
+            if (!candidates.isEmpty()) {
+                // Math.abs(Integer.MIN_VALUE) is negative, so the slot is clamped to 0 in that case.
+                final int slot = Math.abs(this.sendWhichQueue.incrementAndGet()) % candidates.size();
+                return candidates.get(slot < 0 ? 0 : slot);
+            }
+        }
+        return null;
+    }
+
     public MessageQueue selectOneMessageQueue() {
         int index = this.sendWhichQueue.incrementAndGet();
         int pos = Math.abs(index) % this.messageQueueList.size();
@@ -106,7 +137,7 @@ public class TopicPublishInfo {
     @Override
     public String toString() {
         return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
-            + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
+            + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + ", brokers=" +  brokers + "]";
     }
 
     public TopicRouteData getTopicRouteData() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 9f91b41..92e5a41 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
+
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
@@ -910,6 +911,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     @Override
+    public SendResult sendMultiTopicBatch(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(multiTopicBatch(msgs));
+    }
+
+    @Override
     public SendResult send(Collection<Message> msgs,
         long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQProducerImpl.send(batch(msgs), timeout);
@@ -980,9 +986,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
     }
 
     private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
+        return batch(msgs, false);
+    }
+
+    private MessageBatch batch(Collection<Message> msgs, boolean allowMultiTopic) throws MQClientException {
         MessageBatch msgBatch;
         try {
-            msgBatch = MessageBatch.generateFromList(msgs);
+            msgBatch = MessageBatch.generateFromList(msgs, allowMultiTopic);
             for (Message message : msgBatch) {
                 Validators.checkMessage(message, this);
                 MessageClientIDSetter.setUniqID(message);
@@ -996,6 +1006,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
         return msgBatch;
     }
 
+    private MessageBatch multiTopicBatch(Collection<Message> msgs) throws MQClientException {
+        return batch(msgs, true);
+    }
+
     public String getProducerGroup() {
         return producerGroup;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index f70ddb2..9325a62 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -91,6 +91,10 @@ public interface MQProducer extends MQAdmin {
     SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
         InterruptedException;
 
+    // for batch msgs with multi topics.
+    SendResult sendMultiTopicBatch(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
+            InterruptedException;
+
     SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
         RemotingException, MQBrokerException, InterruptedException;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index ec1e1f0..1ab67cb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -83,6 +83,8 @@ public class MixAll {
     public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
     public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
     public static final String REPLY_MESSAGE_FLAG = "reply";
+    public static final String BATCH_TOPIC_SPLITTER = "%%";
+    public static final String BATCH_QUEUE_ID_SPLITTER = ",";
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
     public static String getWSAddr() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
index a6b801e..9d1e340 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
@@ -18,8 +18,13 @@ package org.apache.rocketmq.common.message;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.rocketmq.common.MixAll;
 
 public class MessageBatch extends Message implements Iterable<Message> {
@@ -27,23 +32,59 @@ public class MessageBatch extends Message implements Iterable<Message> {
     private static final long serialVersionUID = 621335151046335557L;
     private final List<Message> messages;
 
+    private boolean multiTopic = false;
+
+    private Map<String/*topic*/, Integer/*index*/> topicIndexMap;
+    private Map<String/*topic*/, Integer/*queueId*/> queueIdMap;
+
     private MessageBatch(List<Message> messages) {
         this.messages = messages;
     }
 
+    private MessageBatch(List<Message> messages, boolean multiTopic) {
+        this.messages = messages;
+        this.multiTopic = multiTopic;
+    }
+
+    public boolean isMultiTopic() {
+        return multiTopic;
+    }
+
+    public void setTopicIndexMap(Map<String, Integer> topicIndexMap) {
+        this.topicIndexMap = topicIndexMap;
+    }
+
+    public Map<String, Integer> getTopicIndexMap() {
+        return topicIndexMap;
+    }
+
+    public Map<String, Integer> getQueueIdMap() {
+        return queueIdMap;
+    }
+
+    public void setQueueIdMap(Map<String, Integer> queueIdMap) {
+        this.queueIdMap = queueIdMap;
+    }
+
     public byte[] encode() {
-        return MessageDecoder.encodeMessages(messages);
+        if (multiTopic) {
+            return MessageDecoder.encodeMultiTopicMessages(messages, topicIndexMap);
+        } else {
+            return MessageDecoder.encodeMessages(messages);
+        }
     }
 
     public Iterator<Message> iterator() {
         return messages.iterator();
     }
 
-    public static MessageBatch generateFromList(Collection<Message> messages) {
+    public static MessageBatch generateFromList(Collection<Message> messages, boolean allowMultiTopic) { // allowMultiTopic=false keeps the same-topic-only contract
         assert messages != null;
         assert messages.size() > 0;
         List<Message> messageList = new ArrayList<Message>(messages.size());
         Message first = null;
+        boolean multiTopic = false;
+        Set<String> topics = new java.util.LinkedHashSet<>(); // insertion-ordered: topic indexes follow first appearance in the batch, not hash order
         for (Message message : messages) {
             if (message.getDelayTimeLevel() > 0) {
                 throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
@@ -55,19 +96,40 @@ public class MessageBatch extends Message implements Iterable<Message> {
                 first = message;
             } else {
                 if (!first.getTopic().equals(message.getTopic())) {
-                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
+                    if (!allowMultiTopic) {
+                        throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
+                    }
+                    multiTopic = true; // at least one message differs from the first topic
                 }
                 if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                     throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
                 }
             }
             messageList.add(message);
+            topics.add(message.getTopic());
         }
-        MessageBatch messageBatch = new MessageBatch(messageList);
 
-        messageBatch.setTopic(first.getTopic());
+        MessageBatch messageBatch = new MessageBatch(messageList, multiTopic);
+        // A multi-topic batch uses the splitter-joined topic list as its topic; a topic's index is its position in that list.
+        if (multiTopic) {
+            Map<String, Integer> topicIndexMap = new HashMap<>(topics.size());
+            int index = 0;
+            StringBuilder sb = new StringBuilder();
+            for (String topic : topics) {
+                if (index != 0) {
+                    sb.append(MixAll.BATCH_TOPIC_SPLITTER);
+                }
+                sb.append(topic);
+                topicIndexMap.put(topic, index);
+                index++;
+            }
+            messageBatch.setTopic(sb.toString());
+            messageBatch.setTopicIndexMap(topicIndexMap);
+        } else {
+            messageBatch.setTopic(first.getTopic());
+        }
         messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
+
         return messageBatch;
     }
-
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index c94700e..f6a9bf9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 
@@ -471,7 +472,7 @@ public class MessageDecoder {
         return map;
     }
 
-    public static byte[] encodeMessage(Message message) {
+    public static byte[] encodeMessage(Message message, boolean multiTopic, Map<String, Integer> topicIndex) {
         //only need flag, body, properties
         byte[] body = message.getBody();
         int bodyLen = body.length;
@@ -484,8 +485,9 @@ public class MessageDecoder {
             + 4 // 2 MAGICCOD
             + 4 // 3 BODYCRC
             + 4 // 4 FLAG
-            + 4 + bodyLen // 4 BODY
-            + 2 + propertiesLength;
+            + 4 + bodyLen // 5 BODY
+            + 2 + propertiesLength // 6 PROPERTY
+            + (multiTopic ? 4 : 0); // 7 TOPIC_INDEX
         ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
         // 1 TOTALSIZE
         byteBuffer.putInt(storeSize);
@@ -508,6 +510,10 @@ public class MessageDecoder {
         byteBuffer.putShort(propertiesLength);
         byteBuffer.put(propertiesBytes);
 
+        if (multiTopic) {
+            // 7. topic_index.
+            byteBuffer.putInt(topicIndex.get(message.getTopic()));
+        }
         return byteBuffer.array();
     }
 
@@ -542,12 +548,16 @@ public class MessageDecoder {
         return message;
     }
 
-    public static byte[] encodeMessages(List<Message> messages) {
+    public static byte[] encodeMultiTopicMessages(List<Message> messages, Map<String, Integer> map) { // map: topic -> index appended to each encoded message
+        return doEncodeMessages(messages, true, map);
+    }
+
+    private static byte[] doEncodeMessages(List<Message> messages, boolean multiTopic, Map<String, Integer> topicIndexMap) {
         //TO DO refactor, accumulate in one buffer, avoid copies
         List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
         int allSize = 0;
         for (Message message : messages) {
-            byte[] tmp = encodeMessage(message);
+            byte[] tmp = encodeMessage(message, multiTopic, topicIndexMap);
             encodedMessages.add(tmp);
             allSize += tmp.length;
         }
@@ -560,6 +570,10 @@ public class MessageDecoder {
         return allBytes;
     }
 
+    public static byte[] encodeMessages(List<Message> messages) { // single-topic form: no topic index is written, so no index map is needed
+        return doEncodeMessages(messages, false, null);
+    }
+
     public static List<Message> decodeMessages(ByteBuffer byteBuffer) throws Exception {
         //TO DO add a callback for processing,  avoid creating lists
         List<Message> msgs = new ArrayList<Message>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
index a2713cb..21fa505 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
@@ -30,6 +30,12 @@ public class MessageExtBatch extends MessageExt {
 
     private ByteBuffer encodedBuff;
 
+    private boolean multiTopic; // when true the commit log resolves topic and queue id per message instead of per batch
+
+    private String[] topics; // looked up by the topic index carried in each encoded message
+
+    private int[] queueIds; // read with the same index as topics, i.e. one queue id per topic
+
     public ByteBuffer getEncodedBuff() {
         return encodedBuff;
     }
@@ -37,4 +43,28 @@ public class MessageExtBatch extends MessageExt {
     public void setEncodedBuff(ByteBuffer encodedBuff) {
         this.encodedBuff = encodedBuff;
     }
+
+    public boolean isMultiTopic() { // whether this batch spans several topics
+        return multiTopic;
+    }
+
+    public void setMultiTopic(boolean multiTopic) { // plain setter, no validation
+        this.multiTopic = multiTopic;
+    }
+
+    public String[] getTopics() { // returns the internal array itself, not a copy
+        return topics;
+    }
+
+    public void setTopics(String[] topics) { // array position is the topic index used inside the encoded messages
+        this.topics = topics;
+    }
+
+    public int[] getQueueIds() { // returns the internal array itself, not a copy
+        return queueIds;
+    }
+
+    public void setQueueIds(int[] queueIds) { // expected to be parallel to topics: queueIds[i] belongs to topics[i]
+        this.queueIds = queueIds;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 2df31e6..fc828d9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -50,6 +50,10 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
     private boolean unitMode = false;
     @CFNullable
     private boolean batch = false;
+    @CFNullable
+    private boolean multiTopic = false; // optional header field; defaults to a single-topic request
+    @CFNullable
+    private String queueIds; // NOTE(review): presumably a delimited list with one queue id per topic - confirm against the broker-side parser
     private Integer maxReconsumeTimes;
 
     @Override
@@ -159,4 +163,20 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
     public void setBatch(boolean batch) {
         this.batch = batch;
     }
+
+    public boolean isMultiTopic() { // whether the request carries a multi-topic batch
+        return multiTopic;
+    }
+
+    public void setMultiTopic(boolean multiTopic) { // plain setter
+        this.multiTopic = multiTopic;
+    }
+
+    public String getQueueIds() { // raw header string; may be null because the field is optional
+        return queueIds;
+    }
+
+    public void setQueueIds(String queueIds) { // stored verbatim, no parsing here
+        this.queueIds = queueIds;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 498a7fa..de26947 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -57,6 +57,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
     @CFNullable
     private boolean m; //batch
 
+    @CFNullable
+    private boolean n; //multi topic
+
+    @CFNullable
+    private String o; // queueIds
+
     public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
         SendMessageRequestHeader v1 = new SendMessageRequestHeader();
         v1.setProducerGroup(v2.a);
@@ -72,6 +78,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
         v1.setUnitMode(v2.k);
         v1.setMaxReconsumeTimes(v2.l);
         v1.setBatch(v2.m);
+        v1.setMultiTopic(v2.n);
+        v1.setQueueIds(v2.o);
         return v1;
     }
 
@@ -90,6 +98,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
         v2.k = v1.isUnitMode();
         v2.l = v1.getMaxReconsumeTimes();
         v2.m = v1.isBatch();
+        v2.n = v1.isMultiTopic();
+        v2.o = v1.getQueueIds();
         return v2;
     }
 
@@ -156,6 +166,16 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
         if (str != null) {
             m = Boolean.parseBoolean(str);
         }
+
+        str = fields.get("n");
+        if (str != null) {
+            n = Boolean.parseBoolean(str);
+        }
+
+        str = fields.get("o");
+        if (str != null) {
+            o = str;
+        }
     }
 
     public String getA() {
@@ -261,4 +281,20 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
     public void setM(boolean m) {
         this.m = m;
     }
+
+    public boolean isN() { // n = multi topic flag (maps to multiTopic in the V1 header)
+        return n;
+    }
+
+    public void setN(boolean n) { // plain setter
+        this.n = n;
+    }
+
+    public String getO() { // o = queueIds (maps to queueIds in the V1 header); may be null
+        return o;
+    }
+
+    public void setO(String o) { // stored verbatim
+        this.o = o;
+    }
 }
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
index f264420..363f576 100644
--- a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
@@ -18,9 +18,11 @@
 package org.apache.rocketmq.common;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class MessageBatchTest {
@@ -38,34 +40,51 @@ public class MessageBatchTest {
     @Test
     public void testGenerate_OK() throws Exception {
         List<Message> messages = generateMessages();
-        MessageBatch.generateFromList(messages);
+        MessageBatch.generateFromList(messages, false);
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testGenerate_DiffTopic() throws Exception {
         List<Message> messages = generateMessages();
         messages.get(1).setTopic("topic2");
-        MessageBatch.generateFromList(messages);
+        MessageBatch.generateFromList(messages, false);
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testGenerate_DiffWaitOK() throws Exception {
         List<Message> messages = generateMessages();
         messages.get(1).setWaitStoreMsgOK(false);
-        MessageBatch.generateFromList(messages);
+        MessageBatch.generateFromList(messages, false);
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testGenerate_Delay() throws Exception {
         List<Message> messages = generateMessages();
         messages.get(1).setDelayTimeLevel(1);
-        MessageBatch.generateFromList(messages);
+        MessageBatch.generateFromList(messages, false);
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testGenerate_Retry() throws Exception {
         List<Message> messages = generateMessages();
         messages.get(1).setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + "topic");
-        MessageBatch.generateFromList(messages);
+        MessageBatch.generateFromList(messages, false);
+    }
+
+    @Test
+    public void testGenerate_MultiTopic() {
+        List<Message> messages = Arrays.asList(
+                new Message("topicA", "bodyA1".getBytes()),
+                new Message("topicB", "bodyB1".getBytes()),
+                new Message("topicA", "bodyA2".getBytes()),
+                new Message("topicB", "bodyB2".getBytes())
+                );
+        // Two topics interleaved: the batch topic must be the joined list and every topic must map to its position in it.
+        MessageBatch messageBatch = MessageBatch.generateFromList(messages, true);
+        Assert.assertEquals("topicA%%topicB", messageBatch.getTopic());
+        String[] topics = messageBatch.getTopic().split(MixAll.BATCH_TOPIC_SPLITTER);
+        for (int i = 0; i < topics.length; i++) {
+            Assert.assertEquals(i, (int) messageBatch.getTopicIndexMap().get(topics[i]));
+        }
     }
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
index 42d3909..fe7a939 100644
--- a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
@@ -33,7 +33,7 @@ public class MessageEncodeDecodeTest {
         Message message = new Message("topic", "body".getBytes());
         message.setFlag(12);
         message.putUserProperty("key", "value");
-        byte[] bytes = MessageDecoder.encodeMessage(message);
+        byte[] bytes = MessageDecoder.encodeMessage(message, false, null);
         ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
         buffer.put(bytes);
         buffer.flip();
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 36db2f5..49ad725 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1259,16 +1259,13 @@ public class CommitLog {
     class DefaultAppendMessageCallback implements AppendMessageCallback {
         // File at the end of the minimum fixed length empty
         private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
-        private final ByteBuffer msgIdMemory;
-        private final ByteBuffer msgIdV6Memory;
         // Store the message content
         private final ByteBuffer msgStoreItemMemory;
         // The maximum length of the message
         private final int maxMessageSize;
+        private final StringBuilder keyBuilder = new StringBuilder();
 
         DefaultAppendMessageCallback(final int size) {
-            this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
-            this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
             this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
             this.maxMessageSize = size;
         }
@@ -1370,6 +1367,9 @@ public class CommitLog {
 
         public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
             final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
+            if (messageExtBatch.isMultiTopic()) {
+                return doAppendMultiTopic(fileFromOffset, byteBuffer, maxBlank, messageExtBatch, putMessageContext);
+            }
             byteBuffer.mark();
             //physical offset
             long wroteOffset = fileFromOffset + byteBuffer.position();
@@ -1468,6 +1468,105 @@ public class CommitLog {
             return result;
         }
 
+        private AppendMessageResult doAppendMultiTopic(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
+                                                       final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
+            // Appends an already-encoded multi-topic batch to byteBuffer; wroteOffset is the physical offset of its first byte.
+            long wroteOffset = fileFromOffset + byteBuffer.position();
+            // Byte total and message count of the batch, accumulated while walking the encoded buffer below.
+            int totalMsgLen = 0;
+            int msgNum = 0;
+            byteBuffer.mark(); // rewound in the END_OF_FILE path
+
+            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
+            ByteBuffer messageByteBuff = messageExtBatch.getEncodedBuff();
+            messageByteBuff.mark(); // rewound in the END_OF_FILE path
+            // Host field widths depend on the sys flag: 4 + 4 bytes without the V6 flag, 16 + 4 bytes with it.
+            int sysFlag = messageExtBatch.getSysFlag();
+            int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+            int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+            Supplier<String> msgIdSupplier = () -> { // lazily builds the comma-separated msgIds: store host bytes followed by each message's physical offset
+                int msgIdLen = storeHostLength + 8;
+                int batchCount = putMessageContext.getBatchSize();
+                long[] phyPosArray = putMessageContext.getPhyPos();
+                ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
+                MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer);
+                msgIdBuffer.clear();
+                StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1);
+                for (int i = 0; i < phyPosArray.length; i++) {
+                    msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]); // overwrite the trailing 8 bytes with this message's offset
+                    String msgId = UtilAll.bytes2string(msgIdBuffer.array());
+                    if (i != 0) {
+                        buffer.append(',');
+                    }
+                    buffer.append(msgId);
+                }
+                return buffer.toString();
+            };
+            // Not enough room: write the 8-byte blank marker and report END_OF_FILE. Checked before the loop, which overwrites the topic index in each message.
+            if (messageExtBatch.getStoreSize() + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
+                this.msgStoreItemMemory.clear();
+                this.msgStoreItemMemory.putInt(maxBlank);
+                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
+                messageByteBuff.reset();
+                byteBuffer.reset();
+                byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
+                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(),
+                        0, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+            }
+            // Walk every encoded message and patch in the fields that are only known at append time.
+            int index = 0;
+            while (messageByteBuff.hasRemaining()) {
+                final int msgPos = messageByteBuff.position();
+                final int msgLen = messageByteBuff.getInt();
+                totalMsgLen += msgLen;
+                // Encoded layout: TOTALSIZE(4) MAGICCODE(4) BODYCRC(4) QUEUEID(4) FLAG(4) QUEUEOFFSET(8) PHYSICALOFFSET(8) SYSFLAG(4) ...
+                messageByteBuff.position(msgPos + 12); // move to queueId
+                int queueId = messageByteBuff.getInt();
+                messageByteBuff.position(msgPos + 20); // move to topic_index(queueOffset)
+                int topicIndex = (int)messageByteBuff.getLong();
+                String topic = messageExtBatch.getTopics()[topicIndex];
+                // The QUEUEOFFSET slot carried the topic index up to here; it is now replaced by the real queue offset.
+                // move to add queue offset and commitlog offset
+                int pos = msgPos + 20;
+                messageByteBuff.putLong(pos, getQueueOffset(topic, queueId)); // update queueOffset
+                pos += 8;
+                messageByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen); // update commitLog offset
+                // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
+                // refresh store time stamp in lock
+                pos += 8 + 4 + 8 + bornHostLength;
+                messageByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); // update store timestamp
+                // Remember this message's physical offset so msgIdSupplier can build its id later.
+                putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
+                msgNum++;
+                messageByteBuff.position(msgPos + msgLen);
+            }
+            // Copy the patched batch (exactly totalMsgLen bytes from position 0) into the target buffer.
+            messageByteBuff.position(0);
+            messageByteBuff.limit(totalMsgLen);
+            byteBuffer.put(messageByteBuff);
+            messageExtBatch.setEncodedBuff(null);
+            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen,
+                    msgIdSupplier, messageExtBatch.getStoreTimestamp(), 0,
+                    CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+            result.setMsgNum(msgNum);
+            return result;
+        }
+
+        private long getQueueOffset(String topic, int queueId) { // returns the current offset of "topic-queueId" and advances the table entry by one
+            keyBuilder.setLength(0); // reuse the callback's builder; NOTE(review): assumes appends are serialized by the caller - confirm
+            keyBuilder.append(topic);
+            keyBuilder.append('-');
+            keyBuilder.append(queueId);
+            String key = keyBuilder.toString();
+            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
+            if (null == queueOffset) { // no entry yet for this queue: start at 0
+                queueOffset = 0L;
+            }
+            // Store the offset the next message of this queue will receive.
+            CommitLog.this.topicQueueTable.put(key, queueOffset + 1);
+            return queueOffset;
+        }
+
         private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
             byteBuffer.flip();
             byteBuffer.limit(limit);
@@ -1608,7 +1707,18 @@ public class CommitLog {
                 int propertiesPos = messagesByteBuff.position();
                 messagesByteBuff.position(propertiesPos + propertiesLen);
 
-                final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+                byte[] topicData;
+                int queueId;
+                int index = 0;
+                if (messageExtBatch.isMultiTopic()) {
+                    // 7. index
+                    index = messagesByteBuff.getInt();
+                    topicData = messageExtBatch.getTopics()[index].getBytes(MessageDecoder.CHARSET_UTF8);
+                    queueId = messageExtBatch.getQueueIds()[index];
+                } else {
+                    topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+                    queueId = messageExtBatch.getQueueId();
+                }
 
                 final int topicLength = topicData.length;
 
@@ -1635,11 +1745,11 @@ public class CommitLog {
                 // 3 BODYCRC
                 this.encoderBuffer.putInt(bodyCrc);
                 // 4 QUEUEID
-                this.encoderBuffer.putInt(messageExtBatch.getQueueId());
+                this.encoderBuffer.putInt(queueId);
                 // 5 FLAG
                 this.encoderBuffer.putInt(flag);
                 // 6 QUEUEOFFSET
-                this.encoderBuffer.putLong(0);
+                this.encoderBuffer.putLong(index);
                 // 7 PHYSICALOFFSET
                 this.encoderBuffer.putLong(0);
                 // 8 SYSFLAG
@@ -1677,6 +1787,7 @@ public class CommitLog {
             putMessageContext.setBatchSize(batchSize);
             putMessageContext.setPhyPos(new long[batchSize]);
             encoderBuffer.flip();
+            messageExtBatch.setStoreSize(maxMessageSize);
             return encoderBuffer;
         }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7f4fcc8..3b788f0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -375,9 +375,18 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) {
-        if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
-            log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length());
-            return PutMessageStatus.MESSAGE_ILLEGAL;
+        if (messageExtBatch.isMultiTopic()) {
+            for (String topic : messageExtBatch.getTopics()) {
+                if (topic.length() > Byte.MAX_VALUE) {
+                    log.warn("putMessage message topic length too long " + topic.length());
+                    return PutMessageStatus.MESSAGE_ILLEGAL;
+                }
+            }
+        } else {
+            if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
+                log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length());
+                return PutMessageStatus.MESSAGE_ILLEGAL;
+            }
         }
 
         if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {