You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by dongeforever <gi...@git.apache.org> on 2017/02/07 06:21:18 UTC

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

GitHub user dongeforever opened a pull request:

    https://github.com/apache/incubator-rocketmq/pull/53

    [ROCKETMQ-80] Add batch feature

    Tests show that Kafka's million-level TPS is mainly owed to batch. When set batch size to 1, the TPS is reduced an order of magnitude. So I try to add this feature to RocketMQ.
    
    For a minimal effort, it works as follows:
     
    * Only add synchronous send functions to MQProducer interface, just like **send(final Collection<Message> msgs)**
    * Use **MessageBatch** which extends **Message** and implements **Iterable\<Message\>**
    * Use byte buffer instead of list of objects to avoid too much GC in Broker.
    * Split the decode and encode logic from **lockForPutMessage** to avoid too many race conditions.
    
    Tests:
     On linux with 24 Core 48G Ram and SSD, single broker, using 50 threads to send 50Byte(body) message in batch size 50, we get about 150w TPS until the disk is full.
     
    
    Potential problems:
    Although the messages can be accumulated in the Broker very quickly, it need time to dispatch to the consume queue, which is much slower than accepting messages. So the messages may not be able to be consumed immediately.
    
    We may need to refactor the **ReputMessageService** to solve this problem.
    
    And if guys have some ideas, please let me know or just share it in this issue.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dongeforever/incubator-rocketmq ROCKETMQ-80

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-rocketmq/pull/53.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #53
    
