You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/01 10:37:55 UTC
[rocketmq] 04/11: 混合 topic 的 Batch 生产,一次batch可以发送到多个topic和queue,用于外挂式延迟消息服务
This is an automated email from the ASF dual-hosted git repository.
huangli pushed a commit to branch 4.9.2_dev_community
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f9eb9207f1398fbb25275fa81cb6330761b46464
Author: coding <za...@qq.com>
AuthorDate: Fri Nov 5 11:49:11 2021 +0800
混合 topic 的 Batch 生产,一次batch可以发送到多个topic和queue,用于外挂式延迟消息服务
---
.reviewboardrc | 4 +
.../processor/AbstractSendMessageProcessor.java | 48 +++++++-
.../broker/processor/SendMessageProcessor.java | 32 +++++-
.../org/apache/rocketmq/client/Validators.java | 9 +-
.../rocketmq/client/common/ClientErrorCode.java | 1 +
.../client/impl/factory/MQClientInstance.java | 2 +
.../impl/producer/DefaultMQProducerImpl.java | 104 ++++++++++++++++-
.../client/impl/producer/TopicPublishInfo.java | 33 +++++-
.../client/producer/DefaultMQProducer.java | 16 ++-
.../rocketmq/client/producer/MQProducer.java | 4 +
.../java/org/apache/rocketmq/common/MixAll.java | 2 +
.../rocketmq/common/message/MessageBatch.java | 74 +++++++++++-
.../rocketmq/common/message/MessageDecoder.java | 24 +++-
.../rocketmq/common/message/MessageExtBatch.java | 30 +++++
.../protocol/header/SendMessageRequestHeader.java | 20 ++++
.../header/SendMessageRequestHeaderV2.java | 36 ++++++
.../apache/rocketmq/common/MessageBatchTest.java | 29 ++++-
.../rocketmq/common/MessageEncodeDecodeTest.java | 2 +-
.../java/org/apache/rocketmq/store/CommitLog.java | 125 +++++++++++++++++++--
.../apache/rocketmq/store/DefaultMessageStore.java | 15 ++-
20 files changed, 570 insertions(+), 40 deletions(-)
diff --git a/.reviewboardrc b/.reviewboardrc
new file mode 100644
index 0000000..16bd045
--- /dev/null
+++ b/.reviewboardrc
@@ -0,0 +1,4 @@
+REVIEWBOARD_URL = "http://rb.corp.kuaishou.com/reviewboard/"
+REPOSITORY = "git@git.corp.kuaishou.com:infra/rocketmq.git"
+BRANCH = "master"
+LAND_DEST_BRANCH = "master"
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 66480ad..85cb705 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -16,10 +16,8 @@
*/
package org.apache.rocketmq.broker.processor;
-import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -27,7 +25,6 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
-import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
@@ -46,6 +43,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.ChannelUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -56,6 +54,8 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
+import io.netty.channel.ChannelHandlerContext;
+
public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -163,8 +163,50 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
return response;
}
+ protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, RemotingCommand response, String[] topics, int[] queueIds) {
+ for (int i = 0; i < topics.length; i++) {
+ String topic = topics[i];
+ int queueId = queueIds[i];
+ if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
+ && this.brokerController.getTopicConfigManager().isOrderTopic(topic)) {
+ response.setCode(ResponseCode.NO_PERMISSION);
+ response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
+ return response;
+ }
+
+ TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+
+ if (null == topicConfig) {
+ response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+ response.setRemark("topic[" + topic + "] not exist, apply first please!"
+ + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+ return response;
+ }
+
+ int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
+ if (queueId >= idValid) {
+ String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
+ queueId,
+ topicConfig,
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+ log.warn(errorInfo);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(errorInfo);
+
+ return response;
+ }
+ }
+
+ return response;
+ }
+
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
+ if (requestHeader.isMultiTopic()) {
+ return response;
+ }
+
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b31c71e..994d596 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,13 +17,13 @@
package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
-import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -62,6 +62,8 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import io.netty.channel.ChannelHandlerContext;
+
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
@@ -584,6 +586,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return CompletableFuture.completedFuture(response);
}
+ String[] topics = null;
+ int[] queueIds = null;
+ if (requestHeader.isMultiTopic()) {
+ topics = requestHeader.getTopic().split(MixAll.BATCH_TOPIC_SPLITTER); // decode topics
+ if (requestHeader.getQueueIds() == null) {
+ queueIds = new int[topics.length];
+ for (int i = 0; i < topics.length; i++) {
+ TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topics[i]);
+ queueIds[i] = randomQueueId(topicConfig.getWriteQueueNums());
+ }
+ } else {
+ // decode queueIds
+ queueIds = Arrays.stream(requestHeader.getQueueIds().split(MixAll.BATCH_QUEUE_ID_SPLITTER)).mapToInt(Integer::parseInt).toArray();
+ }
+ msgCheck(ctx, response, topics, queueIds);
+ if (response.getCode() != -1) {
+ return CompletableFuture.completedFuture(response);
+ }
+ requestHeader.setTopic(topics[0]);
+ }
+
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
@@ -600,6 +623,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic(requestHeader.getTopic());
messageExtBatch.setQueueId(queueIdInt);
+ if (requestHeader.isMultiTopic()) {
+ messageExtBatch.setMultiTopic(true);
+ messageExtBatch.setTopics(topics);
+ messageExtBatch.setQueueIds(queueIds);
+ }
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
@@ -621,8 +649,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt);
}
-
-
public boolean hasConsumeMessageHook() {
return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index e712e2f..cf5f078 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.topic.TopicValidator;
@@ -84,8 +85,12 @@ public class Validators {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// topic
- Validators.checkTopic(msg.getTopic());
- Validators.isNotAllowedSendTopic(msg.getTopic());
+ if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) {
+ // ignore check
+ } else {
+ Validators.checkTopic(msg.getTopic());
+ Validators.isNotAllowedSendTopic(msg.getTopic());
+ }
// body
if (null == msg.getBody()) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
index bc03b14..8aa4856 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
@@ -25,4 +25,5 @@ public class ClientErrorCode {
public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
public static final int REQUEST_TIMEOUT_EXCEPTION = 10006;
public static final int CREATE_REPLY_MESSAGE_EXCEPTION = 10007;
+ public static final int NOT_FOUND_MULTI_TOPIC_EXCEPTION = 10008;
}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9651943..7db4ccf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -168,6 +168,7 @@ public class MQClientInstance {
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
+ info.getBrokers().add(mq.getBrokerName());
}
}
@@ -196,6 +197,7 @@ public class MQClientInstance {
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
+ info.getBrokers().add(mq.getBrokerName());
}
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 2a784b5..117a8c6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,12 +16,15 @@
*/
package org.apache.rocketmq.client.impl.producer;
+
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -36,9 +39,11 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
+import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
@@ -98,6 +103,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
public class DefaultMQProducerImpl implements MQProducerInner {
private final InternalLogger log = ClientLogger.getLog();
private final Random random = new Random();
+ private final ThreadLocalIndex sendWhichBroker = new ThreadLocalIndex();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
@@ -547,6 +553,52 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
+ public MessageQueue selectOneMessageQueue(MessageBatch message, final Map<String, TopicPublishInfo> tpInfoMap, final String lastBrokerName)
+ throws MQClientException {
+ // select intersection brokerName first.
+ Set<String> sharedBrokers = null;
+ for (TopicPublishInfo tpi : tpInfoMap.values()) {
+ Set<String> brokers = tpi.getBrokers();
+ if (sharedBrokers == null) {
+ sharedBrokers = brokers;
+ } else {
+ sharedBrokers.retainAll(brokers);
+ }
+ }
+
+ if (sharedBrokers == null || sharedBrokers.isEmpty()) {
+ throw new MQClientException(ClientErrorCode.NOT_FOUND_MULTI_TOPIC_EXCEPTION, "multi topic batch route not found, sharedBroker empty");
+ }
+
+ List<String> brokers = new ArrayList<>(sharedBrokers);
+ int index = Math.abs(sendWhichBroker.incrementAndGet()) % brokers.size();
+ if (index < 0) index = 0;
+ String brokerName = brokers.get(index);
+ if (lastBrokerName != null && brokers.size() != 1 && brokerName.equals(lastBrokerName)) {
+ index++;
+ if (index == brokers.size()) {
+ index = 0;
+ }
+ brokerName = brokers.get(index);
+ }
+
+ Map<String, Integer> queueIdMap = new HashMap<>(tpInfoMap.size());
+ String firstTopic = null;
+ for (Map.Entry<String, TopicPublishInfo> entry : tpInfoMap.entrySet()) {
+ MessageQueue mq = entry.getValue().selectOneMessageQueueByBrokerName(brokerName);
+ if (mq == null) {
+ throw new MQClientException(ClientErrorCode.NOT_FOUND_MULTI_TOPIC_EXCEPTION, "multi topic batch route not found");
+ }
+ queueIdMap.put(entry.getKey(), mq.getQueueId());
+ if (firstTopic == null) {
+ firstTopic = entry.getKey();
+ }
+ }
+
+ message.setQueueIdMap(queueIdMap);
+ return new MessageQueue(firstTopic, brokerName, 0); // only brokerName matters.
+ }
+
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
@@ -576,8 +628,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
+
+ Map<String, TopicPublishInfo> topicPublishInfoMap = null;
+ TopicPublishInfo topicPublishInfo = null;
+ boolean multiTopic = false;
+ if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) {
+ multiTopic = true;
+ MessageBatch messageBatch = (MessageBatch) msg;
+ topicPublishInfoMap = this.tryToFindTopicPublishInfo(messageBatch.getTopicIndexMap().keySet());
+ } else {
+ topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
+ }
+
+ if (multiTopic || (topicPublishInfo != null && topicPublishInfo.ok())) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
@@ -587,7 +650,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
- MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+ MessageQueue mqSelected;
+ if (multiTopic) {
+ mqSelected = this.selectOneMessageQueue((MessageBatch) msg, topicPublishInfoMap, lastBrokerName);
+ } else {
+ mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+ }
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
@@ -695,6 +763,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
+ public Map<String, TopicPublishInfo> tryToFindTopicPublishInfo(final Set<String> topics) throws MQClientException {
+ Map<String, TopicPublishInfo> topicPublishInfoMap = new HashMap<>(topics.size());
+ for (String topic : topics) {
+ TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(topic);
+ if (topicPublishInfo == null || !topicPublishInfo.ok()) {
+ throw new MQClientException("No route info of this topic: " + topic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
+ null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
+ }
+ topicPublishInfoMap.put(topic, topicPublishInfo);
+ }
+ return topicPublishInfoMap;
+ }
+
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
@@ -800,6 +881,23 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
+
+ if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) {
+ Map<String, Integer> queueIdMap = ((MessageBatch) msg).getQueueIdMap();
+ String topic = msg.getTopic();
+ StringBuilder sb = new StringBuilder();
+ int idx = 0;
+ for (String s : topic.split(MixAll.BATCH_TOPIC_SPLITTER)) {
+ if (idx != 0) {
+ sb.append(MixAll.BATCH_QUEUE_ID_SPLITTER);
+ }
+ sb.append(queueIdMap.get(s));
+ idx++;
+ }
+ requestHeader.setMultiTopic(true);
+ requestHeader.setQueueIds(sb.toString());
+ }
+
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 2f8337e..29c2b12 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -17,7 +17,11 @@
package org.apache.rocketmq.client.impl.producer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -27,6 +31,7 @@ public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
+ private Set<String> brokers = new HashSet<>();
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
@@ -42,6 +47,14 @@ public class TopicPublishInfo {
return null != this.messageQueueList && !this.messageQueueList.isEmpty();
}
+ public Set<String> getBrokers() {
+ return brokers;
+ }
+
+ public void setBrokers(Set<String> brokers) {
+ this.brokers = brokers;
+ }
+
public List<MessageQueue> getMessageQueueList() {
return messageQueueList;
}
@@ -84,6 +97,24 @@ public class TopicPublishInfo {
}
}
+ public MessageQueue selectOneMessageQueueByBrokerName(final String brokerName) {
+ if (this.messageQueueList == null) {
+ return null;
+ }
+ List<MessageQueue> messageQueues = this.messageQueueList.stream()
+ .filter(mq -> mq.getBrokerName().equals(brokerName))
+ .collect(Collectors.toList());
+ if (messageQueues.isEmpty()) {
+ return null;
+ }
+
+ int index = this.sendWhichQueue.incrementAndGet();
+ int pos = Math.abs(index) % messageQueues.size();
+ if (pos < 0)
+ pos = 0;
+ return messageQueues.get(pos);
+ }
+
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
@@ -106,7 +137,7 @@ public class TopicPublishInfo {
@Override
public String toString() {
return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
- + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
+ + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + ", brokers=" + brokers + "]";
}
public TopicRouteData getTopicRouteData() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 9f91b41..92e5a41 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
+
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
@@ -910,6 +911,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
@Override
+ public SendResult sendMultiTopicBatch(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(multiTopicBatch(msgs));
+ }
+
+ @Override
public SendResult send(Collection<Message> msgs,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), timeout);
@@ -980,9 +986,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
+ return batch(msgs, false);
+ }
+
+ private MessageBatch batch(Collection<Message> msgs, boolean allowMultiTopic) throws MQClientException {
MessageBatch msgBatch;
try {
- msgBatch = MessageBatch.generateFromList(msgs);
+ msgBatch = MessageBatch.generateFromList(msgs, allowMultiTopic);
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
MessageClientIDSetter.setUniqID(message);
@@ -996,6 +1006,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return msgBatch;
}
+ private MessageBatch multiTopicBatch(Collection<Message> msgs) throws MQClientException {
+ return batch(msgs, true);
+ }
+
public String getProducerGroup() {
return producerGroup;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index f70ddb2..9325a62 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -91,6 +91,10 @@ public interface MQProducer extends MQAdmin {
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
+ // for batch msgs with multi topics.
+ SendResult sendMultiTopicBatch(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException;
+
SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index ec1e1f0..1ab67cb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -83,6 +83,8 @@ public class MixAll {
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
public static final String REPLY_MESSAGE_FLAG = "reply";
+ public static final String BATCH_TOPIC_SPLITTER = "%%";
+ public static final String BATCH_QUEUE_ID_SPLITTER = ",";
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static String getWSAddr() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
index a6b801e..9d1e340 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
@@ -18,8 +18,13 @@ package org.apache.rocketmq.common.message;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.rocketmq.common.MixAll;
public class MessageBatch extends Message implements Iterable<Message> {
@@ -27,23 +32,59 @@ public class MessageBatch extends Message implements Iterable<Message> {
private static final long serialVersionUID = 621335151046335557L;
private final List<Message> messages;
+ private boolean multiTopic = false;
+
+ private Map<String/*topic*/, Integer/*index*/> topicIndexMap;
+ private Map<String/*topic*/, Integer/*queueId*/> queueIdMap;
+
private MessageBatch(List<Message> messages) {
this.messages = messages;
}
+ private MessageBatch(List<Message> messages, boolean multiTopic) {
+ this.messages = messages;
+ this.multiTopic = multiTopic;
+ }
+
+ public boolean isMultiTopic() {
+ return multiTopic;
+ }
+
+ public void setTopicIndexMap(Map<String, Integer> topicIndexMap) {
+ this.topicIndexMap = topicIndexMap;
+ }
+
+ public Map<String, Integer> getTopicIndexMap() {
+ return topicIndexMap;
+ }
+
+ public Map<String, Integer> getQueueIdMap() {
+ return queueIdMap;
+ }
+
+ public void setQueueIdMap(Map<String, Integer> queueIdMap) {
+ this.queueIdMap = queueIdMap;
+ }
+
public byte[] encode() {
- return MessageDecoder.encodeMessages(messages);
+ if (multiTopic) {
+ return MessageDecoder.encodeMultiTopicMessages(messages, topicIndexMap);
+ } else {
+ return MessageDecoder.encodeMessages(messages);
+ }
}
public Iterator<Message> iterator() {
return messages.iterator();
}
- public static MessageBatch generateFromList(Collection<Message> messages) {
+ public static MessageBatch generateFromList(Collection<Message> messages, boolean allowMultiTopic) {
assert messages != null;
assert messages.size() > 0;
List<Message> messageList = new ArrayList<Message>(messages.size());
Message first = null;
+ boolean multiTopic = false;
+ Set<String> topics = new HashSet<>();
for (Message message : messages) {
if (message.getDelayTimeLevel() > 0) {
throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
@@ -55,19 +96,40 @@ public class MessageBatch extends Message implements Iterable<Message> {
first = message;
} else {
if (!first.getTopic().equals(message.getTopic())) {
- throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
+ if (!allowMultiTopic) {
+ throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
+ }
+ multiTopic = true;
}
if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
}
}
messageList.add(message);
+ topics.add(message.getTopic());
}
- MessageBatch messageBatch = new MessageBatch(messageList);
- messageBatch.setTopic(first.getTopic());
+ MessageBatch messageBatch = new MessageBatch(messageList, multiTopic);
+
+ if (multiTopic) {
+ Map<String, Integer> topicIndexMap = new HashMap<>(topics.size());
+ int index = 0;
+ StringBuilder sb = new StringBuilder();
+ for (String topic : topics) {
+ if (index != 0) {
+ sb.append(MixAll.BATCH_TOPIC_SPLITTER);
+ }
+ sb.append(topic);
+ topicIndexMap.put(topic, index);
+ index++;
+ }
+ messageBatch.setTopic(sb.toString());
+ messageBatch.setTopicIndexMap(topicIndexMap);
+ } else {
+ messageBatch.setTopic(first.getTopic());
+ }
messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
+
return messageBatch;
}
-
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index c94700e..f6a9bf9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
@@ -471,7 +472,7 @@ public class MessageDecoder {
return map;
}
- public static byte[] encodeMessage(Message message) {
+ public static byte[] encodeMessage(Message message, boolean multiTopic, Map<String, Integer> topicIndex) {
//only need flag, body, properties
byte[] body = message.getBody();
int bodyLen = body.length;
@@ -484,8 +485,9 @@ public class MessageDecoder {
+ 4 // 2 MAGICCOD
+ 4 // 3 BODYCRC
+ 4 // 4 FLAG
- + 4 + bodyLen // 4 BODY
- + 2 + propertiesLength;
+ + 4 + bodyLen // 5 BODY
+ + 2 + propertiesLength // 6 PROPERTY
+ + (multiTopic ? 4 : 0); // 7 TOPIC_INDEX
ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
// 1 TOTALSIZE
byteBuffer.putInt(storeSize);
@@ -508,6 +510,10 @@ public class MessageDecoder {
byteBuffer.putShort(propertiesLength);
byteBuffer.put(propertiesBytes);
+ if (multiTopic) {
+ // 7. topic_index.
+ byteBuffer.putInt(topicIndex.get(message.getTopic()));
+ }
return byteBuffer.array();
}
@@ -542,12 +548,16 @@ public class MessageDecoder {
return message;
}
- public static byte[] encodeMessages(List<Message> messages) {
+ public static byte[] encodeMultiTopicMessages(List<Message> messages, Map<String, Integer> map) {
+ return doEncodeMessages(messages, true, map);
+ }
+
+ private static byte[] doEncodeMessages(List<Message> messages, boolean multiTopic, Map<String, Integer> topicIndexMap) {
//TO DO refactor, accumulate in one buffer, avoid copies
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
int allSize = 0;
for (Message message : messages) {
- byte[] tmp = encodeMessage(message);
+ byte[] tmp = encodeMessage(message, multiTopic, topicIndexMap);
encodedMessages.add(tmp);
allSize += tmp.length;
}
@@ -560,6 +570,10 @@ public class MessageDecoder {
return allBytes;
}
+ public static byte[] encodeMessages(List<Message> messages) {
+ return doEncodeMessages(messages, false, null);
+ }
+
public static List<Message> decodeMessages(ByteBuffer byteBuffer) throws Exception {
//TO DO add a callback for processing, avoid creating lists
List<Message> msgs = new ArrayList<Message>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
index a2713cb..21fa505 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
@@ -30,6 +30,12 @@ public class MessageExtBatch extends MessageExt {
private ByteBuffer encodedBuff;
+ private boolean multiTopic;
+
+ private String[] topics;
+
+ private int[] queueIds;
+
public ByteBuffer getEncodedBuff() {
return encodedBuff;
}
@@ -37,4 +43,28 @@ public class MessageExtBatch extends MessageExt {
public void setEncodedBuff(ByteBuffer encodedBuff) {
this.encodedBuff = encodedBuff;
}
+
+ public boolean isMultiTopic() {
+ return multiTopic;
+ }
+
+ public void setMultiTopic(boolean multiTopic) {
+ this.multiTopic = multiTopic;
+ }
+
+ public String[] getTopics() {
+ return topics;
+ }
+
+ public void setTopics(String[] topics) {
+ this.topics = topics;
+ }
+
+ public int[] getQueueIds() {
+ return queueIds;
+ }
+
+ public void setQueueIds(int[] queueIds) {
+ this.queueIds = queueIds;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 2df31e6..fc828d9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -50,6 +50,10 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private boolean unitMode = false;
@CFNullable
private boolean batch = false;
+ @CFNullable
+ private boolean multiTopic = false;
+ @CFNullable
+ private String queueIds;
private Integer maxReconsumeTimes;
@Override
@@ -159,4 +163,20 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
public void setBatch(boolean batch) {
this.batch = batch;
}
+
+ public boolean isMultiTopic() {
+ return multiTopic;
+ }
+
+ public void setMultiTopic(boolean multiTopic) {
+ this.multiTopic = multiTopic;
+ }
+
+ public String getQueueIds() {
+ return queueIds;
+ }
+
+ public void setQueueIds(String queueIds) {
+ this.queueIds = queueIds;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 498a7fa..de26947 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -57,6 +57,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
@CFNullable
private boolean m; //batch
+ @CFNullable
+ private boolean n; //multi topic
+
+ @CFNullable
+ private String o; // queueIds
+
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
v1.setProducerGroup(v2.a);
@@ -72,6 +78,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
v1.setUnitMode(v2.k);
v1.setMaxReconsumeTimes(v2.l);
v1.setBatch(v2.m);
+ v1.setMultiTopic(v2.n);
+ v1.setQueueIds(v2.o);
return v1;
}
@@ -90,6 +98,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
v2.k = v1.isUnitMode();
v2.l = v1.getMaxReconsumeTimes();
v2.m = v1.isBatch();
+ v2.n = v1.isMultiTopic();
+ v2.o = v1.getQueueIds();
return v2;
}
@@ -156,6 +166,16 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
if (str != null) {
m = Boolean.parseBoolean(str);
}
+
+ str = fields.get("n");
+ if (str != null) {
+ n = Boolean.parseBoolean(str);
+ }
+
+ str = fields.get("o");
+ if (str != null) {
+ o = str;
+ }
}
public String getA() {
@@ -261,4 +281,20 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode
public void setM(boolean m) {
this.m = m;
}
+
+ public boolean isN() {
+ return n;
+ }
+
+ public void setN(boolean n) {
+ this.n = n;
+ }
+
+ public String getO() {
+ return o;
+ }
+
+ public void setO(String o) {
+ this.o = o;
+ }
}
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
index f264420..363f576 100644
--- a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
@@ -18,9 +18,11 @@
package org.apache.rocketmq.common;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
+import org.junit.Assert;
import org.junit.Test;
public class MessageBatchTest {
@@ -38,34 +40,51 @@ public class MessageBatchTest {
@Test
public void testGenerate_OK() throws Exception {
List<Message> messages = generateMessages();
- MessageBatch.generateFromList(messages);
+ MessageBatch.generateFromList(messages, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_DiffTopic() throws Exception {
List<Message> messages = generateMessages();
messages.get(1).setTopic("topic2");
- MessageBatch.generateFromList(messages);
+ MessageBatch.generateFromList(messages, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_DiffWaitOK() throws Exception {
List<Message> messages = generateMessages();
messages.get(1).setWaitStoreMsgOK(false);
- MessageBatch.generateFromList(messages);
+ MessageBatch.generateFromList(messages, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_Delay() throws Exception {
List<Message> messages = generateMessages();
messages.get(1).setDelayTimeLevel(1);
- MessageBatch.generateFromList(messages);
+ MessageBatch.generateFromList(messages, false);
}
@Test(expected = UnsupportedOperationException.class)
public void testGenerate_Retry() throws Exception {
List<Message> messages = generateMessages();
messages.get(1).setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + "topic");
- MessageBatch.generateFromList(messages);
+ MessageBatch.generateFromList(messages, false);
+ }
+
+ @Test
+ public void testGenerate_MultiTopic() {
+ List<Message> messages = Arrays.asList(
+ new Message("topicA", "bodyA1".getBytes()),
+ new Message("topicB", "bodyB1".getBytes()),
+ new Message("topicA", "bodyA2".getBytes()),
+ new Message("topicB", "bodyB2".getBytes())
+ );
+
+ MessageBatch messageBatch = MessageBatch.generateFromList(messages, true);
+ Assert.assertEquals(messageBatch.getTopic(), "topicA%%topicB");
+ String[] topics = messageBatch.getTopic().split(MixAll.BATCH_TOPIC_SPLITTER);
+ for (int i = 0; i < topics.length; i++) {
+ Assert.assertEquals((int)messageBatch.getTopicIndexMap().get(topics[i]), i);
+ }
}
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
index 42d3909..fe7a939 100644
--- a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
@@ -33,7 +33,7 @@ public class MessageEncodeDecodeTest {
Message message = new Message("topic", "body".getBytes());
message.setFlag(12);
message.putUserProperty("key", "value");
- byte[] bytes = MessageDecoder.encodeMessage(message);
+ byte[] bytes = MessageDecoder.encodeMessage(message, false, null);
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 36db2f5..49ad725 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1259,16 +1259,13 @@ public class CommitLog {
class DefaultAppendMessageCallback implements AppendMessageCallback {
// File at the end of the minimum fixed length empty
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
- private final ByteBuffer msgIdMemory;
- private final ByteBuffer msgIdV6Memory;
// Store the message content
private final ByteBuffer msgStoreItemMemory;
// The maximum length of the message
private final int maxMessageSize;
+ private final StringBuilder keyBuilder = new StringBuilder();
DefaultAppendMessageCallback(final int size) {
- this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
- this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
this.maxMessageSize = size;
}
@@ -1370,6 +1367,9 @@ public class CommitLog {
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
+ if (messageExtBatch.isMultiTopic()) {
+ return doAppendMultiTopic(fileFromOffset, byteBuffer, maxBlank, messageExtBatch, putMessageContext);
+ }
byteBuffer.mark();
//physical offset
long wroteOffset = fileFromOffset + byteBuffer.position();
@@ -1468,6 +1468,105 @@ public class CommitLog {
return result;
}
+ private AppendMessageResult doAppendMultiTopic(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
+ final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
+ //physical offset
+ long wroteOffset = fileFromOffset + byteBuffer.position();
+
+ int totalMsgLen = 0;
+ int msgNum = 0;
+ byteBuffer.mark();
+
+ final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
+ ByteBuffer messageByteBuff = messageExtBatch.getEncodedBuff();
+ messageByteBuff.mark();
+
+ int sysFlag = messageExtBatch.getSysFlag();
+ int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+ int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
+ Supplier<String> msgIdSupplier = () -> {
+ int msgIdLen = storeHostLength + 8;
+ int batchCount = putMessageContext.getBatchSize();
+ long[] phyPosArray = putMessageContext.getPhyPos();
+ ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
+ MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer);
+ msgIdBuffer.clear();
+ StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1);
+ for (int i = 0; i < phyPosArray.length; i++) {
+ msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
+ String msgId = UtilAll.bytes2string(msgIdBuffer.array());
+ if (i != 0) {
+ buffer.append(',');
+ }
+ buffer.append(msgId);
+ }
+ return buffer.toString();
+ };
+
+ if (messageExtBatch.getStoreSize() + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
+ this.msgStoreItemMemory.clear();
+ this.msgStoreItemMemory.putInt(maxBlank);
+ this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
+ messageByteBuff.reset();
+ byteBuffer.reset();
+ byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
+ return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(),
+ 0, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+ }
+
+ int index = 0;
+ while (messageByteBuff.hasRemaining()) {
+ final int msgPos = messageByteBuff.position();
+ final int msgLen = messageByteBuff.getInt();
+ totalMsgLen += msgLen;
+
+ messageByteBuff.position(msgPos + 12); // move to queueId
+ int queueId = messageByteBuff.getInt();
+ messageByteBuff.position(msgPos + 20); // move to topic_index(queueOffset)
+ int topicIndex = (int)messageByteBuff.getLong();
+ String topic = messageExtBatch.getTopics()[topicIndex];
+
+ // move to add queue offset and commitlog offset
+ int pos = msgPos + 20;
+ messageByteBuff.putLong(pos, getQueueOffset(topic, queueId)); // update queueOffset
+ pos += 8;
+ messageByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen); // update commitLog offset
+ // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
+ // refresh store time stamp in lock
+ pos += 8 + 4 + 8 + bornHostLength;
+ messageByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); // update store timestamp
+
+ putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
+ msgNum++;
+ messageByteBuff.position(msgPos + msgLen);
+ }
+
+ messageByteBuff.position(0);
+ messageByteBuff.limit(totalMsgLen);
+ byteBuffer.put(messageByteBuff);
+ messageExtBatch.setEncodedBuff(null);
+ AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen,
+ msgIdSupplier, messageExtBatch.getStoreTimestamp(), 0,
+ CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+ result.setMsgNum(msgNum);
+ return result;
+ }
+
+ private long getQueueOffset(String topic, int queueId) {
+ keyBuilder.setLength(0);
+ keyBuilder.append(topic);
+ keyBuilder.append('-');
+ keyBuilder.append(queueId);
+ String key = keyBuilder.toString();
+ Long queueOffset = CommitLog.this.topicQueueTable.get(key);
+ if (null == queueOffset) {
+ queueOffset = 0L;
+ }
+
+ CommitLog.this.topicQueueTable.put(key, queueOffset + 1);
+ return queueOffset;
+ }
+
private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
byteBuffer.flip();
byteBuffer.limit(limit);
@@ -1608,7 +1707,18 @@ public class CommitLog {
int propertiesPos = messagesByteBuff.position();
messagesByteBuff.position(propertiesPos + propertiesLen);
- final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+ byte[] topicData;
+ int queueId;
+ int index = 0;
+ if (messageExtBatch.isMultiTopic()) {
+ // 7. index
+ index = messagesByteBuff.getInt();
+ topicData = messageExtBatch.getTopics()[index].getBytes(MessageDecoder.CHARSET_UTF8);
+ queueId = messageExtBatch.getQueueIds()[index];
+ } else {
+ topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+ queueId = messageExtBatch.getQueueId();
+ }
final int topicLength = topicData.length;
@@ -1635,11 +1745,11 @@ public class CommitLog {
// 3 BODYCRC
this.encoderBuffer.putInt(bodyCrc);
// 4 QUEUEID
- this.encoderBuffer.putInt(messageExtBatch.getQueueId());
+ this.encoderBuffer.putInt(queueId);
// 5 FLAG
this.encoderBuffer.putInt(flag);
// 6 QUEUEOFFSET
- this.encoderBuffer.putLong(0);
+ this.encoderBuffer.putLong(index);
// 7 PHYSICALOFFSET
this.encoderBuffer.putLong(0);
// 8 SYSFLAG
@@ -1677,6 +1787,7 @@ public class CommitLog {
putMessageContext.setBatchSize(batchSize);
putMessageContext.setPhyPos(new long[batchSize]);
encoderBuffer.flip();
+ messageExtBatch.setStoreSize(maxMessageSize);
return encoderBuffer;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7f4fcc8..3b788f0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -375,9 +375,18 @@ public class DefaultMessageStore implements MessageStore {
}
private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) {
- if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
- log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length());
- return PutMessageStatus.MESSAGE_ILLEGAL;
+ if (messageExtBatch.isMultiTopic()) {
+ for (String topic : messageExtBatch.getTopics()) {
+ if (topic.length() > Byte.MAX_VALUE) {
+ log.warn("putMessage message topic length too long " + topic.length());
+ return PutMessageStatus.MESSAGE_ILLEGAL;
+ }
+ }
+ } else {
+ if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
+ log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length());
+ return PutMessageStatus.MESSAGE_ILLEGAL;
+ }
}
if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {