You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/03/25 08:25:39 UTC

[GitHub] [rocketmq-spring] GongZhengMe opened a new issue #242: Timeout param maybe influence the batch consumer Message result

GongZhengMe opened a new issue #242: Timeout param maybe influence the batch consumer Message result
URL: https://github.com/apache/rocketmq-spring/issues/242
 
 
   I write a demo about Batch Message and I think maybe I find a bug about it.
   plz see the two demo ,I will show how about that.
   There is one consumer demo with two producer demo  ,the difference of producer demo which param is timeout.
   ```
   @Service
   @RocketMQMessageListener(topic = "msgBatchTopic",consumerGroup ="message-batch-consumer")
   public class BatchMsgConsumer implements RocketMQListener<MessageExt> {
       @Override
       public void onMessage(MessageExt message) {
           System.out.printf("------- MessageBatchConsumer received message, msgId: %s, body:%s \n", message.getMsgId(), new String(message.getBody()));
       }
   }
   ```
   the first producer demo ,the method syncSend without timeout param
   ```
    /**
        * RocketMQ发送消息批处理,没有timeout参数
        *
        * @author gongzheng
        * @date 2020/3/16
        */
       @RequestMapping("BatchSend")
       public void batchSend() {
           List<Message> msgs = new ArrayList<Message>();
           for (int i = 0; i < 10; i++) {
               msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                       setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
           }
   
           SendResult sr = rocketMQTemplate.syncSend("msgBatchTopic", msgs);
   
           System.out.printf("--- Batch messages send result :" + sr + "\n");
       }
   ```
   the console print result
   ```
   --- Batch messages send result :SendResult [sendStatus=SEND_OK, msgId=FE80000000000000431374F820C47CCC000018B4AAC27F0F62FF000A, offsetMsgId=C0A82B6900002A9F000000000001BCC3, messageQueue=MessageQueue [topic=msgBatchTopic, brokerName=localhost, queueId=0], queueOffset=2]
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F0F62FF000A, body:[{"payload":"Hello RocketMQ Batch Msg#0","headers":{"KEYS":"KEY_0","id":"a09d42c5-48fb-c0a2-59e2-1255944037b7","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#1","headers":{"KEYS":"KEY_1","id":"90430362-ecbb-32da-4276-a2badd75c3de","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#2","headers":{"KEYS":"KEY_2","id":"4d3ed9d1-2d5b-73a5-11cf-ec12ccbd7693","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#3","headers":{"KEYS":"KEY_3","id":"5eaeca40-887e-36c8-f60a-7256344b9885","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#4","headers":{"KEYS":"KEY_4","id":"284dd59e-2ca9-69f3-5d9f-306e534a6cd5","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#5","headers":{"KEYS":"KEY_5","id":"8115e99f-01fb-d2b6-ab45-8a437c6146d2","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#6","headers":{"KEYS":"KEY_6","id":"82618c85-ec5b-2e26-267f-3e1df517b1e0","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#7","headers":{"KEYS":"KEY_7","id":"a1914316-5197-dbae-34e4-f4dc22476a0d","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#8","headers":{"KEYS":"KEY_8","id":"fe90e385-d92b-54fb-c7b2-c031e2f952ac","timestamp":1585123714814}},{"payload":"Hello RocketMQ Batch Msg#9","headers":{"KEYS":"KEY_9","id":"fcca974c-74d3-c797-1750-aa2c385e73cb","timestamp":1585123714814}}] 
   ```
   consumer just consume message once and the ten payloads in the message
   
   the second producer demo ,the method syncSend with timeout param
   ```
     @RequestMapping("BatchSend")
       public void batchSend() {
           List<Message> msgs = new ArrayList<Message>();
           for (int i = 0; i < 10; i++) {
               msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                       setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
           }
   
           SendResult sr = rocketMQTemplate.syncSend("msgBatchTopic", msgs,60000);
   
           System.out.printf("--- Batch messages send result :" + sr + "\n");
       }
   ```
   the console print result
   ```
   --- Batch messages send result :SendResult [sendStatus=SEND_OK, msgId=FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0000,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0001,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0002,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0003,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0004,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0005,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0006,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0007,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0008,FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0009, offsetMsgId=C0A82B6900002A9F000000000001C520,C0A82B6900002A9F000000000001C65F,C0A82B6900002A9F000000000001C79E,C0A82B6900002A9F000000000001C8DD,C0A82B6900002A9F000000000001CA1C,C0A82B6900002A9F000000000001CB5B,C0A82B6900002A9F000000000001CC9A,C0A82B6900002A9F000000000001CDD9,C0A82B6900002A9F000000000001CF18,C0A82B6900002A9F000000000001D057, messageQueue=MessageQueue [topic=msgBatchTopic, brokerName=localhost, queueId=1], queueOffset=12]
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0000, body:Hello RocketMQ Batch Msg#0 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0001, body:Hello RocketMQ Batch Msg#1 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0002, body:Hello RocketMQ Batch Msg#2 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0003, body:Hello RocketMQ Batch Msg#3 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0004, body:Hello RocketMQ Batch Msg#4 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0006, body:Hello RocketMQ Batch Msg#6 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0005, body:Hello RocketMQ Batch Msg#5 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0007, body:Hello RocketMQ Batch Msg#7 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0008, body:Hello RocketMQ Batch Msg#8 
   ------- MessageBatchConsumer received message, msgId: FE80000000000000431374F820C47CCC000018B4AAC27F17EF2A0009, body:Hello RocketMQ Batch Msg#9 
   ```
   the consumer consume ten messages.
   I think the timeout param shouldn't  influence the consumer consume message,so I think this is a bug.
   And another thing ,I see the rocketMQ support a method for  split the batch message, and I don't know how to add the method in rocketMQ-spring 
   this is the method:
   ```
   public class ListSplitter implements Iterator<List<Message>> {
       private final int SIZE_LIMIT = 1000 * 1000;
       private final List<Message> messages;
       private int currIndex;
       public ListSplitter(List<Message> messages) {
               this.messages = messages;
       }
       @Override public boolean hasNext() {
           return currIndex < messages.size();
       }
       @Override public List<Message> next() {
           int nextIndex = currIndex;
           int totalSize = 0;
           for (; nextIndex < messages.size(); nextIndex++) {
               Message message = messages.get(nextIndex);
               int tmpSize = message.getTopic().length() + message.getBody().length;
               Map<String, String> properties = message.getProperties();
               for (Map.Entry<String, String> entry : properties.entrySet()) {
                   tmpSize += entry.getKey().length() + entry.getValue().length();
               }
               tmpSize = tmpSize + 20; //for log overhead
               if (tmpSize > SIZE_LIMIT) {
                   //it is unexpected that single message exceeds the SIZE_LIMIT
                   //here just let it go, otherwise it will block the splitting process
                   if (nextIndex - currIndex == 0) {
                      //if the next sublist has no element, add this one and then break, otherwise just break
                      nextIndex++;  
                   }
                   break;
               }
               if (tmpSize + totalSize > SIZE_LIMIT) {
                   break;
               } else {
                   totalSize += tmpSize;
               }
       
           }
           List<Message> subList = messages.subList(currIndex, nextIndex);
           currIndex = nextIndex;
           return subList;
       }
   }
   //then you could split the large list into small ones:
   ListSplitter splitter = new ListSplitter(messages);
   while (splitter.hasNext()) {
      try {
          List<Message>  listItem = splitter.next();
          producer.send(listItem);
      } catch (Exception e) {
          e.printStackTrace();
          //handle the error
      }
   }
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-spring] RongtongJin closed issue #242: Timeout param maybe influence the batch consumer Message result

Posted by GitBox <gi...@apache.org>.
RongtongJin closed issue #242: Timeout param maybe influence the batch consumer Message result
URL: https://github.com/apache/rocketmq-spring/issues/242
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-spring] RongtongJin commented on issue #242: Timeout param maybe influence the batch consumer Message result

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on issue #242: Timeout param maybe influence the batch consumer Message result
URL: https://github.com/apache/rocketmq-spring/issues/242#issuecomment-603913850
 
 
   @GongZhengMe Good catch! I found that when there is no timeout parameter, `org.apache.rocketmq.spring.core.RocketMQTemplate#syncSend(java.lang.String, java.lang.Object)` is actually called.  `org.apache.rocketmq.spring.core.RocketMQTemplate#syncSend(java.lang.String, java.util.Collection<T>,)` method is  missing. Could you please submit a PR to complement this method?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-spring] RongtongJin commented on issue #242: Timeout param maybe influence the batch consumer Message result

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on issue #242: Timeout param maybe influence the batch consumer Message result
URL: https://github.com/apache/rocketmq-spring/issues/242#issuecomment-603914032
 
 
   > @GongZhengMe Good catch! I found that when there is no timeout parameter, `org.apache.rocketmq.spring.core.RocketMQTemplate#syncSend(java.lang.String, java.lang.Object)` is actually called. `org.apache.rocketmq.spring.core.RocketMQTemplate#syncSend(java.lang.String, java.util.Collection<T>)` method is missing. Could you please submit a PR to complement this method?
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-spring] RongtongJin edited a comment on issue #242: Timeout param maybe influence the batch consumer Message result