----
commit e03b6e6a496526848df603fd406b77aa6afc87d2
Author: dongeforever <zh...@yeah.net>
Date:   2017-02-07T06:12:16Z

    [ROCKETMQ-80] Add batch feature

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101993057
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ---
    @@ -737,6 +742,10 @@ public MQClientInstance getmQClientFactory() {
         }
     
         private boolean tryToCompressMessage(final Message msg) {
    +        if (msg instanceof MessageBatch) {
    --- End diff --
    
    why not support nowadays


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by dongeforever <gi...@git.apache.org>.
Github user dongeforever commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r104348323
  
    --- Diff: store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---
    @@ -331,7 +331,7 @@ public long getMinOffsetInQueue() {
         public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
             long logicOffset) {
             final int maxRetries = 30;
    -        boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
    +        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    --- End diff --
    
    @Jaskey Yeah, it is better to use the name isConsumeQWritable


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    
    [![Coverage Status](https://coveralls.io/builds/10391083/badge)](https://coveralls.io/builds/10391083)
    
    Coverage increased (+0.3%) to 31.791% when pulling **eb1a5e78dfaf5f61b0a2d1be3a9f7cd1c965b918 on dongeforever:ROCKETMQ-80** into **573b22c37806a21543b90707bcce6022243a62da on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by shroman <gi...@git.apache.org>.
Github user shroman commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    @dongeforever yes, I think we have to discuss it in the dev mailing list :+1: 
    I think having a class/methods for wire protocol version checking and applying them (and, as a result, accepting or rejecting) when a client wants to establish a connection to a broker, etc. should work. I guess that's what you meant by having "checking codes in one place" :)
    The version information can be placed into headers.
    
    Such protocol changes shouldn't happen that often (RocketMQ v.4.0.0, v.4.1.0 may use protocol v.1, but RocketMQ v.5.0.0 may use protocol v.2), but if they happen, in this way we can guarantee that the communication between system components is done correctly.
    
    Let's discuss it in the ml. My understanding of RocketMQ is still not complete ;) Maybe there are better ideas not to over-complicate is with versioning.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r103854335
  
    --- Diff: store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---
    @@ -331,7 +331,7 @@ public long getMinOffsetInQueue() {
         public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
             long logicOffset) {
             final int maxRetries = 30;
    -        boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
    +        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    --- End diff --
    
    isConsumeQWritable will be better I guess


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101980234
  
    --- Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---
    @@ -442,6 +449,202 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
             return response;
         }
     
    +    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
    +                                        final RemotingCommand request, //
    +                                        final SendMessageContext sendMessageContext, //
    +                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    +
    +        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    +        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
    +
    +
    +        response.setOpaque(request.getOpaque());
    +
    +        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    +        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    +
    +        if (log.isDebugEnabled()) {
    --- End diff --
    
    IMO, we can remove redundant expression isDebugEnabled here. Also please capitalize the first letter in log output


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    I wonder that are there some compatibility issues between new client version and old server version? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101994977
  
    --- Diff: common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java ---
    @@ -48,6 +48,8 @@
         private Integer reconsumeTimes;
         @CFNullable
         private boolean unitMode = false;
    +    @CFNullable
    --- End diff --
    
    Check this annotation works well


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by dongeforever <gi...@git.apache.org>.
Github user dongeforever commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    @shroman thank you. 
    Great idea to create a version policy based on protocol changes.
    And we would need to control every api  separately.
    So maybe we can write all these checking codes in one place for all apis.
    
    BTW, version control is a global problem, not only for batch, maybe  we need more discussion in the mailing list.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by dongeforever <gi...@git.apache.org>.
Github user dongeforever commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    @vongosling @zhouxinyu @WillemJiang please have a review.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    @dongeforever 
    
    I have the same wishes for batch send too, but what drives me is that user may propably need a batch id for one batch of message, and these message should be success all to one single queue, which is nessecary when sneding order message. say msgA msgB and msgC should be consumed in order, they should be send to one same queue, but if we use for loop to send this, A may success and B may fail to the same queue since the queue numbers may changes at that exctly time.
    
    Batch send could solve this problem. But we may also need to generate a uniq batch id for this in client, which will help us to optimze the performance of consumeorderlyservice in the furture. Currently, message in one single queue can only be consumed only if the previous one consumed successfully which actually is too strict. Actully we only need the message in one batch consumed in order, batch id will help us to do this.
    
    **So in general, I suggest adding batch id  when sending batch message in all message property.**
    
    PS: There looks like two many repeated code, any ways or plans to clean it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by shroman <gi...@git.apache.org>.
Github user shroman commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    Sounds good to check for the protocol version in the request header, and reject with an error when not matched.
    Probably, `RemotingCommand.REMOTING_VERSION_KEY` will work, but I would create a version policy based not on the release (that is used now) but on the protocol changes, so that we don't have to introduce a new checking condition in the code with each release. Just an idea.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101990343
  
    --- Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---
    @@ -442,6 +449,202 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
             return response;
         }
     
    +    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
    +                                        final RemotingCommand request, //
    +                                        final SendMessageContext sendMessageContext, //
    +                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    +
    +        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    +        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
    +
    +
    +        response.setOpaque(request.getOpaque());
    +
    +        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    +        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    +
    +        if (log.isDebugEnabled()) {
    +            log.debug("receive SendMessage request command, " + request);
    +        }
    +
    +        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    +        if (this.brokerController.getMessageStore().now() < startTimstamp) {
    +            response.setCode(ResponseCode.SYSTEM_ERROR);
    +            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
    +            return response;
    +        }
    +
    +        response.setCode(-1);
    +        super.msgCheck(ctx, requestHeader, response);
    +        if (response.getCode() != -1) {
    +            return response;
    +        }
    +
    +
    +        int queueIdInt = requestHeader.getQueueId();
    +        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    +
    +        if (queueIdInt < 0) {
    +            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    +        }
    +
    +        int sysFlag = requestHeader.getSysFlag();
    +        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
    +            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    +        }
    +
    +        String newTopic = requestHeader.getTopic();
    +        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    +            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
    +            SubscriptionGroupConfig subscriptionGroupConfig =
    +                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
    +            if (null == subscriptionGroupConfig) {
    +                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
    +                response.setRemark(
    +                        "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
    +                return response;
    +            }
    +
    +
    +            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    +            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
    +                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    +            }
    +            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
    +            if (reconsumeTimes >= maxReconsumeTimes) {
    +                newTopic = MixAll.getDLQTopic(groupName);
    +                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
    +                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
    +                        DLQ_NUMS_PER_GROUP, //
    +                        PermName.PERM_WRITE, 0
    +                );
    +                if (null == topicConfig) {
    +                    response.setCode(ResponseCode.SYSTEM_ERROR);
    +                    response.setRemark("topic[" + newTopic + "] not exist");
    +                    return response;
    +                }
    +            }
    +        }
    +        if (newTopic.length() > Byte.MAX_VALUE) {
    +            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
    +            response.setRemark("message topic length too long " + newTopic.length());
    +            return response;
    +        }
    +
    +
    +        MessageExtBatch messageExtBatch = new MessageExtBatch();
    +        messageExtBatch.setTopic(newTopic);
    --- End diff --
    
    Drawback 1 : only support the same topic


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by dongeforever <gi...@git.apache.org>.
Github user dongeforever commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    @zhouxinyu yeah, this is a problem. When new client send batched messages to old server, it will get no error, for the batched messages are treated as normal message internally. 
    
    Maybe I need a new request code, therefore the old server cannot recognize it and throw error.
      


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    @dongeforever I know your implementation.What I suggest is that you add batch id for this so that we can inditify them, which is actully a very minimum effort for you.
    
    And as I sai,  in the future we can optimize consumeOrderlyService with batch id.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by zhouxinyu <gi...@git.apache.org>.
Github user zhouxinyu commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    Hi, @shroman @Jaskey @lizhanhui , what's your opinion about this updated PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by dongeforever <gi...@git.apache.org>.
Github user dongeforever commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    @zhouxinyu Note that the batch is a new api, I add a new request code named "SEND_BATCH_MESSAGE". As the old broker cannot recognize it,  the compatible problem is gone.
    
    Also I use the ThreadLocal to avoid race condition for MessageExtBatchEncoder, which previously was cached in the blocking queue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    Yep agreed @lizhanhui . I will make some comment as much detail as possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101981097
  
    --- Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---
    @@ -442,6 +449,202 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
             return response;
         }
     
    +    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
    +                                        final RemotingCommand request, //
    +                                        final SendMessageContext sendMessageContext, //
    +                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    +
    +        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    +        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
    +
    +
    +        response.setOpaque(request.getOpaque());
    +
    +        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    +        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    +
    +        if (log.isDebugEnabled()) {
    +            log.debug("receive SendMessage request command, " + request);
    +        }
    +
    +        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    +        if (this.brokerController.getMessageStore().now() < startTimstamp) {
    +            response.setCode(ResponseCode.SYSTEM_ERROR);
    +            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
    +            return response;
    +        }
    +
    +        response.setCode(-1);
    +        super.msgCheck(ctx, requestHeader, response);
    --- End diff --
    
    Why not put msgCheck this precondition method into the first row of the outer method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101989915
  
    --- Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---
    @@ -442,6 +449,202 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
             return response;
         }
     
    +    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
    +                                        final RemotingCommand request, //
    +                                        final SendMessageContext sendMessageContext, //
    +                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    +
    +        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    +        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
    +
    +
    +        response.setOpaque(request.getOpaque());
    +
    +        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    +        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    +
    +        if (log.isDebugEnabled()) {
    +            log.debug("receive SendMessage request command, " + request);
    +        }
    +
    +        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    +        if (this.brokerController.getMessageStore().now() < startTimstamp) {
    +            response.setCode(ResponseCode.SYSTEM_ERROR);
    +            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
    +            return response;
    +        }
    +
    +        response.setCode(-1);
    +        super.msgCheck(ctx, requestHeader, response);
    +        if (response.getCode() != -1) {
    +            return response;
    +        }
    +
    +
    +        int queueIdInt = requestHeader.getQueueId();
    +        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    +
    +        if (queueIdInt < 0) {
    +            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    +        }
    +
    +        int sysFlag = requestHeader.getSysFlag();
    +        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
    +            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    +        }
    +
    +        String newTopic = requestHeader.getTopic();
    +        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    +            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
    +            SubscriptionGroupConfig subscriptionGroupConfig =
    +                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
    +            if (null == subscriptionGroupConfig) {
    +                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
    +                response.setRemark(
    +                        "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
    +                return response;
    +            }
    +
    +
    +            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    +            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
    +                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    +            }
    +            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
    +            if (reconsumeTimes >= maxReconsumeTimes) {
    +                newTopic = MixAll.getDLQTopic(groupName);
    +                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
    +                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
    +                        DLQ_NUMS_PER_GROUP, //
    +                        PermName.PERM_WRITE, 0
    +                );
    +                if (null == topicConfig) {
    +                    response.setCode(ResponseCode.SYSTEM_ERROR);
    +                    response.setRemark("topic[" + newTopic + "] not exist");
    +                    return response;
    +                }
    +            }
    +        }
    +        if (newTopic.length() > Byte.MAX_VALUE) {
    +            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
    +            response.setRemark("message topic length too long " + newTopic.length());
    +            return response;
    +        }
    +
    +
    +        MessageExtBatch messageExtBatch = new MessageExtBatch();
    +        messageExtBatch.setTopic(newTopic);
    +        messageExtBatch.setBody(request.getBody());
    +        messageExtBatch.setQueueId(queueIdInt);
    +        messageExtBatch.setSysFlag(sysFlag);
    +        messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
    +        messageExtBatch.setBornHost(ctx.channel().remoteAddress());
    +        messageExtBatch.setStoreHost(this.getStoreHost());
    +        messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    +
    --- End diff --
    
    Many duplicated code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101965587
  
    --- Diff: broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---
    @@ -72,7 +73,13 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
     
                     mqtraceContext = buildMsgContext(ctx, requestHeader);
                     this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
    -                final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
    +
    +                RemotingCommand response = null;
    --- End diff --
    
    Please remove redundant initializer


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    
    [![Coverage Status](https://coveralls.io/builds/10543227/badge)](https://coveralls.io/builds/10543227)
    
    Coverage increased (+0.9%) to 31.863% when pulling **854d4693ad99ae12485762f1f3bec9a43ae3c8c7 on dongeforever:ROCKETMQ-80** into **d7decc84abc32dab63ee423d4d904f28d18cb1d7 on apache:develop**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    LGTM~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by dongeforever <gi...@git.apache.org>.
Github user dongeforever commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    this PR has been merged into develop branch


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101995981
  
    --- Diff: store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---
    @@ -331,7 +331,7 @@ public long getMinOffsetInQueue() {
         public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
             long logicOffset) {
             final int maxRetries = 30;
    -        boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
    +        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    --- End diff --
    
    what is isCQWriteable, why not change the name


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101995717
  
    --- Diff: store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java ---
    @@ -119,6 +129,7 @@ public String toString() {
                 ", storeTimestamp=" + storeTimestamp +
                 ", logicsOffset=" + logicsOffset +
                 ", pagecacheRT=" + pagecacheRT +
    --- End diff --
    
    Consider apache commons lang toStringBuilder method to reflect fields


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101992848
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ---
    @@ -595,8 +596,11 @@ private SendResult sendKernelImpl(final Message msg, //
     
                 byte[] prevBody = msg.getBody();
                 try {
    -
    -                MessageClientIDSetter.setUniqID(msg);
    +                if (msg instanceof MessageBatch) {
    --- End diff --
    
    Why not 
        if (!(msg instanceof MessageBatch)) {
                        MessageClientIDSetter.setUniqID(msg);
                    } 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    @dongeforever we can continue to polish this PR, IMO. if you have any problem. please let me know. BTW, can you post your performance test result for us


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by dongeforever <gi...@git.apache.org>.
Github user dongeforever closed the pull request at:

    https://github.com/apache/incubator-rocketmq/pull/53


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by dongeforever <gi...@git.apache.org>.
Github user dongeforever commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    @Jaskey Now there is no batch id, but the messages in one batch are sent to the same queue, and they can only be sent all successfully or all unsuccessfully.
    You could check the code or test it.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    
    [![Coverage Status](https://coveralls.io/builds/10167156/badge)](https://coveralls.io/builds/10167156)
    
    Coverage increased (+0.5%) to 32.046% when pulling **6579e7a9299f9e6f6ac49a2dfdf31a6e278c25ff on dongeforever:ROCKETMQ-80** into **573b22c37806a21543b90707bcce6022243a62da on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101991874
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ---
    @@ -278,14 +280,14 @@ public void createTopic(final String addr, final String defaultTopic, final Topi
         }
     
         public SendResult sendMessage(//
    -        final String addr, // 1
    -        final String brokerName, // 2
    -        final Message msg, // 3
    -        final SendMessageRequestHeader requestHeader, // 4
    -        final long timeoutMillis, // 5
    -        final CommunicationMode communicationMode, // 6
    -        final SendMessageContext context, // 7
    -        final DefaultMQProducerImpl producer // 8
    +                                  final String addr, // 1
    --- End diff --
    
    unnecessary format as our code style, http://rocketmq.incubator.apache.org/docs/code-guidelines/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/53#discussion_r101994094
  
    --- Diff: common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.rocketmq.common.message;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +public class MessageBatch extends Message implements Iterable<Message> {
    --- End diff --
    
    May be, we can consider to implement equals in collection senario


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by lizhanhui <gi...@git.apache.org>.
Github user lizhanhui commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    This feature is definitely nice to have and it expands scenarios that RocketMQ fits best. For example, RocketMQ may have close, if not better, performance with Kafka in log collecting usage.
    
    That said, this is a pretty important feature and it matters so much that we need to get it right at the beginning.  We'd better have a design document first, then discuss various impacts it brings about in the mailing list.
    
    As for this PR, it's generally good, yet, still needs more work: code duplication, message validation logic discrepancy, excessive constraints on usages(No delay messages, messages of a batch must have same topic, etc) and previously mentioned compatibility issue with older brokers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by dongeforever <gi...@git.apache.org>.
Github user dongeforever commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    @Jaskey Sorry. You are right. And it is worth a new PR, maybe we can add batch id along with a new optimized consumeOrderlyService.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by vongosling <gi...@git.apache.org>.
Github user vongosling commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    Cool, Thanks @dongeforever providing this feature. We will have a look at this implementation. please hold your horses :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #53: [ROCKETMQ-80] Add batch feature

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/53
  
    
    [![Coverage Status](https://coveralls.io/builds/10029487/badge)](https://coveralls.io/builds/10029487)
    
    Coverage increased (+0.4%) to 31.909% when pulling **e03b6e6a496526848df603fd406b77aa6afc87d2 on dongeforever:ROCKETMQ-80** into **9a2de7b555b390c1c55f5a275d6fe7e251ac3f62 on apache:master**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---