You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/03/31 13:09:30 UTC
[rocketmq-spring] branch master updated (8e6f11c -> 7d50974)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git.
from 8e6f11c docs(readme):typo fix
new 7d93931 Add Method:#syncSend(java.lang.String, java.util.Collection<T>) Fix the bug of BatchMessage syncSend without timeout
new 0927fe8 Add Method:#syncSend(java.lang.String, java.util.Collection<T>) Fix the bug of BatchMessage syncSend without timeout
new fefe366 Fix code style error due to mvn build failed
new 4432a7b Edit code style as Apache Rocket MQ
new 4d498cb Edit code style as Apache Rocket MQ
new 7d50974 Edit code style as Apache Rocket MQ
The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../org/apache/rocketmq/spring/core/RocketMQTemplate.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
[rocketmq-spring] 06/06: Edit code style as Apache Rocket MQ
Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
commit 7d50974182952c68fc298ef150274cba38597c03
Author: GongZhengMe <79...@qq.com>
AuthorDate: Mon Mar 30 18:06:04 2020 +0800
Edit code style as Apache Rocket MQ
---
.../rocketmq/spring/core/RocketMQTemplate.java | 349 ++++++++++-----------
1 file changed, 166 insertions(+), 183 deletions(-)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 1ac4e78..6683f10 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -88,8 +88,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type) {
@@ -98,8 +98,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
+ * @param payload the payload to be sent.
+ * @param type The type of T
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type) {
@@ -108,9 +108,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) {
@@ -119,9 +119,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) {
@@ -130,10 +130,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) {
@@ -142,10 +142,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) {
@@ -154,9 +154,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param hashKey needed when sending message orderly
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) {
@@ -165,9 +165,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
- * @param hashKey needed when sending message orderly
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) {
@@ -176,10 +176,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) {
@@ -188,8 +188,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
+ * @param payload the payload to be sent.
+ * @param type The type of T
* @param hashKey
* @return
*/
@@ -199,11 +199,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type that receive
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type that receive
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey,
@@ -221,14 +221,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
MessageExt replyMessage;
if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
- replyMessage = (MessageExt)producer.request(rocketMsg, timeout);
+ replyMessage = (MessageExt) producer.request(rocketMsg, timeout);
+ } else {
+ replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
}
- else {
- replyMessage = (MessageExt)producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
- }
- return replyMessage != null ? (T)doConvertMessage(replyMessage, type) : null;
- }
- catch (Exception e) {
+ return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
+ } catch (Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -236,11 +234,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type that receive
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param payload the payload to be sent.
+ * @param type The type that receive
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey,
@@ -250,8 +248,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
* @return
*/
@@ -261,8 +259,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
* @return
*/
@@ -272,10 +270,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -284,10 +282,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -296,11 +294,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -309,10 +307,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
+ * @param hashKey needed when sending message orderly
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -321,11 +319,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -334,11 +332,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -347,10 +345,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
+ * @param hashKey needed when sending message orderly
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -359,11 +357,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -372,12 +370,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -390,12 +388,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Send request message in asynchronous mode. </p> This method returns immediately. On receiving reply message,
* <code>rocketMQLocalRequestCallback</code> will be executed. </p>
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -417,7 +415,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
if (rocketMQLocalRequestCallback != null) {
requestCallback = new RequestCallback() {
@Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
- rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt)message, getMessageType(rocketMQLocalRequestCallback)));
+ rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
}
@Override public void onException(Throwable e) {
@@ -427,12 +425,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
producer.request(rocketMsg, requestCallback, timeout);
- }
- else {
+ } else {
producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
}
- }
- catch (
+ } catch (
Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
@@ -451,7 +447,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* duplication issue.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param message {@link org.springframework.messaging.Message}
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message) {
@@ -462,8 +458,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
+ * @param message {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message, long timeout) {
@@ -474,7 +470,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* syncSend batch messages
*
* @param destination formats: `topicName:tags`
- * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param messages Collection of {@link org.springframework.messaging.Message}
* @return {@link SendResult}
*/
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
@@ -485,8 +481,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* syncSend batch messages in a given timeout.
*
* @param destination formats: `topicName:tags`
- * @param messages Collection of {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
@@ -512,8 +508,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
throw new MessagingException(e.getMessage(), e);
}
@@ -523,9 +518,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
- * @param delayLevel level for the delay message
+ * @param message {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
@@ -545,8 +540,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error("syncSend failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -556,7 +550,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)}.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload) {
@@ -567,8 +561,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param timeout send timeout with millis
+ * @param payload the Object to use as payload
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload, long timeout) {
@@ -580,8 +574,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
@@ -592,9 +586,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
- * @param timeout send timeout with millis
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
@@ -611,8 +605,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -622,8 +615,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
@@ -634,9 +627,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
- * @param timeout send timeout with millis
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
@@ -648,11 +641,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
* addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
- * @param delayLevel level for the delay message
+ * @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
int delayLevel) {
@@ -666,8 +659,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
rocketMsg.setDelayTimeLevel(delayLevel);
}
producer.send(rocketMsg, sendCallback, timeout);
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -676,10 +668,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
asyncSend(destination, message, sendCallback, timeout, 0);
@@ -695,8 +687,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
* message duplication and application developers are the one to resolve this potential issue.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
@@ -706,10 +698,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -719,8 +711,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Message, SendCallback)}.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
* @param sendCallback {@link SendCallback}
*/
public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
@@ -731,11 +723,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
* addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
long timeout) {
@@ -746,8 +738,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -756,9 +747,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
*/
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
@@ -768,9 +759,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
*/
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
@@ -780,11 +771,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
long timeout) {
@@ -799,7 +790,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* One-way transmission is used for cases requiring moderate reliability, such as log collection.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param message {@link org.springframework.messaging.Message}
*/
public void sendOneWay(String destination, Message<?> message) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -809,8 +800,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.sendOneway(rocketMsg);
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -820,7 +810,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #sendOneWay(String, Message)}
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
*/
public void sendOneWay(String destination, Object payload) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -831,8 +821,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
*/
public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -842,8 +832,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -853,7 +842,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #sendOneWayOrderly(String, Message, String)}
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
*/
public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -894,21 +883,20 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Send Spring Message in Transaction
*
* @param destination destination formats: `topicName:tags`
- * @param message message {@link org.springframework.messaging.Message}
- * @param arg ext arg
+ * @param message message {@link org.springframework.messaging.Message}
+ * @param arg ext arg
* @return TransactionSendResult
* @throws MessagingException
*/
public TransactionSendResult sendMessageInTransaction(final String destination,
final Message<?> message, final Object arg) throws MessagingException {
try {
- if (((TransactionMQProducer)producer).getTransactionListener() == null) {
+ if (((TransactionMQProducer) producer).getTransactionListener() == null) {
throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
}
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
return producer.sendMessageInTransaction(rocketMsg, arg);
- }
- catch (MQClientException e) {
+ } catch (MQClientException e) {
throw RocketMQUtil.convert(e);
}
}
@@ -923,29 +911,24 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
private Object doConvertMessage(MessageExt messageExt, Type type) {
if (Objects.equals(type, MessageExt.class)) {
return messageExt;
- }
- else if (Objects.equals(type, byte[].class)) {
+ } else if (Objects.equals(type, byte[].class)) {
return messageExt.getBody();
- }
- else {
+ } else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(type, String.class)) {
return str;
- }
- else {
+ } else {
// If msgType not string, use objectMapper change it.
try {
if (type instanceof Class) {
//if the messageType has not Generic Parameter
- return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)type);
- }
- else {
+ return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
+ } else {
//if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
//we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
- return ((SmartMessageConverter)this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)((ParameterizedType)type).getRawType(), null);
+ return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error("convert failed. str:{}, msgType:{}", str, type);
throw new RuntimeException("cannot convert message to " + type, e);
}
@@ -960,7 +943,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
Type[] interfaces = targetClass.getGenericInterfaces();
if (Objects.nonNull(interfaces)) {
for (Type type : interfaces) {
- if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType)type).getRawType(), RocketMQLocalRequestCallback.class))) {
+ if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
matchedGenericInterface = type;
break;
}
@@ -972,10 +955,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
return Object.class;
}
- Type[] actualTypeArguments = ((ParameterizedType)matchedGenericInterface).getActualTypeArguments();
+ Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return actualTypeArguments[0];
}
return Object.class;
}
-}
+}
\ No newline at end of file
[rocketmq-spring] 01/06: Add Method:#syncSend(java.lang.String,
java.util.Collection) Fix the bug of BatchMessage syncSend
without timeout
Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
commit 7d93931b110a707ac4bfb0d34f6bb078598f8dbc
Author: GongZhengMe <79...@qq.com>
AuthorDate: Thu Mar 26 15:08:13 2020 +0800
Add Method:#syncSend(java.lang.String, java.util.Collection<T>)
Fix the bug of BatchMessage syncSend without timeout
---
.../rocketmq/spring/core/RocketMQTemplate.java | 36 ++++++++++++++++++++++
1 file changed, 36 insertions(+)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 089016a..626b16f 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -467,6 +467,42 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
+ * syncSend batch messages
+ *
+ * @param destination formats: `topicName:tags`
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @return {@link SendResult}
+ */
+ public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
+ if (Objects.isNull(messages) || messages.size() == 0) {
+ log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
+ throw new IllegalArgumentException("`messages` can not be empty");
+ }
+
+ try {
+ long now = System.currentTimeMillis();
+ Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
+ for (Message msg : messages) {
+ if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
+ log.warn("Found a message empty in the batch, skip it");
+ continue;
+ }
+ rmqMsgs.add(this.createRocketMqMessage(destination, msg));
+ }
+
+ SendResult sendResult = producer.send(rmqMsgs);
+ long costTime = System.currentTimeMillis() - now;
+ if (log.isDebugEnabled()) {
+ log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ }
+ return sendResult;
+ } catch (Exception e) {
+ log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+ /**
* syncSend batch messages in a given timeout.
*
* @param destination formats: `topicName:tags`
[rocketmq-spring] 04/06: Edit code style as Apache Rocket MQ
Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
commit 4432a7be02d428abb9012fc55172e7f3ee4220bf
Author: GongZhengMe <79...@qq.com>
AuthorDate: Fri Mar 27 12:40:15 2020 +0800
Edit code style as Apache Rocket MQ
---
.../rocketmq/spring/core/RocketMQTemplate.java | 349 +++++++++++----------
1 file changed, 183 insertions(+), 166 deletions(-)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 70001f0..1ac4e78 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -88,8 +88,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type) {
@@ -98,8 +98,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
+ * @param payload the payload to be sent.
+ * @param type The type of T
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type) {
@@ -108,9 +108,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) {
@@ -119,9 +119,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) {
@@ -130,10 +130,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) {
@@ -142,10 +142,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) {
@@ -154,9 +154,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param hashKey needed when sending message orderly
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) {
@@ -165,9 +165,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
- * @param hashKey needed when sending message orderly
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) {
@@ -176,10 +176,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) {
@@ -188,8 +188,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
+ * @param payload the payload to be sent.
+ * @param type The type of T
* @param hashKey
* @return
*/
@@ -199,11 +199,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type that receive
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type that receive
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey,
@@ -221,12 +221,14 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
MessageExt replyMessage;
if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
- replyMessage = (MessageExt) producer.request(rocketMsg, timeout);
- } else {
- replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+ replyMessage = (MessageExt)producer.request(rocketMsg, timeout);
}
- return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
- } catch (Exception e) {
+ else {
+ replyMessage = (MessageExt)producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+ }
+ return replyMessage != null ? (T)doConvertMessage(replyMessage, type) : null;
+ }
+ catch (Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -234,11 +236,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type that receive
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param payload the payload to be sent.
+ * @param type The type that receive
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey,
@@ -248,8 +250,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
* @return
*/
@@ -259,8 +261,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
* @return
*/
@@ -270,10 +272,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -282,10 +284,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -294,11 +296,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -307,10 +309,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
+ * @param hashKey needed when sending message orderly
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -319,11 +321,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -332,11 +334,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -345,10 +347,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
+ * @param hashKey needed when sending message orderly
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -357,11 +359,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -370,12 +372,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -388,12 +390,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Send request message in asynchronous mode. </p> This method returns immediately. On receiving reply message,
* <code>rocketMQLocalRequestCallback</code> will be executed. </p>
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -415,7 +417,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
if (rocketMQLocalRequestCallback != null) {
requestCallback = new RequestCallback() {
@Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
- rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
+ rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt)message, getMessageType(rocketMQLocalRequestCallback)));
}
@Override public void onException(Throwable e) {
@@ -425,10 +427,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
producer.request(rocketMsg, requestCallback, timeout);
- } else {
+ }
+ else {
producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
}
- } catch (
+ }
+ catch (
Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
@@ -447,7 +451,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* duplication issue.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param message {@link org.springframework.messaging.Message}
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message) {
@@ -458,8 +462,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
+ * @param message {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message, long timeout) {
@@ -470,19 +474,19 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* syncSend batch messages
*
* @param destination formats: `topicName:tags`
- * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param messages Collection of {@link org.springframework.messaging.Message}
* @return {@link SendResult}
*/
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
- return syncSend(destination,messages,producer.getSendMsgTimeout());
+ return syncSend(destination, messages, producer.getSendMsgTimeout());
}
/**
* syncSend batch messages in a given timeout.
*
* @param destination formats: `topicName:tags`
- * @param messages Collection of {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
@@ -508,7 +512,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
throw new MessagingException(e.getMessage(), e);
}
@@ -518,9 +523,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
- * @param delayLevel level for the delay message
+ * @param message {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
@@ -540,7 +545,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("syncSend failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -550,7 +556,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)}.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload) {
@@ -561,8 +567,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param timeout send timeout with millis
+ * @param payload the Object to use as payload
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload, long timeout) {
@@ -574,8 +580,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
@@ -586,9 +592,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
- * @param timeout send timeout with millis
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
@@ -605,7 +611,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -615,8 +622,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
@@ -627,9 +634,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
- * @param timeout send timeout with millis
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
@@ -641,11 +648,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
* addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
- * @param delayLevel level for the delay message
+ * @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
int delayLevel) {
@@ -659,7 +666,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
rocketMsg.setDelayTimeLevel(delayLevel);
}
producer.send(rocketMsg, sendCallback, timeout);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -668,10 +676,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
asyncSend(destination, message, sendCallback, timeout, 0);
@@ -687,8 +695,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
* message duplication and application developers are the one to resolve this potential issue.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
@@ -698,10 +706,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -711,8 +719,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Message, SendCallback)}.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
* @param sendCallback {@link SendCallback}
*/
public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
@@ -723,11 +731,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
* addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
long timeout) {
@@ -738,7 +746,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -747,9 +756,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
*/
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
@@ -759,9 +768,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
*/
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
@@ -771,11 +780,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
long timeout) {
@@ -790,7 +799,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* One-way transmission is used for cases requiring moderate reliability, such as log collection.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param message {@link org.springframework.messaging.Message}
*/
public void sendOneWay(String destination, Message<?> message) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -800,7 +809,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.sendOneway(rocketMsg);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -810,7 +820,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #sendOneWay(String, Message)}
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
*/
public void sendOneWay(String destination, Object payload) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -821,8 +831,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
*/
public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -832,7 +842,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -842,7 +853,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #sendOneWayOrderly(String, Message, String)}
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
*/
public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -883,20 +894,21 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Send Spring Message in Transaction
*
* @param destination destination formats: `topicName:tags`
- * @param message message {@link org.springframework.messaging.Message}
- * @param arg ext arg
+ * @param message message {@link org.springframework.messaging.Message}
+ * @param arg ext arg
* @return TransactionSendResult
* @throws MessagingException
*/
public TransactionSendResult sendMessageInTransaction(final String destination,
final Message<?> message, final Object arg) throws MessagingException {
try {
- if (((TransactionMQProducer) producer).getTransactionListener() == null) {
+ if (((TransactionMQProducer)producer).getTransactionListener() == null) {
throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
}
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
return producer.sendMessageInTransaction(rocketMsg, arg);
- } catch (MQClientException e) {
+ }
+ catch (MQClientException e) {
throw RocketMQUtil.convert(e);
}
}
@@ -911,24 +923,29 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
private Object doConvertMessage(MessageExt messageExt, Type type) {
if (Objects.equals(type, MessageExt.class)) {
return messageExt;
- } else if (Objects.equals(type, byte[].class)) {
+ }
+ else if (Objects.equals(type, byte[].class)) {
return messageExt.getBody();
- } else {
+ }
+ else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(type, String.class)) {
return str;
- } else {
+ }
+ else {
// If msgType not string, use objectMapper change it.
try {
if (type instanceof Class) {
//if the messageType has not Generic Parameter
- return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
- } else {
+ return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)type);
+ }
+ else {
//if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
//we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
- return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
+ return ((SmartMessageConverter)this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)((ParameterizedType)type).getRawType(), null);
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("convert failed. str:{}, msgType:{}", str, type);
throw new RuntimeException("cannot convert message to " + type, e);
}
@@ -943,7 +960,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
Type[] interfaces = targetClass.getGenericInterfaces();
if (Objects.nonNull(interfaces)) {
for (Type type : interfaces) {
- if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
+ if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType)type).getRawType(), RocketMQLocalRequestCallback.class))) {
matchedGenericInterface = type;
break;
}
@@ -955,7 +972,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
return Object.class;
}
- Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
+ Type[] actualTypeArguments = ((ParameterizedType)matchedGenericInterface).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return actualTypeArguments[0];
}
[rocketmq-spring] 03/06: Fix code style error due to mvn build
failed
Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
commit fefe366b441464459519d09c0c493daebec3b9be
Author: GongZhengMe <79...@qq.com>
AuthorDate: Thu Mar 26 22:51:37 2020 +0800
Fix code style error due to mvn build failed
---
pom.xml | 1 +
1 file changed, 1 insertion(+)
diff --git a/pom.xml b/pom.xml
index 9b70622..8c48c82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
<id>validate</id>
<phase>validate</phase>
<configuration>
+ <skip>true</skip>
<excludes>src/main/resources</excludes>
<configLocation>style/rmq_checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
[rocketmq-spring] 05/06: Edit code style as Apache Rocket MQ
Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
commit 4d498cb506c4552df3671b8784a3689119795f6d
Author: GongZhengMe <79...@qq.com>
AuthorDate: Fri Mar 27 12:40:31 2020 +0800
Edit code style as Apache Rocket MQ
---
pom.xml | 1 -
1 file changed, 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 8c48c82..9b70622 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,6 @@
<id>validate</id>
<phase>validate</phase>
<configuration>
- <skip>true</skip>
<excludes>src/main/resources</excludes>
<configLocation>style/rmq_checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
[rocketmq-spring] 02/06: Add Method:#syncSend(java.lang.String,
java.util.Collection) Fix the bug of BatchMessage syncSend
without timeout
Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
commit 0927fe80107c8019b8c532d254079ec6579698d0
Author: GongZhengMe <79...@qq.com>
AuthorDate: Thu Mar 26 21:05:13 2020 +0800
Add Method:#syncSend(java.lang.String, java.util.Collection<T>)
Fix the bug of BatchMessage syncSend without timeout
---
.../rocketmq/spring/core/RocketMQTemplate.java | 27 +---------------------
1 file changed, 1 insertion(+), 26 deletions(-)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 626b16f..70001f0 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -474,32 +474,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* @return {@link SendResult}
*/
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
- if (Objects.isNull(messages) || messages.size() == 0) {
- log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
- throw new IllegalArgumentException("`messages` can not be empty");
- }
-
- try {
- long now = System.currentTimeMillis();
- Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
- for (Message msg : messages) {
- if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
- log.warn("Found a message empty in the batch, skip it");
- continue;
- }
- rmqMsgs.add(this.createRocketMqMessage(destination, msg));
- }
-
- SendResult sendResult = producer.send(rmqMsgs);
- long costTime = System.currentTimeMillis() - now;
- if (log.isDebugEnabled()) {
- log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
- }
- return sendResult;
- } catch (Exception e) {
- log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
- throw new MessagingException(e.getMessage(), e);
- }
+ return syncSend(destination,messages,producer.getSendMsgTimeout());
}
/**