Posted by GitBox <gi...@apache.org>.
RongtongJin edited a comment on issue #242: Timeout param maybe influence the batch consumer Message result
URL: https://github.com/apache/rocketmq-spring/issues/242#issuecomment-603914032
 
 
   @GongZhengMe Good catch! I found that when there is no timeout parameter, `org.apache.rocketmq.spring.core.RocketMQTemplate#syncSend(java.lang.String, java.lang.Object)` is actually called. `org.apache.rocketmq.spring.core.RocketMQTemplate#syncSend(java.lang.String, java.util.Collection<T>)` method is missing. Could you please submit a PR to complement this method?
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-spring] GongZhengMe commented on issue #242: Timeout param maybe influence the batch consumer Message result

Posted by GitBox <gi...@apache.org>.
GongZhengMe commented on issue #242: Timeout param maybe influence the batch consumer Message result
URL: https://github.com/apache/rocketmq-spring/issues/242#issuecomment-604202533
 
 
   I don't have the experience about Pull Request.I will try about it. If I can't fix it , I will tell you after I try my best.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-spring] RongtongJin removed a comment on issue #242: Timeout param maybe influence the batch consumer Message result

Posted by GitBox <gi...@apache.org>.
RongtongJin removed a comment on issue #242: Timeout param maybe influence the batch consumer Message result
URL: https://github.com/apache/rocketmq-spring/issues/242#issuecomment-603914032
 
 
   @GongZhengMe Good catch! I found that when there is no timeout parameter, `org.apache.rocketmq.spring.core.RocketMQTemplate#syncSend(java.lang.String, java.lang.Object)` is actually called. `org.apache.rocketmq.spring.core.RocketMQTemplate#syncSend(java.lang.String, java.util.Collection<T>)` method is missing. Could you please submit a PR to complement this method?
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-spring] GongZhengMe commented on issue #242: Timeout param maybe influence the batch consumer Message result

Posted by GitBox <gi...@apache.org>.
GongZhengMe commented on issue #242: Timeout param maybe influence the batch consumer Message result
URL: https://github.com/apache/rocketmq-spring/issues/242#issuecomment-604290650
 
 
   I pull a request https://github.com/apache/rocketmq-spring/pull/244,but  coverage/coveralls doesn't let my method pass .
   because I copy the method :
   `syncSend(String destination, Collection<T> messages, long timeout) `
   the repetition rate is  too high.
   So I want to override `syncSend(String destination, Collection<T> messages, long timeout) `
   But now I have a problem about it. the timeout param's type is long.So I can't give long a default value:
   - null :long can't be determine null
   - 0: if I give 0 as default value ,the producer will have a timeout Exception.
   ps: 0 I don't write demo ,but I remember another method like scheduleMessage demo  if I set timeout as 0,the producer has exception.
   So I don't know how to override it.I think I can't alter the timeout param's type be Long.
   So I just copy the method `syncSend(String destination, Collection<T> messages, long timeout) ` for fix bug.
   But your code is strict in repetition rate.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services