Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/03/26 11:25:11 UTC

[GitHub] [rocketmq-spring] GongZhengMe commented on a change in pull request #244: [issue #242]Fix Bug Of BatchMessage SyncSend without timeout param

GongZhengMe commented on a change in pull request #244: [issue #242]Fix Bug Of BatchMessage SyncSend without timeout param
URL: https://github.com/apache/rocketmq-spring/pull/244#discussion_r398498276
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -466,6 +466,42 @@ public SendResult syncSend(String destination, Message<?> message, long timeout)
         return syncSend(destination, message, timeout, 0);
     }
 
+    /**
+     * syncSend batch messages
+     *
+     * @param destination formats: `topicName:tags`
+     * @param messages Collection of {@link org.springframework.messaging.Message}
+     * @return {@link SendResult}
+     */
+    public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
+        if (Objects.isNull(messages) || messages.size() == 0) {
+            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
+            throw new IllegalArgumentException("`messages` can not be empty");
+        }
+
+        try {
+            long now = System.currentTimeMillis();
+            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+            for (Message msg : messages) {
+                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
+                    log.warn("Found a message empty in the batch, skip it");
+                    continue;
+                }
+                rmqMsgs.add(this.createRocketMqMessage(destination, msg));
+            }
+
+            SendResult sendResult = producer.send(rmqMsgs);
+            long costTime = System.currentTimeMillis() - now;
+            if (log.isDebugEnabled()) {
+                log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            }
+            return sendResult;
+        } catch (Exception e) {
+            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
 
 Review comment:
   Thanks for your solution; it really does make the code cleaner. I think I overlooked the producer's default `sendMsgTimeout` value in the configuration. This was a lesson for me: I learned how to open a pull request, and I will remember the producer's send timeout parameter. Thank you for giving me the opportunity to feel that I am really taking part in RocketMQ-Spring.
   One last question: should I update my branch with your cleaner code and push it to this pull request, or have you already done that?
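
   For readers following the thread, here is a minimal sketch of the idea discussed in this comment, assuming the "cleaner" version means that the batch overload without a timeout delegates to a timeout-aware overload and passes the producer's default send timeout. `StubProducer`, `Template`, the `String` message type, and the 3000 ms default used here are hypothetical stand-ins for illustration only; they are not the actual rocketmq-spring or RocketMQ client classes, and this is not necessarily the code that was merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-ins only; NOT the real RocketMQ or rocketmq-spring classes.
final class BatchSendSketch {

    /** Stand-in for a producer that carries a default send timeout. */
    static class StubProducer {
        private final long sendMsgTimeout = 3000L; // assumed default, in milliseconds
        long lastTimeout = -1L;  // timeout passed to the most recent send
        int lastBatchSize = -1;  // number of messages in the most recent send

        long getSendMsgTimeout() {
            return sendMsgTimeout;
        }

        String send(Collection<String> batch, long timeout) {
            lastTimeout = timeout;
            lastBatchSize = batch.size();
            return "SEND_OK";
        }
    }

    /** Stand-in for a template exposing two batch syncSend overloads. */
    static class Template {
        private final StubProducer producer;

        Template(StubProducer producer) {
            this.producer = producer;
        }

        // No-timeout overload: delegate and reuse the producer's default timeout,
        // so validation and conversion logic live in one place.
        String syncSend(String destination, Collection<String> messages) {
            return syncSend(destination, messages, producer.getSendMsgTimeout());
        }

        // Timeout-aware overload: validates, skips null entries, then sends.
        String syncSend(String destination, Collection<String> messages, long timeout) {
            if (messages == null || messages.isEmpty()) {
                throw new IllegalArgumentException("`messages` can not be empty");
            }
            List<String> batch = new ArrayList<>();
            for (String msg : messages) {
                if (msg == null) {
                    continue; // skip empty entries, as the reviewed code does
                }
                batch.add(destination + "|" + msg);
            }
            return producer.send(batch, timeout);
        }
    }

    public static void main(String[] args) {
        StubProducer producer = new StubProducer();
        Template template = new Template(producer);
        String result = template.syncSend("topicName:tags", Arrays.asList("a", null, "b"));
        System.out.println(result + " timeout=" + producer.lastTimeout
                + " batchSize=" + producer.lastBatchSize);
    }
}
```

   With this shape, the overload without a timeout holds no sending logic of its own, so a change to validation or message conversion only has to be made in the timeout-aware method.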
