You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2017/06/06 03:38:22 UTC
[02/51] [abbrv] incubator-rocketmq git commit: ROCKETMQ-80 Add batch feature closes apache/incubator-rocketmq#53
ROCKETMQ-80 Add batch feature closes apache/incubator-rocketmq#53
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/47fad3c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/47fad3c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/47fad3c1
Branch: refs/heads/master
Commit: 47fad3c17ab2d161743d4e52efc6258b7bcafde9
Parents: 0d6c56b
Author: dongeforever <do...@apache.org>
Authored: Fri Mar 17 18:59:43 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800
----------------------------------------------------------------------
.../rocketmq/broker/BrokerController.java | 2 +
.../processor/AbstractSendMessageProcessor.java | 12 +-
.../broker/processor/SendMessageProcessor.java | 350 ++++++++++++-------
.../org/apache/rocketmq/client/Validators.java | 1 +
.../rocketmq/client/impl/MQClientAPIImpl.java | 56 +--
.../impl/producer/DefaultMQProducerImpl.java | 30 +-
.../client/producer/DefaultMQProducer.java | 38 ++
.../rocketmq/client/producer/MQProducer.java | 14 +
.../apache/rocketmq/common/TopicFilterType.java | 1 +
.../rocketmq/common/message/MessageBatch.java | 73 ++++
.../rocketmq/common/message/MessageDecoder.java | 103 ++++++
.../rocketmq/common/message/MessageExt.java | 2 +-
.../common/message/MessageExtBatch.java | 42 +++
.../rocketmq/common/protocol/RequestCode.java | 3 +
.../header/SendMessageRequestHeader.java | 10 +
.../header/SendMessageRequestHeaderV2.java | 14 +
.../rocketmq/common/MessageBatchTest.java | 70 ++++
.../common/MessageEncodeDecodeTest.java | 81 +++++
.../rocketmq/store/AppendMessageCallback.java | 15 +-
.../rocketmq/store/AppendMessageResult.java | 11 +
.../org/apache/rocketmq/store/CommitLog.java | 344 ++++++++++++++++--
.../org/apache/rocketmq/store/ConsumeQueue.java | 2 +-
.../rocketmq/store/DefaultMessageStore.java | 57 +++
.../org/apache/rocketmq/store/MappedFile.java | 32 +-
.../org/apache/rocketmq/store/MessageStore.java | 3 +
.../org/apache/rocketmq/store/RunningFlags.java | 11 +
.../store/config/MessageStoreConfig.java | 2 +
.../store/stats/BrokerStatsManager.java | 8 +-
.../rocketmq/store/AppendCallbackTest.java | 150 ++++++++
.../test/client/producer/batch/BatchSendIT.java | 131 +++++++
.../exception/msg/MessageExceptionIT.java | 2 +-
31 files changed, 1464 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index b656870..7e9e7ac 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -374,9 +374,11 @@ public class BrokerController {
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
/**
* PullMessageProcessor
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 9f23bad..3faa7ae 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -17,11 +17,6 @@
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
@@ -51,6 +46,12 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -279,6 +280,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
SendMessageRequestHeaderV2 requestHeaderV2 = null;
SendMessageRequestHeader requestHeader = null;
switch (request.getCode()) {
+ case RequestCode.SEND_BATCH_MESSAGE:
case RequestCode.SEND_MESSAGE_V2:
requestHeaderV2 =
(SendMessageRequestHeaderV2) request
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index a440462..56a0b99 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -72,7 +73,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
- final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
+
+ RemotingCommand response;
+ if (requestHeader.isBatch()) {
+ response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
+ } else {
+ response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
+ }
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
@@ -238,6 +245,50 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return response;
}
+
+ private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request,
+ MessageExt msg, TopicConfig topicConfig) {
+ String newTopic = requestHeader.getTopic();
+ if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ SubscriptionGroupConfig subscriptionGroupConfig =
+ this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
+ if (null == subscriptionGroupConfig) {
+ response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+ response.setRemark(
+ "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+ return false;
+ }
+
+ int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
+ if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
+ maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
+ }
+ int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
+ if (reconsumeTimes >= maxReconsumeTimes) {
+ newTopic = MixAll.getDLQTopic(groupName);
+ int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
+ topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
+ DLQ_NUMS_PER_GROUP, //
+ PermName.PERM_WRITE, 0
+ );
+ msg.setTopic(newTopic);
+ msg.setQueueId(queueIdInt);
+ if (null == topicConfig) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("topic[" + newTopic + "] not exist");
+ return false;
+ }
+ }
+ }
+ int sysFlag = requestHeader.getSysFlag();
+ if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
+ sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+ }
+ msg.setSysFlag(sysFlag);
+ return true;
+ }
+
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
final RemotingCommand request, //
final SendMessageContext sendMessageContext, //
@@ -251,9 +302,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
- if (log.isDebugEnabled()) {
- log.debug("receive SendMessage request command, {}", request);
- }
+ log.debug("receive SendMessage request command, {}", request);
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
@@ -270,6 +319,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
final byte[] body = request.getBody();
+
+
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
@@ -277,53 +328,18 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
- int sysFlag = requestHeader.getSysFlag();
+ MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+ msgInner.setTopic(requestHeader.getTopic());
+ msgInner.setQueueId(queueIdInt);
- if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
- sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+ if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
+ return response;
}
- String newTopic = requestHeader.getTopic();
- if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
- SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
- if (null == subscriptionGroupConfig) {
- response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
- response.setRemark(
- "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
- return response;
- }
-
- int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
- if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
- maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
- }
- int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
- if (reconsumeTimes >= maxReconsumeTimes) {
- newTopic = MixAll.getDLQTopic(groupName);
- queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
- topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
- DLQ_NUMS_PER_GROUP, //
- PermName.PERM_WRITE, 0
- );
- if (null == topicConfig) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("topic[" + newTopic + "] not exist");
- return response;
- }
- }
- }
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
- msgInner.setTopic(newTopic);
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
- msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
-
- msgInner.setQueueId(queueIdInt);
- msgInner.setSysFlag(sysFlag);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
@@ -340,105 +356,183 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
- if (putMessageResult != null) {
- boolean sendOK = false;
- switch (putMessageResult.getPutMessageStatus()) {
- // Success
- case PUT_OK:
- sendOK = true;
- response.setCode(ResponseCode.SUCCESS);
- break;
- case FLUSH_DISK_TIMEOUT:
- response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
- sendOK = true;
- break;
- case FLUSH_SLAVE_TIMEOUT:
- response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
- sendOK = true;
- break;
- case SLAVE_NOT_AVAILABLE:
- response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
- sendOK = true;
- break;
+ return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
- // Failed
- case CREATE_MAPEDFILE_FAILED:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("create mapped file failed, server is busy or broken.");
- break;
- case MESSAGE_ILLEGAL:
- case PROPERTIES_SIZE_EXCEEDED:
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- response.setRemark(
- "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
- break;
- case SERVICE_NOT_AVAILABLE:
- response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
- response.setRemark(
- "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
- break;
- case OS_PAGECACHE_BUSY:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
- break;
- case UNKNOWN_ERROR:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("UNKNOWN_ERROR");
- break;
- default:
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("UNKNOWN_ERROR DEFAULT");
- break;
- }
+ }
- String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
- if (sendOK) {
- this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
- this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
- putMessageResult.getAppendMessageResult().getWroteBytes());
- this.brokerController.getBrokerStatsManager().incBrokerPutNums();
+ private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg,
+ SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) {
+ if (putMessageResult == null) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("store putMessage return null");
+ return response;
+ }
+ boolean sendOK = false;
+
+ switch (putMessageResult.getPutMessageStatus()) {
+ // Success
+ case PUT_OK:
+ sendOK = true;
+ response.setCode(ResponseCode.SUCCESS);
+ break;
+ case FLUSH_DISK_TIMEOUT:
+ response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
+ sendOK = true;
+ break;
+ case FLUSH_SLAVE_TIMEOUT:
+ response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
+ sendOK = true;
+ break;
+ case SLAVE_NOT_AVAILABLE:
+ response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
+ sendOK = true;
+ break;
+
+ // Failed
+ case CREATE_MAPEDFILE_FAILED:
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("create mapped file failed, server is busy or broken.");
+ break;
+ case MESSAGE_ILLEGAL:
+ case PROPERTIES_SIZE_EXCEEDED:
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark(
+ "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
+ break;
+ case SERVICE_NOT_AVAILABLE:
+ response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
+ response.setRemark(
+ "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
+ break;
+ case OS_PAGECACHE_BUSY:
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
+ break;
+ case UNKNOWN_ERROR:
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("UNKNOWN_ERROR");
+ break;
+ default:
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("UNKNOWN_ERROR DEFAULT");
+ break;
+ }
+
+ String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+ if (sendOK) {
- response.setRemark(null);
+ this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
+ this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
+ putMessageResult.getAppendMessageResult().getWroteBytes());
+ this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
- responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
- responseHeader.setQueueId(queueIdInt);
- responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+ response.setRemark(null);
- doResponse(ctx, request, response);
+ responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
+ responseHeader.setQueueId(queueIdInt);
+ responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
- if (hasSendMessageHook()) {
- sendMessageContext.setMsgId(responseHeader.getMsgId());
- sendMessageContext.setQueueId(responseHeader.getQueueId());
- sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+ doResponse(ctx, request, response);
- int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
- int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
- int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+ if (hasSendMessageHook()) {
+ sendMessageContext.setMsgId(responseHeader.getMsgId());
+ sendMessageContext.setQueueId(responseHeader.getQueueId());
+ sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
- sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
- sendMessageContext.setCommercialSendTimes(incValue);
- sendMessageContext.setCommercialSendSize(wroteSize);
- sendMessageContext.setCommercialOwner(owner);
- }
- return null;
- } else {
- if (hasSendMessageHook()) {
- int wroteSize = request.getBody().length;
- int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
-
- sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
- sendMessageContext.setCommercialSendTimes(incValue);
- sendMessageContext.setCommercialSendSize(wroteSize);
- sendMessageContext.setCommercialOwner(owner);
- }
+ int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+ int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+
+ sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
+ sendMessageContext.setCommercialSendTimes(incValue);
+ sendMessageContext.setCommercialSendSize(wroteSize);
+ sendMessageContext.setCommercialOwner(owner);
}
+ return null;
} else {
+ if (hasSendMessageHook()) {
+ int wroteSize = request.getBody().length;
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+
+ sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
+ sendMessageContext.setCommercialSendTimes(incValue);
+ sendMessageContext.setCommercialSendSize(wroteSize);
+ sendMessageContext.setCommercialOwner(owner);
+ }
+ }
+ return response;
+ }
+ private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
+ final RemotingCommand request, //
+ final SendMessageContext sendMessageContext, //
+ final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+ final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
+
+
+ response.setOpaque(request.getOpaque());
+
+ response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
+ response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
+
+ log.debug("Receive SendMessage request command {}", request);
+
+ final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
+ if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("store putMessage return null");
+ response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
+ return response;
+ }
+
+ response.setCode(-1);
+ super.msgCheck(ctx, requestHeader, response);
+ if (response.getCode() != -1) {
+ return response;
+ }
+
+
+ int queueIdInt = requestHeader.getQueueId();
+ TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+
+ if (queueIdInt < 0) {
+ queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+ }
+
+ if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark("message topic length too long " + requestHeader.getTopic().length());
+ return response;
}
+ if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark("batch request does not support retry group " + requestHeader.getTopic());
+ return response;
+ }
+ MessageExtBatch messageExtBatch = new MessageExtBatch();
+ messageExtBatch.setTopic(requestHeader.getTopic());
+ messageExtBatch.setQueueId(queueIdInt);
+
+ int sysFlag = requestHeader.getSysFlag();
+ if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
+ sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+ }
+ messageExtBatch.setSysFlag(sysFlag);
+
+ messageExtBatch.setFlag(requestHeader.getFlag());
+ MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+ messageExtBatch.setBody(request.getBody());
+ messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
+ messageExtBatch.setBornHost(ctx.channel().remoteAddress());
+ messageExtBatch.setStoreHost(this.getStoreHost());
+ messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+
+ PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
+
+ handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
return response;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 899efa6..b49537f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -95,6 +95,7 @@ public class Validators {
}
// topic
Validators.checkTopic(msg.getTopic());
+
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 12580c1..bdce883 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -18,14 +18,14 @@ package org.apache.rocketmq.client.impl;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.Iterator;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
@@ -50,10 +50,11 @@ import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -147,6 +148,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
+
public class MQClientAPIImpl {
private final static Logger log = ClientLogger.getLog();
@@ -278,14 +280,14 @@ public class MQClientAPIImpl {
}
public SendResult sendMessage(//
- final String addr, // 1
- final String brokerName, // 2
- final Message msg, // 3
- final SendMessageRequestHeader requestHeader, // 4
- final long timeoutMillis, // 5
- final CommunicationMode communicationMode, // 6
- final SendMessageContext context, // 7
- final DefaultMQProducerImpl producer // 8
+ final String addr, // 1
+ final String brokerName, // 2
+ final Message msg, // 3
+ final SendMessageRequestHeader requestHeader, // 4
+ final long timeoutMillis, // 5
+ final CommunicationMode communicationMode, // 6
+ final SendMessageContext context, // 7
+ final DefaultMQProducerImpl producer // 8
) throws RemotingException, MQBrokerException, InterruptedException {
return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
}
@@ -305,9 +307,9 @@ public class MQClientAPIImpl {
final DefaultMQProducerImpl producer // 12
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = null;
- if (sendSmartMsg) {
+ if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
- request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
+ request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
@@ -334,11 +336,11 @@ public class MQClientAPIImpl {
}
private SendResult sendMessageSync(//
- final String addr, //
- final String brokerName, //
- final Message msg, //
- final long timeoutMillis, //
- final RemotingCommand request//
+ final String addr, //
+ final String brokerName, //
+ final Message msg, //
+ final long timeoutMillis, //
+ final RemotingCommand request//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
@@ -507,8 +509,16 @@ public class MQClientAPIImpl {
MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
+ String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
+ if (msg instanceof MessageBatch) {
+ StringBuilder sb = new StringBuilder();
+ for (Message message : (MessageBatch) msg) {
+ sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
+ }
+ uniqMsgId = sb.toString();
+ }
SendResult sendResult = new SendResult(sendStatus,
- MessageClientIDSetter.getUniqID(msg),
+ uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
@@ -1452,7 +1462,7 @@ public class MQClientAPIImpl {
}
public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
- final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+ final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 8e81979..d828875 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -30,6 +30,16 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageType;
+import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -58,15 +68,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageId;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
@@ -595,8 +596,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
byte[] prevBody = msg.getBody();
try {
-
- MessageClientIDSetter.setUniqID(msg);
+ //for MessageBatch,ID has been set in the generating process
+ if (!(msg instanceof MessageBatch)) {
+ MessageClientIDSetter.setUniqID(msg);
+ }
int sysFlag = 0;
if (this.tryToCompressMessage(msg)) {
@@ -652,6 +655,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
+ requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
@@ -737,6 +741,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
private boolean tryToCompressMessage(final Message msg) {
+ if (msg instanceof MessageBatch) {
+ //batch does not support compressing right now
+ return false;
+ }
byte[] body = msg.getBody();
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 3480c92..135a447 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -16,14 +16,18 @@
*/
package org.apache.rocketmq.client.producer;
+import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
@@ -577,6 +581,40 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
}
+ @Override
+ public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(batch(msgs));
+ }
+
+ @Override
+ public SendResult send(Collection<Message> msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(batch(msgs), timeout);
+ }
+
+ @Override
+ public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(batch(msgs), messageQueue);
+ }
+
+ @Override
+ public SendResult send(Collection<Message> msgs, MessageQueue messageQueue, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
+ }
+
+ private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
+ MessageBatch msgBatch;
+ try {
+ msgBatch = MessageBatch.generateFromList(msgs);
+ for (Message message : msgBatch) {
+ Validators.checkMessage(message, this);
+ MessageClientIDSetter.setUniqID(message);
+ }
+ msgBatch.setBody(msgBatch.encode());
+ } catch (Exception e) {
+ throw new MQClientException("Failed to initiate the MessageBatch", e);
+ }
+ return msgBatch;
+ }
public String getProducerGroup() {
return producerGroup;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 9fc7586..14caf6f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.producer;
+import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -81,4 +82,17 @@ public interface MQProducer extends MQAdmin {
TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
+
+    // Batch variants: each overload accepts a collection of messages instead of a single one.
+
+    /**
+     * @param msgs the messages to send together
+     * @return the send result
+     */
+    SendResult send(Collection<Message> msgs)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    /**
+     * @param msgs the messages to send together
+     * @param timeout send timeout (unit not stated here; presumably milliseconds, to be confirmed)
+     * @return the send result
+     */
+    SendResult send(Collection<Message> msgs, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    /**
+     * @param msgs the messages to send together
+     * @param mq the message queue passed along with the messages
+     * @return the send result
+     */
+    SendResult send(Collection<Message> msgs, MessageQueue mq)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    /**
+     * @param msgs the messages to send together
+     * @param mq the message queue passed along with the messages
+     * @param timeout send timeout (unit not stated here; presumably milliseconds, to be confirmed)
+     * @return the send result
+     */
+    SendResult send(Collection<Message> msgs, MessageQueue mq, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
index 58459e0..8dde5d8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
@@ -19,4 +19,5 @@ package org.apache.rocketmq.common;
public enum TopicFilterType {
SINGLE_TAG,
MULTI_TAG
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
new file mode 100644
index 0000000..ca2ce88
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.rocketmq.common.MixAll;
+
+/**
+ * A {@link Message} that carries a list of messages sharing the same topic and the same
+ * waitStoreMsgOK setting. Instances are created through {@link #generateFromList(Collection)}.
+ */
+public class MessageBatch extends Message implements Iterable<Message> {
+
+    private static final long serialVersionUID = 621335151046335557L;
+    private final List<Message> messages;
+
+    private MessageBatch(List<Message> messages) {
+        this.messages = messages;
+    }
+
+    /**
+     * Encodes the contained messages with {@link MessageDecoder#encodeMessages(List)}.
+     *
+     * @return the encoded bytes of all contained messages
+     */
+    public byte[] encode() {
+        return MessageDecoder.encodeMessages(messages);
+    }
+
+    /**
+     * @return an iterator over the contained messages, in the order they were supplied
+     */
+    @Override
+    public Iterator<Message> iterator() {
+        return messages.iterator();
+    }
+
+    /**
+     * Builds a batch from the given messages; the batch takes its topic and waitStoreMsgOK
+     * setting from the first message.
+     *
+     * @param messages the messages to batch; must be non-null and non-empty
+     * @return a new batch holding a copy of the given collection
+     * @throws IllegalArgumentException if {@code messages} is null or empty
+     * @throws UnsupportedOperationException if a message has a delay time level, targets a retry
+     *         group topic, or differs from the first message in topic or waitStoreMsgOK
+     */
+    public static MessageBatch generateFromList(Collection<Message> messages) {
+        // Explicit checks instead of assert: assertions are disabled by default, and an empty
+        // collection would otherwise surface as a NullPointerException further down.
+        if (messages == null || messages.isEmpty()) {
+            throw new IllegalArgumentException("The messages of a batch must not be null or empty");
+        }
+        List<Message> messageList = new ArrayList<Message>(messages.size());
+        Message first = null;
+        for (Message message : messages) {
+            if (message.getDelayTimeLevel() > 0) {
+                throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
+            }
+            if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                throw new UnsupportedOperationException("Retry Group is not supported for batching");
+            }
+            if (first == null) {
+                first = message;
+            } else {
+                if (!first.getTopic().equals(message.getTopic())) {
+                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
+                }
+                if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
+                    throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should be the same");
+                }
+            }
+            messageList.add(message);
+        }
+        MessageBatch messageBatch = new MessageBatch(messageList);
+
+        messageBatch.setTopic(first.getTopic());
+        messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
+        return messageBatch;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 4f4e158..90b837a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -200,6 +200,8 @@ public class MessageDecoder {
return byteBuffer.array();
}
+
+
public static MessageExt decode(
java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) {
return decode(byteBuffer, readBody, deCompressBody, false);
@@ -372,4 +374,105 @@ public class MessageDecoder {
return map;
}
+
+
+    /**
+     * Encodes a single message into one self-contained frame.
+     * <p>
+     * Only the flag, body and properties carry data; the magic code and body CRC slots are
+     * written as zero. Frame layout:
+     * totalSize(4) | magicCode(4) | bodyCrc(4) | flag(4) | bodyLen(4) | body | propertiesLen(2) | properties.
+     *
+     * @param message the message to encode; its body must not be null
+     * @return the encoded frame
+     * @throws IllegalArgumentException if the encoded properties are longer than
+     *         {@link Short#MAX_VALUE} bytes and therefore do not fit the 2-byte length field
+     */
+    public static byte[] encodeMessage(Message message) {
+        //only need flag, body, properties
+        byte[] body = message.getBody();
+        int bodyLen = body.length;
+        String properties = messageProperties2String(message.getProperties());
+        byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
+        // The length is stored as a short; fail loudly instead of letting a narrowing cast
+        // produce a wrong length and an undersized buffer.
+        if (propertiesBytes.length > Short.MAX_VALUE) {
+            throw new IllegalArgumentException("The properties of a message must not exceed "
+                + Short.MAX_VALUE + " bytes, but got " + propertiesBytes.length);
+        }
+        short propertiesLength = (short) propertiesBytes.length;
+        int storeSize = 4 // 1 TOTALSIZE
+            + 4 // 2 MAGICCODE
+            + 4 // 3 BODYCRC
+            + 4 // 4 FLAG
+            + 4 + bodyLen // 5 BODY
+            + 2 + propertiesLength; // 6 PROPERTIES
+        ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
+        // 1 TOTALSIZE
+        byteBuffer.putInt(storeSize);
+
+        // 2 MAGICCODE
+        byteBuffer.putInt(0);
+
+        // 3 BODYCRC
+        byteBuffer.putInt(0);
+
+        // 4 FLAG
+        int flag = message.getFlag();
+        byteBuffer.putInt(flag);
+
+        // 5 BODY
+        byteBuffer.putInt(bodyLen);
+        byteBuffer.put(body);
+
+        // 6 PROPERTIES
+        byteBuffer.putShort(propertiesLength);
+        byteBuffer.put(propertiesBytes);
+
+        return byteBuffer.array();
+    }
+
+    /**
+     * Reads one message frame from the buffer, starting at its current position.
+     * <p>
+     * The first three int slots (total size, magic code, body CRC) are consumed and ignored;
+     * flag, body and properties are copied into a new {@link Message}.
+     *
+     * @param byteBuffer the buffer positioned at the start of a frame
+     * @return the decoded message
+     * @throws Exception declared by the signature; buffer reads may also raise runtime exceptions
+     */
+    public static Message decodeMessage(ByteBuffer byteBuffer) throws Exception {
+        // 1 TOTALSIZE, 2 MAGICCODE, 3 BODYCRC: consumed but not used
+        byteBuffer.getInt();
+        byteBuffer.getInt();
+        byteBuffer.getInt();
+
+        final Message decoded = new Message();
+
+        // 4 FLAG
+        decoded.setFlag(byteBuffer.getInt());
+
+        // 5 BODY
+        final byte[] payload = new byte[byteBuffer.getInt()];
+        byteBuffer.get(payload);
+        decoded.setBody(payload);
+
+        // 6 properties
+        final byte[] rawProperties = new byte[byteBuffer.getShort()];
+        byteBuffer.get(rawProperties);
+        final String propertiesText = new String(rawProperties, CHARSET_UTF8);
+        decoded.setProperties(string2messageProperties(propertiesText));
+
+        return decoded;
+    }
+
+    /**
+     * Encodes every message with {@link #encodeMessage(Message)} and concatenates the frames in
+     * list order.
+     *
+     * @param messages the messages to encode
+     * @return one array holding all encoded frames back to back
+     */
+    public static byte[] encodeMessages(List<Message> messages) {
+        //TO DO refactor, accumulate in one buffer, avoid copies
+        final List<byte[]> frames = new ArrayList<byte[]>(messages.size());
+        int totalLength = 0;
+        for (Message message : messages) {
+            final byte[] frame = encodeMessage(message);
+            totalLength += frame.length;
+            frames.add(frame);
+        }
+        // A heap buffer of exactly totalLength bytes; its backing array is the result.
+        final ByteBuffer merged = ByteBuffer.allocate(totalLength);
+        for (byte[] frame : frames) {
+            merged.put(frame);
+        }
+        return merged.array();
+    }
+
+
+    /**
+     * Decodes frames with {@link #decodeMessage(ByteBuffer)} until the buffer has no bytes left.
+     *
+     * @param byteBuffer the buffer holding zero or more consecutive frames
+     * @return the decoded messages, in buffer order
+     * @throws Exception propagated from {@link #decodeMessage(ByteBuffer)}
+     */
+    public static List<Message> decodeMessages(ByteBuffer byteBuffer) throws Exception {
+        //TO DO add a callback for processing, avoid creating lists
+        final List<Message> decoded = new ArrayList<Message>();
+        while (byteBuffer.hasRemaining()) {
+            decoded.add(decodeMessage(byteBuffer));
+        }
+        return decoded;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
index d11069f..3f77767 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
@@ -64,7 +64,7 @@ public class MessageExt extends Message {
return TopicFilterType.SINGLE_TAG;
}
- private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
+ public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
byteBuffer.putInt(inetSocketAddress.getPort());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
new file mode 100644
index 0000000..352ab37
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.message;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link MessageExt} that additionally holds an encoded buffer and can expose its body as a
+ * {@link ByteBuffer}.
+ */
+public class MessageExtBatch extends MessageExt {
+
+    private static final long serialVersionUID = -2353110995348498537L;
+
+    private ByteBuffer encodedBuff;
+
+    /**
+     * Wraps the whole body array in a {@link ByteBuffer} without copying it.
+     *
+     * @return a buffer over the body, from index 0 to the body length
+     */
+    public ByteBuffer wrap() {
+        final byte[] body = getBody();
+        assert body != null;
+        return ByteBuffer.wrap(body, 0, body.length);
+    }
+
+    /**
+     * @return the encoded buffer previously set, or {@code null} if none was set
+     */
+    public ByteBuffer getEncodedBuff() {
+        return this.encodedBuff;
+    }
+
+    /**
+     * @param encodedBuff the encoded buffer to keep with this batch
+     */
+    public void setEncodedBuff(ByteBuffer encodedBuff) {
+        this.encodedBuff = encodedBuff;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 217e8df..c6b0925 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -159,4 +159,7 @@ public class RequestCode {
* get config from name server
*/
public static final int GET_NAMESRV_CONFIG = 319;
+
+
+ public static final int SEND_BATCH_MESSAGE = 320;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 38b6589..2df31e6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -48,6 +48,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private Integer reconsumeTimes;
@CFNullable
private boolean unitMode = false;
+ @CFNullable
+ private boolean batch = false;
private Integer maxReconsumeTimes;
@Override
@@ -149,4 +151,12 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
public void setMaxReconsumeTimes(final Integer maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes;
}
+
+    /**
+     * @return the value of the {@code batch} flag of this header
+     */
+    public boolean isBatch() {
+        return this.batch;
+    }
+
+    /**
+     * @param batch the new value of the {@code batch} flag of this header
+     */
+    public void setBatch(final boolean batch) {
+        this.batch = batch;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 34c83cb..757ef0c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -51,6 +51,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
private Integer l; // consumeRetryTimes
+ @CFNullable
+ private boolean m; //batch
+
+
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
v1.setProducerGroup(v2.a);
@@ -65,6 +69,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v1.setReconsumeTimes(v2.j);
v1.setUnitMode(v2.k);
v1.setMaxReconsumeTimes(v2.l);
+ v1.setBatch(v2.m);
return v1;
}
@@ -82,6 +87,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v2.j = v1.getReconsumeTimes();
v2.k = v1.isUnitMode();
v2.l = v1.getMaxReconsumeTimes();
+ v2.m = v1.isBatch();
return v2;
}
@@ -184,4 +190,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
public void setL(final Integer l) {
this.l = l;
}
+
+    /**
+     * @return the value of {@code m}, the batch flag of this header
+     */
+    public boolean isM() {
+        return this.m;
+    }
+
+    /**
+     * @param m the new value of {@code m}, the batch flag of this header
+     */
+    public void setM(final boolean m) {
+        this.m = m;
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
new file mode 100644
index 0000000..1e406d2
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.junit.Test;
+
+/**
+ * Checks which message combinations {@link MessageBatch#generateFromList} accepts or rejects.
+ */
+public class MessageBatchTest {
+
+    /**
+     * @return a mutable list of two messages with topic "topic1" and body "body"
+     */
+    public List<Message> generateMessages() {
+        List<Message> messages = new ArrayList<Message>();
+        for (int i = 0; i < 2; i++) {
+            messages.add(new Message("topic1", "body".getBytes()));
+        }
+        return messages;
+    }
+
+    @Test
+    public void testGenerate_OK() throws Exception {
+        MessageBatch.generateFromList(generateMessages());
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testGenerate_DiffTopic() throws Exception {
+        List<Message> mixedTopics = generateMessages();
+        mixedTopics.get(1).setTopic("topic2");
+        MessageBatch.generateFromList(mixedTopics);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testGenerate_DiffWaitOK() throws Exception {
+        List<Message> mixedWaitOk = generateMessages();
+        mixedWaitOk.get(1).setWaitStoreMsgOK(false);
+        MessageBatch.generateFromList(mixedWaitOk);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testGenerate_Delay() throws Exception {
+        List<Message> withDelay = generateMessages();
+        withDelay.get(1).setDelayTimeLevel(1);
+        MessageBatch.generateFromList(withDelay);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testGenerate_Retry() throws Exception {
+        List<Message> withRetryTopic = generateMessages();
+        withRetryTopic.get(1).setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + "topic");
+        MessageBatch.generateFromList(withRetryTopic);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
new file mode 100644
index 0000000..a219eda
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by liuzhendong on 16/12/21.
+ */
+public class MessageEncodeDecodeTest {
+
+    /**
+     * Asserts that the decoded message carries the same flag, "key" property and body as the
+     * message that was encoded.
+     */
+    private static void assertSameContent(Message expected, Message actual) {
+        assertTrue(expected.getFlag() == actual.getFlag());
+        // Compare original against decoded; comparing the decoded message with itself can never fail.
+        assertTrue(expected.getProperty("key").equals(actual.getProperty("key")));
+        assertTrue(Arrays.equals(expected.getBody(), actual.getBody()));
+    }
+
+    @Test
+    public void testEncodeDecodeSingle() throws Exception {
+        Message message = new Message("topic", "body".getBytes());
+        message.setFlag(12);
+        message.putUserProperty("key", "value");
+        byte[] bytes = MessageDecoder.encodeMessage(message);
+        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.put(bytes);
+        buffer.flip();
+        Message newMessage = MessageDecoder.decodeMessage(buffer);
+
+        assertSameContent(message, newMessage);
+    }
+
+    @Test
+    public void testEncodeDecodeList() throws Exception {
+        List<Message> messages = new ArrayList<Message>(128);
+        for (int i = 0; i < 100; i++) {
+            Message message = new Message("topic", ("body" + i).getBytes());
+            message.setFlag(i);
+            message.putUserProperty("key", "value" + i);
+            messages.add(message);
+        }
+        byte[] bytes = MessageDecoder.encodeMessages(messages);
+
+        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
+        buffer.put(bytes);
+        buffer.flip();
+
+        List<Message> newMsgs = MessageDecoder.decodeMessages(buffer);
+
+        assertTrue(newMsgs.size() == messages.size());
+
+        for (int i = 0; i < newMsgs.size(); i++) {
+            assertSameContent(messages.get(i), newMsgs.get(i));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
index 70b702e..16a62fa 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.store;
import java.nio.ByteBuffer;
+import org.apache.rocketmq.common.message.MessageExtBatch;
/**
* Write messages callback interface
@@ -32,5 +33,17 @@ public interface AppendMessageCallback {
* @return How many bytes to write
*/
AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
- final int maxBlank, final MessageExtBrokerInner msg);
+ final int maxBlank, final MessageExtBrokerInner msg);
+
+ /**
+ * Batch variant of {@code doAppend}: after batched message serialization, write MapedByteBuffer.
+ *
+ * @param fileFromOffset NOTE(review): undocumented by the author; presumably the starting
+ * offset of the file being appended to; confirm
+ * @param byteBuffer the buffer to write into
+ * @param maxBlank NOTE(review): presumably the free space still available in the buffer; confirm
+ * @param messageExtBatch the batch to append, backed up by a byte array
+ *
+ * @return the append result (original wording: how many bytes to write)
+ */
+ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
+ final int maxBlank, final MessageExtBatch messageExtBatch);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/47fad3c1/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
index 5182dc4..d6d1aa6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java
@@ -34,6 +34,8 @@ public class AppendMessageResult {
private long logicsOffset;
private long pagecacheRT = 0;
+ private int msgNum = 1;
+
public AppendMessageResult(AppendMessageStatus status) {
this(status, 0, 0, "", 0, 0, 0);
}
@@ -109,6 +111,14 @@ public class AppendMessageResult {
this.logicsOffset = logicsOffset;
}
+    /**
+     * @return the current {@code msgNum} value; the field starts at 1
+     */
+    public int getMsgNum() {
+        return this.msgNum;
+    }
+
+    /**
+     * @param msgNum the new {@code msgNum} value
+     */
+    public void setMsgNum(final int msgNum) {
+        this.msgNum = msgNum;
+    }
+
@Override
public String toString() {
return "AppendMessageResult{" +
@@ -119,6 +129,7 @@ public class AppendMessageResult {
", storeTimestamp=" + storeTimestamp +
", logicsOffset=" + logicsOffset +
", pagecacheRT=" + pagecacheRT +
+ ", msgNum=" + msgNum +
'}';
}
}