Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/03/26 11:06:10 UTC

[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #244: [issue #242]Fix Bug Of BatchMessage SyncSend without timeout param

RongtongJin commented on a change in pull request #244: [issue #242]Fix Bug Of BatchMessage SyncSend without timeout param
URL: https://github.com/apache/rocketmq-spring/pull/244#discussion_r398487525
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -466,6 +466,42 @@ public SendResult syncSend(String destination, Message<?> message, long timeout)
         return syncSend(destination, message, timeout, 0);
     }
 
+    /**
+     * syncSend batch messages
+     *
+     * @param destination formats: `topicName:tags`
+     * @param messages Collection of {@link org.springframework.messaging.Message}
+     * @return {@link SendResult}
+     */
+    public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
+        if (Objects.isNull(messages) || messages.size() == 0) {
+            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
+            throw new IllegalArgumentException("`messages` can not be empty");
+        }
+
+        try {
+            long now = System.currentTimeMillis();
+            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+            for (Message msg : messages) {
+                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
+                    log.warn("Found a message empty in the batch, skip it");
+                    continue;
+                }
+                rmqMsgs.add(this.createRocketMqMessage(destination, msg));
+            }
+
+            SendResult sendResult = producer.send(rmqMsgs);
+            long costTime = System.currentTimeMillis() - now;
+            if (log.isDebugEnabled()) {
+                log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            }
+            return sendResult;
+        } catch (Exception e) {
+            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
 
 Review comment:
   This method body can be replaced with a single call to the overload that takes a timeout:
   ```java
   public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
       return syncSend(destination, messages, producer.getSendMsgTimeout());
   }
   ```
   Delegating this way avoids duplicating the validation and send logic, so the code will be cleaner.
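
A minimal, self-contained sketch of this delegation pattern, for illustration only. `BatchSendSketch`, its `String` payloads, and the `int` return value are hypothetical stand-ins for `RocketMQTemplate`, Spring `Message` objects, and `SendResult`. The default timeout passed to the constructor is an assumed value standing in for `producer.getSendMsgTimeout()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for RocketMQTemplate; not the real class.
class BatchSendSketch {
    // Assumption: in the real template this would come from producer.getSendMsgTimeout().
    private final long defaultTimeoutMillis;
    // Records the timeout used by the most recent send, for illustration only.
    private long lastTimeoutMillis = -1;

    BatchSendSketch(long defaultTimeoutMillis) {
        this.defaultTimeoutMillis = defaultTimeoutMillis;
    }

    // Convenience overload: holds no logic of its own, only supplies the default timeout.
    int syncSend(String destination, Collection<String> messages) {
        return syncSend(destination, messages, defaultTimeoutMillis);
    }

    // Single place for validation, null filtering, and the (simulated) send.
    int syncSend(String destination, Collection<String> messages, long timeoutMillis) {
        if (messages == null || messages.isEmpty()) {
            throw new IllegalArgumentException("`messages` can not be empty");
        }
        List<String> batch = new ArrayList<>();
        for (String msg : messages) {
            if (msg == null) {
                continue; // skip empty entries, as the method under review does
            }
            batch.add(msg);
        }
        lastTimeoutMillis = timeoutMillis;
        return batch.size(); // stand-in for SendResult: number of messages in the batch
    }

    long getLastTimeoutMillis() {
        return lastTimeoutMillis;
    }

    public static void main(String[] args) {
        BatchSendSketch template = new BatchSendSketch(3000L);
        int sent = template.syncSend("topicName:tags", Arrays.asList("a", null, "b"));
        System.out.println(sent + " sent with timeout " + template.getLastTimeoutMillis());
    }
}
```

With this shape, any later change to validation or error handling only has to be made in the overload that takes a timeout.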
