You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/03/31 13:12:05 UTC
[rocketmq-spring] 01/06: Revert "Edit code style as Apache Rocket
MQ"
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch revert-244-BatchMsgBranch
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
commit 357987d119ee6e15c928fa26b9f3ada21d810c69
Author: rongtong <ji...@163.com>
AuthorDate: Tue Mar 31 21:11:57 2020 +0800
Revert "Edit code style as Apache Rocket MQ"
This reverts commit 7d50974182952c68fc298ef150274cba38597c03.
---
.../rocketmq/spring/core/RocketMQTemplate.java | 349 +++++++++++----------
1 file changed, 183 insertions(+), 166 deletions(-)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 6683f10..1ac4e78 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -88,8 +88,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type) {
@@ -98,8 +98,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
+ * @param payload the payload to be sent.
+ * @param type The type of T
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type) {
@@ -108,9 +108,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout) {
@@ -119,9 +119,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout) {
@@ -130,10 +130,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, long timeout, int delayLevel) {
@@ -142,10 +142,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, long timeout, int delayLevel) {
@@ -154,9 +154,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param hashKey needed when sending message orderly
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey) {
@@ -165,9 +165,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
- * @param hashKey needed when sending message orderly
+ * @param payload the payload to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey) {
@@ -176,10 +176,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type of T
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type of T
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey, long timeout) {
@@ -188,8 +188,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type of T
+ * @param payload the payload to be sent.
+ * @param type The type of T
* @param hashKey
* @return
*/
@@ -199,11 +199,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
- * @param type The type that receive
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param type The type that receive
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Message<?> message, Type type, String hashKey,
@@ -221,12 +221,14 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
MessageExt replyMessage;
if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
- replyMessage = (MessageExt) producer.request(rocketMsg, timeout);
- } else {
- replyMessage = (MessageExt) producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+ replyMessage = (MessageExt)producer.request(rocketMsg, timeout);
}
- return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;
- } catch (Exception e) {
+ else {
+ replyMessage = (MessageExt)producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+ }
+ return replyMessage != null ? (T)doConvertMessage(replyMessage, type) : null;
+ }
+ catch (Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -234,11 +236,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
- * @param type The type that receive
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param payload the payload to be sent.
+ * @param type The type that receive
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public <T> T sendAndReceive(String destination, Object payload, Type type, String hashKey,
@@ -248,8 +250,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
* @return
*/
@@ -259,8 +261,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
* @return
*/
@@ -270,10 +272,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -282,10 +284,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -294,11 +296,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -307,10 +309,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
+ * @param hashKey needed when sending message orderly
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -319,11 +321,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -332,11 +334,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -345,10 +347,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
+ * @param hashKey needed when sending message orderly
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -357,11 +359,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -370,12 +372,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
/**
- * @param destination formats: `topicName:tags`
- * @param payload the payload to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param payload the payload to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Object payload,
@@ -388,12 +390,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Send request message in asynchronous mode. </p> This method returns immediately. On receiving reply message,
* <code>rocketMQLocalRequestCallback</code> will be executed. </p>
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message} the message to be sent.
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message} the message to be sent.
* @param rocketMQLocalRequestCallback callback that will invoked when reply message received.
- * @param hashKey needed when sending message orderly
- * @param timeout send timeout in millis
- * @param delayLevel message delay level(0 means no delay)
+ * @param hashKey needed when sending message orderly
+ * @param timeout send timeout in millis
+ * @param delayLevel message delay level(0 means no delay)
* @return
*/
public void sendAndReceive(String destination, Message<?> message,
@@ -415,7 +417,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
if (rocketMQLocalRequestCallback != null) {
requestCallback = new RequestCallback() {
@Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
- rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));
+ rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt)message, getMessageType(rocketMQLocalRequestCallback)));
}
@Override public void onException(Throwable e) {
@@ -425,10 +427,12 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
producer.request(rocketMsg, requestCallback, timeout);
- } else {
+ }
+ else {
producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
}
- } catch (
+ }
+ catch (
Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
@@ -447,7 +451,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* duplication issue.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param message {@link org.springframework.messaging.Message}
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message) {
@@ -458,8 +462,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
+ * @param message {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message, long timeout) {
@@ -470,7 +474,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* syncSend batch messages
*
* @param destination formats: `topicName:tags`
- * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param messages Collection of {@link org.springframework.messaging.Message}
* @return {@link SendResult}
*/
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages) {
@@ -481,8 +485,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* syncSend batch messages in a given timeout.
*
* @param destination formats: `topicName:tags`
- * @param messages Collection of {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
+ * @param messages Collection of {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
@@ -508,7 +512,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
throw new MessagingException(e.getMessage(), e);
}
@@ -518,9 +523,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param timeout send timeout with millis
- * @param delayLevel level for the delay message
+ * @param message {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
@@ -540,7 +545,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("syncSend failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -550,7 +556,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)}.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload) {
@@ -561,8 +567,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Object)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param timeout send timeout with millis
+ * @param payload the Object to use as payload
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSend(String destination, Object payload, long timeout) {
@@ -574,8 +580,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Message)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey) {
@@ -586,9 +592,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
- * @param timeout send timeout with millis
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
@@ -605,7 +611,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -615,8 +622,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSend(String, Object)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Object payload, String hashKey) {
@@ -627,9 +634,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #syncSendOrderly(String, Object, String)} with send timeout specified in addition.
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
- * @param timeout send timeout with millis
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param timeout send timeout with millis
* @return {@link SendResult}
*/
public SendResult syncSendOrderly(String destination, Object payload, String hashKey, long timeout) {
@@ -641,11 +648,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in
* addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
- * @param delayLevel level for the delay message
+ * @param timeout send timeout with millis
+ * @param delayLevel level for the delay message
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
int delayLevel) {
@@ -659,7 +666,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
rocketMsg.setDelayTimeLevel(delayLevel);
}
producer.send(rocketMsg, sendCallback, timeout);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -668,10 +676,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout) {
asyncSend(destination, message, sendCallback, timeout, 0);
@@ -687,8 +695,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* DefaultMQProducer#getRetryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield
* message duplication and application developers are the one to resolve this potential issue.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
* @param sendCallback {@link SendCallback}
*/
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback) {
@@ -698,10 +706,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Object, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -711,8 +719,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Message, SendCallback)}.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
* @param sendCallback {@link SendCallback}
*/
public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
@@ -723,11 +731,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
* addition.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
long timeout) {
@@ -738,7 +746,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -747,9 +756,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSend(String, Message, SendCallback)} with send orderly with hashKey by specified.
*
- * @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
*/
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback) {
@@ -759,9 +768,9 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)}.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
*/
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback) {
@@ -771,11 +780,11 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
/**
* Same to {@link #asyncSendOrderly(String, Object, String, SendCallback)} with send timeout specified in addition.
*
- * @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
* @param sendCallback {@link SendCallback}
- * @param timeout send timeout with millis
+ * @param timeout send timeout with millis
*/
public void asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback,
long timeout) {
@@ -790,7 +799,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* One-way transmission is used for cases requiring moderate reliability, such as log collection.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
+ * @param message {@link org.springframework.messaging.Message}
*/
public void sendOneWay(String destination, Message<?> message) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -800,7 +809,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.sendOneway(rocketMsg);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -810,7 +820,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #sendOneWay(String, Message)}
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
*/
public void sendOneWay(String destination, Object payload) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -821,8 +831,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
*
* @param destination formats: `topicName:tags`
- * @param message {@link org.springframework.messaging.Message}
- * @param hashKey use this key to select queue. for example: orderId, productId ...
+ * @param message {@link org.springframework.messaging.Message}
+ * @param hashKey use this key to select queue. for example: orderId, productId ...
*/
public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
@@ -832,7 +842,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
throw new MessagingException(e.getMessage(), e);
}
@@ -842,7 +853,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Same to {@link #sendOneWayOrderly(String, Message, String)}
*
* @param destination formats: `topicName:tags`
- * @param payload the Object to use as payload
+ * @param payload the Object to use as payload
*/
public void sendOneWayOrderly(String destination, Object payload, String hashKey) {
Message<?> message = MessageBuilder.withPayload(payload).build();
@@ -883,20 +894,21 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
* Send Spring Message in Transaction
*
* @param destination destination formats: `topicName:tags`
- * @param message message {@link org.springframework.messaging.Message}
- * @param arg ext arg
+ * @param message message {@link org.springframework.messaging.Message}
+ * @param arg ext arg
* @return TransactionSendResult
* @throws MessagingException
*/
public TransactionSendResult sendMessageInTransaction(final String destination,
final Message<?> message, final Object arg) throws MessagingException {
try {
- if (((TransactionMQProducer) producer).getTransactionListener() == null) {
+ if (((TransactionMQProducer)producer).getTransactionListener() == null) {
throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");
}
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
return producer.sendMessageInTransaction(rocketMsg, arg);
- } catch (MQClientException e) {
+ }
+ catch (MQClientException e) {
throw RocketMQUtil.convert(e);
}
}
@@ -911,24 +923,29 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
private Object doConvertMessage(MessageExt messageExt, Type type) {
if (Objects.equals(type, MessageExt.class)) {
return messageExt;
- } else if (Objects.equals(type, byte[].class)) {
+ }
+ else if (Objects.equals(type, byte[].class)) {
return messageExt.getBody();
- } else {
+ }
+ else {
String str = new String(messageExt.getBody(), Charset.forName(charset));
if (Objects.equals(type, String.class)) {
return str;
- } else {
+ }
+ else {
// If msgType not string, use objectMapper change it.
try {
if (type instanceof Class) {
//if the messageType has not Generic Parameter
- return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);
- } else {
+ return this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)type);
+ }
+ else {
//if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
//we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
- return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);
+ return ((SmartMessageConverter)this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>)((ParameterizedType)type).getRawType(), null);
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.error("convert failed. str:{}, msgType:{}", str, type);
throw new RuntimeException("cannot convert message to " + type, e);
}
@@ -943,7 +960,7 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
Type[] interfaces = targetClass.getGenericInterfaces();
if (Objects.nonNull(interfaces)) {
for (Type type : interfaces) {
- if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {
+ if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType)type).getRawType(), RocketMQLocalRequestCallback.class))) {
matchedGenericInterface = type;
break;
}
@@ -955,10 +972,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
return Object.class;
}
- Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();
+ Type[] actualTypeArguments = ((ParameterizedType)matchedGenericInterface).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return actualTypeArguments[0];
}
return Object.class;
}
-}
\ No newline at end of file
+}